大模型开发实战

本文将以百度的文心一言,阿里巴巴的通义千问,科大讯飞的讯飞星火认知大模型,智谱清言的ChartGLM,腾讯的混元大模型,OpenAI的ChatGPT,这六个模型的api调用进行实战代码讲解。

大模型向量数据库创建

对上传的不同格式文件进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from embedding.call_embedding import get_embedding
from langchain.document_loaders import UnstructuredFileLoader, UnstructuredCSVLoader, CSVLoader
from langchain.document_loaders import UnstructuredMarkdownLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyMuPDFLoader
from langchain.vectorstores import Chroma

def file_loader(file, loaders):
if not os.path.isfile(file):
[file_loader(os.path.join(file, f), loaders) for f in os.listdir(file)]
return
file_type = file.split('.')[-1]
if file_type == 'pdf':
loaders.append(PyMuPDFLoader(file))
elif file_type == 'md':
loaders.append(UnstructuredMarkdownLoader(file))
elif file_type == 'txt':
loaders.append(UnstructuredFileLoader(file))
elif file_type == 'csv':
loaders.append(CSVLoader(file))
return

切分文档

:warning:小技巧

在每段分割的文档前后加上文档名,这样可以保证查询的信息是指定文档内的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def create_db(files=DEFAULT_DB_PATH, persist_directory=DEFAULT_PERSIST_PATH, embeddings="openai"):
"""
该函数用于加载 PDF 文件,切分文档,生成文档的嵌入向量,创建向量数据库。

参数:
file: 存放文件的路径。
embeddings: 用于生产 Embedding 的模型

返回:
vectordb: 创建的数据库。
"""
if files == None:
return "can't load empty file"
if type(files) != list:
files = [files]
loaders = []
[file_loader(file, loaders) for file in files]
docs = []
for loader in loaders:
if loader is not None:
docs.extend(loader.load())
# 切分文档
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1500, chunk_overlap=150)
split_docs = text_splitter.split_documents(docs[:1000])
"""小技巧"""
# 在每段分割的文档前后加上文档名,这样可以保证查询的信息是指定文档内的
for one_chunk in split_docs:
one_chunk.page_content = one_chunk.metadata["source"].split("/")[-1] + one_chunk.page_content + \
one_chunk.metadata["source"].split("/")[-1]
if type(embeddings) == str:
embeddings = get_embedding(embedding=embeddings)

# 加载数据库
vectordb = Chroma.from_documents(
documents=split_docs,
embedding=embeddings,
persist_directory=persist_directory # 允许我们将persist_directory目录保存到磁盘上
)
vectordb.persist()
return vectordb

大模型API调用

抖音豆包大模型(新增)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def doubao_llm(prompt: str, model: str, api_key: str, temperature: float):
client = Ark(base_url="https://ark.cn-beijing.volces.com/api/v3",
api_key=api_key,
max_retries=5,
timeout=600)
# Non-streaming:
print("----- standard request -----")
completion = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "系统提示词"},
{"role": "user", "content": prompt},
],
temperature=temperature
)
return completion.usage.total_tokens, completion.choices[0].message.content

腾讯的混元大模型

:warning:混元大模型需要企业认证才可以使用。官方API文档腾讯混元大模型 腾讯混元大模型标准版-腾讯混元大模型相关接口-API 中心-腾讯云 (tencent.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 定义一个函数get_completion_hunyuan,用于调用hunyuan原生接口
def get_completion_hunyuan(prompt: str, model: str, temperature: float, secret_id: str, secret_key: str):
# 判断是否传入secret_id和secret_key,如果没有传入,则从配置文件中解析
if secret_id == None or secret_key == None:
secret_id, secret_key = parse_llm_api_key("hunyuan")
# 使用secret_id和secret_key创建一个认证对象
cred = credential.Credential(secret_id, secret_key)

# 创建一个客户端配置对象,可以设置连接池大小等参数
cpf = ClientProfile()
# 设置预先建立连接的数量,可以降低访问延迟
cpf.httpProfile.pre_conn_pool_size = 3

# 使用认证对象和客户端配置对象创建一个hunyuan客户端
client = hunyuan_client.HunyuanClient(cred, "ap-guangzhou", cpf)
# 创建一个请求对象
req = models.ChatStdRequest()
# 创建一个消息对象,设置角色为用户,内容为传入的prompt
msg = models.Message()
msg.Role = "user"
msg.Content = prompt
# 将消息对象添加到请求对象的Messages列表中
req.Messages = [msg]

# 调用hunyuan客户端的ChatStd方法,传入请求对象,获取响应
resp = client.ChatStd(req)
# 打印响应
print("resp:", resp)
# 初始化一个字符串,用于存储完整的响应内容
full_content = ""
# 遍历响应中的事件
for event in resp:
# 打印事件
print("event", event)
# 将事件中的数据解析为json对象
data = json.loads(event['data'])
# 遍历json对象中的Choices列表
for choice in data['Choices']:
# 将每个Choice中的Content添加到full_content中
full_content += choice['Delta']['Content']
# 打印完整的响应内容
print("full_content:", full_content)

# 返回完整的响应内容
return full_content

阿里巴巴的通义千问

官方API文档快速入门_模型服务灵积(DashScope)-阿里云帮助中心 (aliyun.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 定义一个函数get_completion_qwen,用于调用qwen原生接口
def get_completion_qwen(prompt: str, model: str, temperature: float, api_key: str, max_tokens: int):
# 判断是否传入api_key,如果没有传入,则从配置文件中解析
if api_key == None:
api_key = parse_llm_api_key("qwen")
# 设置dashscope的api_key,dashscope是一个第三方库,用于简化API调用
dashscope.api_key = api_key

# 创建一个消息列表,包含用户的消息
messages = [{"role": "user", "content": prompt}]
# 调用dashscope.Generation.call方法,传入模型名称、消息列表、温度系数、最大回复长度和结果格式
response = dashscope.Generation.call(
model=model, # 模型名称
messages=messages, # 消息列表
temperature=temperature, # 模型输出的温度系数,控制输出的随机程度
max_tokens=max_tokens, # 回复最大长度
result_format='message', # 设置结果格式为"message"
)

# 打印响应
print(response)

# 从响应中提取回复内容并返回
return response.output.choices[0].message["content"]

OpenAI的ChatGPT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 定义一个函数get_completion_gpt,用于调用OpenAI原生接口
def get_completion_gpt(prompt: str, model: str, temperature: float, api_key: str, max_tokens: int):
# 判断是否传入api_key,如果没有传入,则从配置文件中解析
if api_key == None:
api_key = parse_llm_api_key("openai")
# 设置OpenAI的api_key,openai是一个第三方库,用于简化API调用
openai.api_key = api_key

# 创建一个消息列表,包含用户的消息
messages = [{"role": "user", "content": prompt}]

# 调用openai.ChatCompletion.create方法,传入模型名称、消息列表、温度系数和最大回复长度
response = openai.ChatCompletion.create(
model=model, # 模型名称
messages=messages, # 消息列表
temperature=temperature, # 模型输出的温度系数,控制输出的随机程度
max_tokens=max_tokens, # 回复最大长度
)

# 从响应中提取回复内容并返回
return response.choices[0].message["content"]

百度的文心一言

官方API文档API介绍 - 千帆大模型平台 | 百度智能云文档 (baidu.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def get_access_token(api_key, secret_key):
"""
使用 API Key,Secret Key 获取access_token,替换下列示例中的应用API Key、应用Secret Key
"""
# 指定网址
url = f"https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={api_key}&client_secret={secret_key}"
# 设置 POST 访问
payload = json.dumps("")
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
# 通过 POST 访问获取账户对应的 access_token
response = requests.request("POST", url, headers=headers, data=payload)
return response.json().get("access_token")


def get_completion_wenxin(prompt: str, model: str, temperature: float, api_key: str, secret_key: str):
# 封装百度文心原生接口
if api_key == None or secret_key == None:
api_key, secret_key = parse_llm_api_key("wenxin")
# 获取access_token
access_token = get_access_token(api_key, secret_key)
# 调用接口
url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token={access_token}"
# 配置 POST 参数
payload = json.dumps({
"messages": [
{
"role": "user", # user prompt
"content": "{}".format(prompt) # 输入的 prompt
}
]
})
headers = {
'Content-Type': 'application/json'
}
# 发起请求
response = requests.request("POST", url, headers=headers, data=payload)
# 返回的是一个 Json 字符串
js = json.loads(response.text)
return js["result"]

科大讯飞的讯飞星火认知大模型

官方API文档星火认知大模型Web API文档 | 讯飞开放平台文档中心 (xfyun.cn)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def get_completion_spark(prompt: str, model: str, temperature: float, api_key: str, appid: str, api_secret: str,
max_tokens: int):
if api_key == None or appid == None and api_secret == None:
api_key, appid, api_secret = parse_llm_api_key("spark")

# 配置 1.5、2、3、3.5 的不同环境
if model == "Spark-1.5":
domain = "general"
Spark_url = "wss://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址
if model == "Spark-2.0":
domain = "generalv2" # v2.0版本
Spark_url = "wss://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址
if model == "Spark-3.0":
domain = "generalv3" # v3.0版本
Spark_url = "wss://spark-api.xf-yun.com/v3.1/chat" # v3.0环境的地址
if model == "Spark-3.5":
domain = "generalv3.5" # v3.5版本
Spark_url = "wss://spark-api.xf-yun.com/v3.5/chat" # v3.5环境的地址

question = [{"role": "user", "content": prompt}]
response = spark_main(appid, api_key, api_secret, Spark_url, domain, question, temperature, max_tokens)
return response
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class Ws_Param(object):
# 初始化
def __init__(self, APPID, APIKey, APISecret, Spark_url):
self.APPID = APPID
self.APIKey = APIKey
self.APISecret = APISecret
self.host = urlparse(Spark_url).netloc
self.path = urlparse(Spark_url).path
self.Spark_url = Spark_url
# 自定义
self.temperature = 0
self.max_tokens = 2048

# 生成url
def create_url(self):
# 生成RFC1123格式的时间戳
now = datetime.now()
date = format_date_time(mktime(now.timetuple()))

# 拼接字符串
signature_origin = "host: " + self.host + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + self.path + " HTTP/1.1"

# 进行hmac-sha256进行加密
signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()

signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

# 将请求的鉴权参数组合为字典
v = {
"authorization": authorization,
"date": date,
"host": self.host
}
# 拼接鉴权参数,生成url
url = self.Spark_url + '?' + urlencode(v)
# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
return url


# 收到websocket错误的处理
def on_error(ws, error):
print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws, one, two):
print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
thread.start_new_thread(run, (ws,))


def run(ws, *args):
data = json.dumps(gen_params(appid=ws.appid, domain=ws.domain, question=ws.question, temperature=ws.temperature,
max_tokens=ws.max_tokens))
ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
# print(message)
data = json.loads(message)
code = data['header']['code']
if code != 0:
print(f'请求错误: {code}, {data}')
ws.close()
else:
choices = data["payload"]["choices"]
status = choices["status"]
content = choices["text"][0]["content"]
print(content, end="")
global answer
answer += content
# print(1)
if status == 2:
ws.close()


def gen_params(appid, domain, question, temperature, max_tokens):
"""
通过appid和用户的提问来生成请参数
"""
data = {
"header": {
"app_id": appid,
"uid": "1234"
},
"parameter": {
"chat": {
"domain": domain,
"random_threshold": 0.5,
"max_tokens": max_tokens,
"temperature": temperature,
"auditing": "default"
}
},
"payload": {
"message": {
"text": question
}
}
}
return data


def spark_main(appid, api_key, api_secret, Spark_url, domain, question, temperature, max_tokens):
# print("星火:")
output_queue = queue.Queue()

def on_message(ws, message):
data = json.loads(message)
code = data['header']['code']
if code != 0:
print(f'请求错误: {code}, {data}')
ws.close()
else:
choices = data["payload"]["choices"]
status = choices["status"]
content = choices["text"][0]["content"]
# print(content, end='')
# 将输出值放入队列
output_queue.put(content)
if status == 2:
ws.close()

wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
ws.appid = appid
ws.question = question
ws.domain = domain
ws.temperature = temperature
ws.max_tokens = max_tokens
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
return ''.join([output_queue.get() for _ in range(output_queue.qsize())])

智谱清言的ChartGLM(2024.08最新调用)

官方API文档智谱AI开放平台 (bigmodel.cn)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from zhipuai import ZhipuAI

def get_completion_glm(prompt: str, model: str, temperature: float, api_key: str, max_tokens: int):
# 判断是否传入api_key,如果没有传入,则从配置文件中解析
if api_key is None:
api_key = parse_llm_api_key("zhipuai")
# 设置zhipuai的api_key,zhipuai是一个第三方库,用于简化API调用
client = ZhipuAI(api_key=api_key) # 填写您自己的APIKey

# 调用 client.chat.completions.create方法,传入模型名称、提示、温度系数和最大回复长度
response = client.chat.completions.create(
model=model, # 模型名称
messages=[
{"role": "user", "content": prompt},
{"role": "system", "content": "系统提示词"},
], # 提示
temperature=temperature, # 温度系数
max_tokens=max_tokens # 最大回复长度
)

# 从响应中提取回复内容,并去除首尾的引号和空格
return response.choices[0].message.content

获取本地.env配置文件的api_key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import os
from dotenv import load_dotenv, find_dotenv
from langchain.utils import get_from_dict_or_env


def parse_llm_api_key(model: str, env_file: dict() = None):
"""
通过 model 和 env_file 的来解析平台参数
"""
if env_file is None:
_ = load_dotenv(find_dotenv())
env_file = os.environ
if model == "openai":
return env_file["OPENAI_API_KEY"]
elif model == "wenxin":
return env_file["wenxin_api_key"], env_file["wenxin_secret_key"]
elif model == "spark":
return env_file["spark_api_key"], env_file["spark_appid"], env_file["spark_api_secret"]
elif model == "zhipuai":
return get_from_dict_or_env(env_file, "zhipuai_api_key", "ZHIPUAI_API_KEY")
# return env_file["ZHIPUAI_API_KEY"]
elif model == "qwen":
return env_file["QWEN_API_KEY"]
elif model == "hunyuan":
return env_file["hunyuan_secret_id"], env_file["hunyuan_secret_key"]
else:
raise ValueError(f"model{model} not support!!!")