Code - 异步批量分类+Excel保存

import asyncio
import aiohttp
import time
import os
from anyio import Semaphore
from openpyxl import Workbook
from dotenv import load_dotenv

1. 环境配置(复用之前的,无需修改)

load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
if not LLM_API_KEY:

raise ValueError("❌ 未设置 LLM_API_KEY,请检查 .env 文件")

LLM_API_URL = (

"https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"

)

2. 整合工具类(异步API+Excel保存)

class AsyncClassifier:

def __init__(self):
    self.wb = Workbook()
    self.ws = self.wb.active
    self.ws.title = "政务文本分类结果"
    self.headers = [
        "文本ID",
        "原文",
        "分类标签",
        "置信度",
        "处理时间",
        "调用状态",
        "异常信息",
    ]
    self.ws.append(self.headers)
    self.text_id = 1

# 异步API调用(复用例子3)
async def call_llm_async(self, text: str) -> dict:
    try:
        async with aiohttp.ClientSession() as session:
            categories = ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
            prompt = (
                f"你是一个政务文本分类器。请将以下文本严格分类为:{', '.join(categories)}。\n"
                f"只输出类别名称,不要任何解释或标点。\n\n"
                f"文本:{text}\n"
                f"类别:"
            )
            # 构造请求头和请求体
            headers = {
                "Authorization": f"Bearer {LLM_API_KEY}",
                "Content-Type": "application/json",
            }

            payload = {
                "model": "qwen-turbo-latest",  # 或 qwen-max / qwen-plus
                "input": {"messages": [{"role": "user", "content": prompt}]},
                "parameters": {
                    "max_tokens": 10,
                    "temperature": 0.1,  # 降低随机性,提高稳定性
                    "seed": 12345,  # 可选:固定种子
                },
            }

            async with session.post(
                url=LLM_API_URL, headers=headers, json=payload, timeout=30
            ) as reponse:
                result = await reponse.json()
                # print(result, "\n")
                # 提取模型输出
                output_text = result.get("output", {}).get("text", "").strip()

                # 简单置信度模拟(实际无法获取真实 confidence,可设为 1.0 或基于规则)
                confidence = 1.0 if output_text in categories else 0.0

                return {
                    "text": text,
                    "category": output_text if confidence > 0 else "",
                    "confidence": confidence,
                    "status": "成功" if confidence > 0 else "失败",
                    "error_msg": (
                        ""
                        if confidence > 0
                        else f"模型返回无效类别: '{output_text}'"
                    ),
                }
    except Exception as e:
        return {
            "text": text,
            "category": "",
            "confidence": 0.0,
            "status": "失败",
            "error_msg": str(e),
        }

# 批量异步分类+保存结果(核心整合)
async def batch_classify_and_save(self, texts: list[str], batch_size: int = 10):
    """异步批量分类,结果保存到Excel"""
    start_time = time.time()
    # 并发控制:限制同时发起10个API请求(避免被限流)
    semaphore = asyncio.Semaphore(10)

    # 带并发控制的异步任务(避免请求过多)
    async def bounded_task(text):
        async with semaphore:
            return await self.call_llm_async(text)

    # 批量添加任务(按批次拆分,可选,提升可控性)
    tasks = []
    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i : i + batch_size]
        for text in batch_texts:
            tasks.append(bounded_task(text))

    # 执行任务,获取结果
    results = await asyncio.gather(*tasks)
    # 保存结果到Excel
    self.save_results(results)
    total_time = round(time.time() - start_time, 2)
    print(f"批量分类完成!共{len(texts)}篇,耗时{total_time}秒")
    return results

# Excel保存功能(复用例子1,优化异常处理)
def save_results(self, results: list[dict]):
    try:
        for res in results:
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            row_data = [
                self.text_id,
                res["text"],
                res["category"],
                res["confidence"],
                current_time,
                res["status"],
                res["error_msg"],
            ]
            self.ws.append(row_data)
            self.text_id += 1
        self.wb.save("批量分类结果.xlsx")
        print("结果已保存到Excel")
    except Exception as e:
        print(f"保存失败:{str(e)}")
        self.wb.save("分类结果_备份.xlsx")

测试代码

if name == "__main__":

# 测试10条文本(模拟批量场景)
test_texts = [
    f"政务文本{i}:"
    + (
        "政策通知内容"
        if i % 5 == 0
        else "群众咨询内容" if i % 5 == 1 else "投诉举报内容"
    )
    for i in range(1, 11)
]
# 初始化工具类,执行批量分类
classifier = AsyncClassifier()
asyncio.run(classifier.batch_classify_and_save(test_texts, batch_size=5))
添加新评论