异步调用API(关键优化,提升效率)
import asyncio
from urllib import response
import aiohttp
import os
from dotenv import load_dotenv
1. 环境配置(复用之前的,无需修改)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
raise ValueError("❌ 未设置 LLM_API_KEY,请检查 .env 文件")
LLM_API_URL = (
"https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation")
async def call_llm_async(text: str):
try:
"""异步调用大模型API,单条文本分类"""
async with aiohttp.ClientSession() as session:
# 构造 Prompt(关键:让模型知道要做什么)
categories = ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
prompt = (
f"你是一个政务文本分类器。请将以下文本严格分类为:{', '.join(categories)}。\n"
f"只输出类别名称,不要任何解释或标点。\n\n"
f"文本:{text}\n"
f"类别:"
)
# 构造请求头和请求体
headers = {
"Authorization": f"Bearer {LLM_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": "qwen-turbo-latest", # 或 qwen-max / qwen-plus
"input": {"messages": [{"role": "user", "content": prompt}]},
"parameters": {
"max_tokens": 10,
"temperature": 0.1, # 降低随机性,提高稳定性
"seed": 12345, # 可选:固定种子
},
}
async with session.post(
url=LLM_API_URL, headers=headers, json=payload, timeout=30
) as reponse:
result = await reponse.json()
# print(result, "\n")
# 提取模型输出
output_text = result.get("output", {}).get("text", "").strip()
# 简单置信度模拟(实际无法获取真实 confidence,可设为 1.0 或基于规则)
confidence = 1.0 if output_text in categories else 0.0
return {
"text": text,
"category": output_text if confidence > 0 else "",
"confidence": confidence,
"status": "成功" if confidence > 0 else "失败",
"error_msg": (
"" if confidence > 0 else f"模型返回无效类别: '{output_text}'"
),
}
except Exception as e:
return {
"text": text,
"category": "",
"confidence": 0.0,
"status": "失败",
"error_msg": str(e),
}
async def batch_async_classify(texts: list[str]):
"""批量异步分类,同时发起多个API请求"""
tasks = [call_llm_async(text) for text in texts]
# 批量执行所有任务,等待全部完成(asyncio.gather是核心)
results = await asyncio.gather(*tasks)
return results
if name == "__main__":
# 测试3条文本,异步同时调用API
test_texts = [
"济南市2024年民生政策通知",
"群众咨询社保缴费流程",
"投诉举报小区垃圾清运不及时",
]
# 启动异步任务,获取结果
results = asyncio.run(batch_async_classify(test_texts))
# 打印结果
for res in results:
print(res)