演示代码为调用tools的功能
这边只做演示使用
Fastapi接口构造
# fastapi接口
@app.post("/v1/chat/completions", status_code=status.HTTP_200_OK)
async def create_chat_completion(request: ChatCompletionRequest):
"""
生成聊天
:param request:
:return:
"""
query = request.messages[-1].content
print(query)
prev_messages = request.messages[:-1]
if len(prev_messages) and prev_messages[0].role == Role.SYSTEM:
system = prev_messages.pop(0).content
else:
system = None
system = system.replace('\n', '')
messages = []
messages.append(
{
"role": "system",
"content": system
},
)
messages.append(
{
"role": "user",
"content": query
}
)
return EventSourceResponse(tools_select(messages))
功能实现
调用的tools_select
该函数的功能主要是开一个新的线程来fetch_data(),用于获取返回的信息,因为zhipu的sdk是同步操作,所以我们要同步执行,异步获取返回信息
async def tools_select(messages):
"""
数据获取(同步操作)在一个单独的线程中进行,不会阻塞主事件循环。
数据的发送(异步操作)通过异步生成器逐个进行,每当有数据可用时就立即处理和发送。
:param messages:
:return:
"""
# Queue 提供了阻塞(如 get()、put())和非阻塞(如 get_nowait()、put_nowait())的方法来处理队列元素。
# 阻塞操作会等待直到队列中有可用的数据或者有空间来放置新的数据。
q = queue.Queue() # queue.Queue() 是 Python 中的一个标准库,用于在多个线程之间安全地交换信息或数据
# 创建并启动数据获取线程
thread = threading.Thread(target=fetch_data, args=(q, messages))
thread.start()
# 异步处理队列中的数据
while True:
content = await asyncio.to_thread(q.get) # 从队列中获取数据
if content is None: # 使用 None 作为结束信号
break
content = content.replace('\n', '').strip()
content = re.sub(r'\s+', ' ', content)
yield content
thread.join() # 确保线程完成
fetch_data()函数的实现
使用Queue来发送消息,由于需要判断是否调用工具这边又封装了一个函数process_async_generator()
def fetch_data(q, messages):
response = client.chat.completions.create(
model="GLM-3-Turbo",
messages=messages,
tools=tools,
stream=True,
)
for chunk in response:
print(chunk.choices[0].delta)
print(chunk.choices[0].delta.tool_calls)
if chunk.choices[0].delta.tool_calls:
messages.append(chunk.choices[0].delta.model_dump())
# parse_function_call(chunk, messages)
asyncio.run(process_async_generator(q, chunk, messages))
else:
q.put(chunk.choices[0].delta.content)
q.put(None) # 发送结束信号
process_async_generator()函数的实现
该函数主要是用来处理parse_function_call工具调用的功能,同样通过异步操作实现了流式的返回
async def parse_function_call(model_response: ChatCompletionChunk, messages):
# 处理函数调用结果,根据模型返回参数,调用对应的函数。
# 调用函数返回结果后构造tool message,再次调用模型,将函数结果输入模型
# 模型会将函数调用结果以自然语言格式返回给用户。
if model_response.choices[0].delta.tool_calls:
tool_call = model_response.choices[0].delta.tool_calls[0]
args = tool_call.function.arguments
print("args", args)
# str转dict
# args = json.loads(args)
function_result = {}
if tool_call.function.name == "search_score":
function_result = search_score(**json.loads(args))
messages.append({
"role": "system",
"content": f"请把查询出来的所有内容以Markdown的格式都返回给用户,包括学校的联系方式地址等,如果没有返回内容就正常回答",
})
messages.append({
"role": "tool",
"content": f"{json.dumps(function_result)}",
"tool_call_id": tool_call.id
})
response = client.chat.completions.create(
model="GLM-3-Turbo",
# model='glm-4',
messages=messages,
tools=tools,
stream=True,
temperature=0.8
)
for chunk in response:
content = chunk.choices[0].delta.content
yield content
async def process_async_generator(q, chunk, messages):
async for result in parse_function_call(chunk, messages):
q.put(result)