import asyncio
import aiohttp
import time
import os
from anyio import Semaphore
from openpyxl import Workbook
from dotenv import load_dotenv
# 1. Environment configuration (reused from the previous example; no changes needed)
# Load variables from a local .env file into the process environment
# before anything reads them.
load_dotenv()

# The API key is mandatory: abort at import time when it is missing or empty.
LLM_API_KEY = os.environ.get("LLM_API_KEY")
if LLM_API_KEY is None or LLM_API_KEY == "":
    raise ValueError("❌ 未设置 LLM_API_KEY,请检查 .env 文件")

# Text-generation endpoint that every classification request is posted to.
LLM_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
# 2. Combined helper class (asynchronous API calls + Excel export)
class AsyncClassifier:
    """Classify government-affairs texts through an LLM HTTP API and export the results to Excel.

    Each instance owns one in-memory workbook. ``save_results`` appends one
    row per result and numbers rows with ``text_id``, which starts at 1 and
    keeps increasing across calls on the same instance.
    """

    # Closed set of labels the model is asked to choose from.
    CATEGORIES = ("政策通知", "群众咨询", "投诉举报", "政务公告", "其他")
    # Model name sent in the request (alternatives: qwen-max / qwen-plus).
    MODEL_NAME = "qwen-turbo-latest"
    # Total time budget, in seconds, for a single HTTP request.
    REQUEST_TIMEOUT_SECONDS = 30
    # Workbook written on success, and fallback written when saving fails.
    OUTPUT_FILE = "批量分类结果.xlsx"
    BACKUP_FILE = "分类结果_备份.xlsx"
    # Whitespace, quotes and punctuation stripped from around the model's
    # answer before matching it against CATEGORIES.
    _STRIP_CHARS = " \t\r\n" + "\"'“”‘’「」" + "。.,,、!!;;::"

    def __init__(self):
        """Create the workbook and write the header row."""
        self.wb = Workbook()
        self.ws = self.wb.active
        self.ws.title = "政务文本分类结果"
        self.headers = [
            "文本ID",
            "原文",
            "分类标签",
            "置信度",
            "处理时间",
            "调用状态",
            "异常信息",
        ]
        self.ws.append(self.headers)
        self.text_id = 1

    @staticmethod
    def _make_result(text, category="", error_msg=""):
        """Build the result dict; a non-empty ``category`` marks success."""
        succeeded = bool(category)
        return {
            "text": text,
            "category": category,
            # The API exposes no real confidence, so this is a 1.0 / 0.0 flag.
            "confidence": 1.0 if succeeded else 0.0,
            "status": "成功" if succeeded else "失败",
            "error_msg": "" if succeeded else error_msg,
        }

    @classmethod
    def _build_prompt(cls, text):
        """Return the classification prompt for ``text``."""
        return (
            f"你是一个政务文本分类器。请将以下文本严格分类为:{', '.join(cls.CATEGORIES)}。\n"
            f"只输出类别名称,不要任何解释或标点。\n\n"
            f"文本:{text}\n"
            f"类别:"
        )

    @staticmethod
    def _extract_output_text(body):
        """Return the model's answer from a decoded response body, or ``""``.

        Reads ``output.text`` first and falls back to
        ``output.choices[0].message.content``; any missing or non-string
        value yields an empty string instead of raising.
        """
        output = body.get("output") if isinstance(body, dict) else None
        if not isinstance(output, dict):
            return ""
        answer = output.get("text")
        if not answer:
            choices = output.get("choices") or []
            first = choices[0] if choices else None
            message = first.get("message") if isinstance(first, dict) else None
            answer = message.get("content") if isinstance(message, dict) else None
        return answer.strip() if isinstance(answer, str) else ""

    @classmethod
    def _match_category(cls, raw):
        """Return the category named by ``raw``, or ``None`` if it is not one."""
        candidate = raw.strip(cls._STRIP_CHARS)
        return candidate if candidate in cls.CATEGORIES else None

    @classmethod
    def _interpret_response(cls, text, status, body):
        """Turn an HTTP status and decoded JSON body into a result dict."""
        if not isinstance(body, dict):
            body = {}
        api_error = body.get("message") or body.get("code")
        # Report transport/API errors as such instead of letting them show
        # up as an "invalid category" with an empty model output.
        if status >= 400 or (api_error and "output" not in body):
            detail = api_error or "unknown error"
            return cls._make_result(text, error_msg=f"HTTP {status}: {detail}")
        raw = cls._extract_output_text(body)
        category = cls._match_category(raw)
        if category is None:
            return cls._make_result(text, error_msg=f"模型返回无效类别: '{raw}'")
        return cls._make_result(text, category=category)

    async def _request_category(self, session, text):
        """Post one classification request on ``session`` and interpret the reply."""
        headers = {
            "Authorization": f"Bearer {LLM_API_KEY}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.MODEL_NAME,
            "input": {"messages": [{"role": "user", "content": self._build_prompt(text)}]},
            "parameters": {
                "max_tokens": 10,
                "temperature": 0.1,  # low randomness for more stable labels
                "seed": 12345,  # optional: fixed seed
            },
        }
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
        async with session.post(
            url=LLM_API_URL, headers=headers, json=payload, timeout=timeout
        ) as response:
            body = await response.json()
            return self._interpret_response(text, response.status, body)

    async def call_llm_async(self, text: str, session=None) -> dict:
        """Classify one text and return a result dict; never raises ``Exception``.

        Args:
            text: The text to classify.
            session: Optional ``aiohttp.ClientSession`` to reuse. When omitted,
                a temporary session is opened for this single call.

        Returns:
            A dict with the keys ``text``, ``category``, ``confidence``,
            ``status`` and ``error_msg``. On failure ``category`` is ``""``,
            ``confidence`` is ``0.0`` and ``error_msg`` describes the problem.
        """
        try:
            if session is not None:
                return await self._request_category(session, text)
            async with aiohttp.ClientSession() as own_session:
                return await self._request_category(own_session, text)
        except Exception as e:
            # Deliberately broad: one failed request must become a failed row,
            # not abort the whole batch. Some exceptions (e.g. timeouts) have
            # an empty message, so fall back to the exception type name.
            return self._make_result(text, error_msg=str(e) or type(e).__name__)

    async def batch_classify_and_save(
        self, texts: list[str], batch_size: int = 10, max_concurrency: int = 10
    ):
        """Classify ``texts`` concurrently, save them to Excel and return the results.

        Args:
            texts: Texts to classify.
            batch_size: Number of texts scheduled per wave; waves run one
                after another.
            max_concurrency: Upper bound on simultaneous API requests, to
                avoid being rate-limited.

        Returns:
            The list of result dicts, in the same order as ``texts``.

        Raises:
            ValueError: If ``batch_size`` or ``max_concurrency`` is below 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be a positive integer")

        start_time = time.time()
        semaphore = asyncio.Semaphore(max_concurrency)
        results = []

        # One shared session for the whole batch so connections are pooled.
        async with aiohttp.ClientSession() as session:

            async def bounded_task(text):
                async with semaphore:
                    return await self.call_llm_async(text, session=session)

            for start in range(0, len(texts), batch_size):
                batch_texts = texts[start : start + batch_size]
                results.extend(
                    await asyncio.gather(*(bounded_task(text) for text in batch_texts))
                )

        self.save_results(results)
        total_time = round(time.time() - start_time, 2)
        print(f"批量分类完成!共{len(texts)}篇,耗时{total_time}秒")
        return results

    def save_results(self, results: list[dict]):
        """Append ``results`` to the worksheet and write the workbook to disk.

        If appending or saving raises, the error is printed and the workbook
        is written to ``BACKUP_FILE`` instead.
        """
        try:
            # All rows are written by this one call, so they share a timestamp.
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            for res in results:
                self.ws.append(
                    [
                        self.text_id,
                        res["text"],
                        res["category"],
                        res["confidence"],
                        current_time,
                        res["status"],
                        res["error_msg"],
                    ]
                )
                self.text_id += 1
            self.wb.save(self.OUTPUT_FILE)
            print("结果已保存到Excel")
        except Exception as e:
            print(f"保存失败:{str(e)}")
            self.wb.save(self.BACKUP_FILE)
# Test code
if name == "__main__":
# 测试10条文本(模拟批量场景)
test_texts = [
f"政务文本{i}:"
+ (
"政策通知内容"
if i % 5 == 0
else "群众咨询内容" if i % 5 == 1 else "投诉举报内容"
)
for i in range(1, 11)
]
# 初始化工具类,执行批量分类
classifier = AsyncClassifier()
asyncio.run(classifier.batch_classify_and_save(test_texts, batch_size=5))