# aiohttp 异步调用 API — asynchronous API calls (key optimization: improved throughput)
import asyncio
from urllib import response
import aiohttp
import os
from dotenv import load_dotenv

# 1. Environment configuration (reused from before, no changes needed).
load_dotenv()

LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:
    # Fail fast: every API call below requires this key.
    raise ValueError("❌ 未设置 LLM_API_KEY,请检查 .env 文件")

# DashScope text-generation endpoint (Qwen models).
LLM_API_URL = (
    "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
)

async def call_llm_async(text: str) -> dict:
    """Asynchronously classify a single text via the LLM API.

    Args:
        text: The text to classify.

    Returns:
        A result dict with keys ``text``, ``category``, ``confidence``,
        ``status``, ``error_msg``. This function never raises: any error
        (network, JSON, invalid model output) is reported in the dict.
    """
    categories = ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
    try:
        async with aiohttp.ClientSession() as session:
            # Build the prompt: constrain the model to output only a
            # category name, with no explanation or punctuation.
            prompt = (
                f"你是一个政务文本分类器。请将以下文本严格分类为:{', '.join(categories)}。\n"
                f"只输出类别名称,不要任何解释或标点。\n\n"
                f"文本:{text}\n"
                f"类别:"
            )

            headers = {
                "Authorization": f"Bearer {LLM_API_KEY}",
                "Content-Type": "application/json",
            }

            payload = {
                "model": "qwen-turbo-latest",  # or qwen-max / qwen-plus
                "input": {"messages": [{"role": "user", "content": prompt}]},
                "parameters": {
                    "max_tokens": 10,
                    "temperature": 0.1,  # low randomness for stable labels
                    "seed": 12345,  # optional: fixed seed for reproducibility
                },
            }

            # aiohttp deprecates bare numbers for timeout; use ClientTimeout.
            async with session.post(
                url=LLM_API_URL,
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:  # fixed typo: was "reponse"
                result = await response.json()
                # Extract the model output text from the DashScope response.
                output_text = result.get("output", {}).get("text", "").strip()

                # Simulated confidence (the API exposes no real confidence):
                # 1.0 iff the output is exactly one of the valid categories.
                confidence = 1.0 if output_text in categories else 0.0

                return {
                    "text": text,
                    "category": output_text if confidence > 0 else "",
                    "confidence": confidence,
                    "status": "成功" if confidence > 0 else "失败",
                    "error_msg": (
                        "" if confidence > 0 else f"模型返回无效类别: '{output_text}'"
                    ),
                }
    except Exception as e:
        # Best-effort contract: never propagate; surface the error message
        # in the result dict so batch callers see per-item failures.
        return {
            "text": text,
            "category": "",
            "confidence": 0.0,
            "status": "失败",
            "error_msg": str(e),
        }

async def batch_async_classify(texts: list[str]) -> list[dict]:
    """Classify a batch of texts by issuing all API requests concurrently.

    Args:
        texts: Texts to classify.

    Returns:
        One result dict per input text, in the same order as ``texts``.
    """
    # asyncio.gather is the core optimization: all requests are in flight
    # at once, and results come back in input order.
    tasks = [call_llm_async(text) for text in texts]
    return await asyncio.gather(*tasks)

if name == "__main__":

# 测试3条文本,异步同时调用API
test_texts = [
    "济南市2024年民生政策通知",
    "群众咨询社保缴费流程",
    "投诉举报小区垃圾清运不及时",
]

# 启动异步任务,获取结果
results = asyncio.run(batch_async_classify(test_texts))
# 打印结果
for res in results:
    print(res)
# NOTE(review): removed web-page artifact ("添加新评论" / "Add new comment")