Code - 济南政务文本分类代码拆解(由浅入深5个小例子)

济南政务文本分类代码拆解(由浅入深5个小例子)

例子1:基础铺垫——环境配置+简单工具类(入门级)

核心:掌握环境变量加载、Excel基础操作,搭建工具类雏形(无复杂逻辑,重点是“会用”)

import os
from openpyxl import Workbook
from dotenv import load_dotenv

# 1. Load environment variables so the API key is never hard-coded in source.
load_dotenv()  # reads the .env file (create it yourself with a line LLM_API_KEY=<your key>)
LLM_API_KEY = os.getenv("LLM_API_KEY")

# 2. 简单工具类(基础:Excel保存功能,无API调用、无异步)
class SimpleClassifier:
    """Keyword-based demo classifier that records every result in an Excel sheet.

    No API is called: labels come from a fixed keyword lookup, so the Excel
    handling can be tried out on its own.
    """

    def __init__(self):
        """Create a new workbook with a header row and start row IDs at 1."""
        workbook = Workbook()
        sheet = workbook.active
        sheet.title = "分类结果"
        header_row = ["文本ID", "原文", "分类标签"]
        sheet.append(header_row)

        self.wb = workbook
        self.ws = sheet
        self.headers = header_row
        self.text_id = 1  # auto-incrementing row ID

    def simple_classify(self, text):
        """Label *text* by keyword, append the row, and save the workbook.

        The first matching keyword wins, checked in table order; texts with
        no matching keyword are labelled "其他".

        Returns:
            A dict with the original ``text`` and the chosen ``label``.
        """
        keyword_labels = (("政策", "政策通知"), ("咨询", "群众咨询"))
        label = next(
            (tag for keyword, tag in keyword_labels if keyword in text),
            "其他",
        )

        row_id = self.text_id
        self.ws.append([row_id, text, label])
        self.text_id = row_id + 1
        # The file is rewritten after every call, so it always reflects all rows so far.
        self.wb.save("简单分类结果.xlsx")
        return {"text": text, "label": label}

# 测试代码(直接运行可看到效果)
if __name__ == "__main__":
    # Classify two sample texts and show what was recorded.
    demo = SimpleClassifier()
    samples = ("济南市2024年民生政策通知", "群众咨询社保缴费流程")
    outcomes = [demo.simple_classify(sample) for sample in samples]
    print("分类完成:", *outcomes)

例子2:核心第一步——同步调用大模型API(理解API交互)

核心:去掉异步、批量、定时,只保留“单条文本→调用API→返回结果”,理解API调用逻辑

import requests
import os
from dotenv import load_dotenv

# 1. Environment setup (same as example 1, unchanged)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify"  # replace with the real API endpoint

# 2. 同步调用API(无异步,简单易懂)
def call_llm_sync(text: str) -> dict:
    """Classify one text with a blocking HTTP call to the LLM API.

    Never raises: any failure (HTTP error, timeout, malformed response) is
    reported through the returned dict.

    Returns:
        A dict with keys ``text``, ``category``, ``confidence`` (rounded to
        4 places), ``status`` ("成功" or "失败") and ``error_msg``.
    """
    # Start from the failure shape and fill in the success fields at the end.
    outcome = {
        "text": text,
        "category": "",
        "confidence": 0.0,
        "status": "失败",
        "error_msg": "",
    }
    try:
        request_body = dict(
            api_key=LLM_API_KEY,
            text=text,
            task_type="government_text_classify",
            categories=["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"],
        )
        # Blocking POST; non-2xx statuses (e.g. 404, 500) become exceptions.
        response = requests.post(url=LLM_API_URL, json=request_body, timeout=30)
        response.raise_for_status()
        parsed = response.json()

        # Both fields must be present before the response is trusted.
        missing = [key for key in ("category", "confidence") if key not in parsed]
        if missing:
            raise Exception("API返回格式异常")

        outcome.update(
            category=parsed["category"],
            confidence=round(parsed["confidence"], 4),
            status="成功",
        )
    except Exception as exc:
        outcome["error_msg"] = str(exc)
    return outcome

# 测试代码(直接运行,测试单条文本API调用)
if __name__ == "__main__":
    # Run a single text through the synchronous API call.
    sample = "群众投诉小区垃圾清运不及时,影响生活环境"
    print("API调用结果:", call_llm_sync(sample))

例子3:核心第二步——异步调用API(关键优化,提升效率)

核心:在例子2基础上,将“同步请求”改为“异步请求”,理解asyncio和aiohttp的基础使用

import asyncio
import aiohttp
import os
from dotenv import load_dotenv

# 1. Environment setup (same as the earlier examples, unchanged)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify"

# 2. 异步调用API(核心:async/await关键字,aiohttp发起异步请求)
async def call_llm_async(text: str) -> dict:
    """Classify one text with a non-blocking HTTP call to the LLM API.

    Never raises: any failure (HTTP error status, timeout, malformed
    response) is reported through the returned dict.

    Returns:
        A dict with keys ``text``, ``category``, ``confidence`` (rounded to
        4 places), ``status`` ("成功" or "失败") and ``error_msg``.
    """
    try:
        payload = {
            "api_key": LLM_API_KEY,
            "text": text,
            "task_type": "government_text_classify",
            "categories": ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
        }
        # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
        timeout = aiohttp.ClientTimeout(total=30)
        # NOTE: this opens a new session per call, which keeps the example
        # self-contained. Real code should share one session across requests.
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # Awaiting the POST yields control so other tasks can run meanwhile.
            async with session.post(url=LLM_API_URL, json=payload) as response:
                # Surface 4xx/5xx as the real HTTP error instead of letting an
                # error body be misreported as a response-format problem.
                response.raise_for_status()
                result = await response.json()

        if "category" not in result or "confidence" not in result:
            raise Exception("API返回格式异常")
        return {
            "text": text,
            "category": result["category"],
            "confidence": round(result["confidence"], 4),
            "status": "成功",
            "error_msg": ""
        }
    except Exception as e:
        return {
            "text": text,
            "category": "",
            "confidence": 0.0,
            "status": "失败",
            "error_msg": str(e)
        }

# 3. 批量异步执行(多个文本同时调用API,提升效率)
async def batch_async_classify(texts: list[str]):
    """Classify every text concurrently and return the results in input order.

    One ``call_llm_async`` coroutine is created per text and all of them are
    awaited together via ``asyncio.gather``.
    """
    pending = (call_llm_async(item) for item in texts)
    return await asyncio.gather(*pending)

# 测试代码(异步代码需用asyncio.run()启动)
if __name__ == "__main__":
    # Three sample texts, sent to the API concurrently.
    sample_texts = [
        "济南市2024年民生政策通知",
        "群众咨询社保缴费流程",
        "投诉举报小区垃圾清运不及时",
    ]
    # asyncio.run() is the entry point that starts the event loop.
    for outcome in asyncio.run(batch_async_classify(sample_texts)):
        print(outcome)

例子4:整合功能——异步批量分类+Excel保存(完整核心功能)

核心:整合例子1(Excel保存)和例子3(异步API),实现“批量文本→异步分类→保存到Excel”,接近原代码核心逻辑

import asyncio
import aiohttp
import time
import os
from openpyxl import Workbook
from dotenv import load_dotenv

# 1. Environment setup
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify"

# 2. 整合工具类(异步API+Excel保存)
class AsyncClassifier:
    """Classify texts concurrently through the LLM API and log results to Excel."""

    # Upper bound on simultaneous in-flight API requests (avoids rate limiting).
    MAX_CONCURRENCY = 10
    # Total time allowed for one API request, in seconds.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Create a new workbook with a header row and start row IDs at 1."""
        self.wb = Workbook()
        self.ws = self.wb.active
        self.ws.title = "政务文本分类结果"
        self.headers = ["文本ID", "原文", "分类标签", "置信度", "处理时间", "调用状态", "异常信息"]
        self.ws.append(self.headers)
        self.text_id = 1

    async def _request_label(self, session, text: str) -> dict:
        """POST one text to the API on *session*; raise on any HTTP or format error."""
        payload = {
            "api_key": LLM_API_KEY,
            "text": text,
            "task_type": "government_text_classify",
            "categories": ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
        }
        # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        async with session.post(url=LLM_API_URL, json=payload, timeout=timeout) as response:
            # Surface 4xx/5xx as the real HTTP error instead of letting an
            # error body be misreported as a response-format problem.
            response.raise_for_status()
            result = await response.json()
        if "category" not in result or "confidence" not in result:
            raise Exception("API返回格式异常")
        return {
            "text": text,
            "category": result["category"],
            "confidence": round(result["confidence"], 4),
            "status": "成功",
            "error_msg": ""
        }

    async def call_llm_async(self, text: str, session: "aiohttp.ClientSession | None" = None) -> dict:
        """Classify one text; never raises.

        Args:
            text: The text to classify.
            session: Optional shared aiohttp session. When omitted, a
                temporary session is opened for this single call.

        Returns:
            A dict with keys ``text``, ``category``, ``confidence``,
            ``status`` ("成功" or "失败") and ``error_msg``.
        """
        try:
            if session is not None:
                return await self._request_label(session, text)
            async with aiohttp.ClientSession() as own_session:
                return await self._request_label(own_session, text)
        except Exception as e:
            return {
                "text": text,
                "category": "",
                "confidence": 0.0,
                "status": "失败",
                "error_msg": str(e)
            }

    async def batch_classify_and_save(self, texts: list[str], batch_size: int = 10):
        """Classify *texts* batch by batch and save all results to Excel.

        Batches run one after another. Within a batch the requests run
        concurrently, capped at ``MAX_CONCURRENCY`` in flight. Results are
        returned in input order.

        Args:
            texts: Texts to classify.
            batch_size: Number of texts submitted per batch; must be >= 1.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. A non-positive
                step would otherwise skip every text without any error.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        start_time = time.perf_counter()
        semaphore = asyncio.Semaphore(self.MAX_CONCURRENCY)
        results = []

        # One session for the whole run so connections are pooled and reused.
        async with aiohttp.ClientSession() as session:

            async def bounded_task(text):
                async with semaphore:
                    return await self.call_llm_async(text, session)

            for i in range(0, len(texts), batch_size):
                batch_texts = texts[i:i + batch_size]
                batch_results = await asyncio.gather(
                    *(bounded_task(text) for text in batch_texts)
                )
                results.extend(batch_results)

        self.save_results(results)

        total_time = round(time.perf_counter() - start_time, 2)
        print(f"批量分类完成!共{len(texts)}篇,耗时{total_time}秒")
        return results

    def save_results(self, results: list[dict]):
        """Append one row per result and save the workbook.

        Best effort: on any failure the error is printed and whatever is in
        the workbook is written to a backup file instead.
        """
        try:
            # One timestamp per save call; every row of the batch shares it.
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            for res in results:
                self.ws.append([
                    self.text_id,
                    res["text"],
                    res["category"],
                    res["confidence"],
                    current_time,
                    res["status"],
                    res["error_msg"],
                ])
                self.text_id += 1
            self.wb.save("批量分类结果.xlsx")
            print("结果已保存到Excel")
        except Exception as e:
            print(f"保存失败:{str(e)}")
            self.wb.save("分类结果_备份.xlsx")

# 测试代码
if __name__ == "__main__":
    # Build 10 synthetic texts; the topic depends on the index modulo 5.
    topic_by_remainder = {0: "政策通知内容", 1: "群众咨询内容"}
    test_texts = [
        f"政务文本{i}:" + topic_by_remainder.get(i % 5, "投诉举报内容")
        for i in range(1, 11)
    ]
    # Classify them in batches of 5 and write the Excel report.
    demo = AsyncClassifier()
    asyncio.run(demo.batch_classify_and_save(test_texts, batch_size=5))

例子5:最终整合——FastAPI接口+定时任务(完整功能,接近原代码)

核心:在例子4基础上,添加FastAPI接口(供外部调用)和定时任务(自动执行),实现原代码全部功能

import asyncio
import aiohttp
import time
import os
from openpyxl import Workbook, load_workbook
from apscheduler.schedulers.blocking import BlockingScheduler
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from dotenv import load_dotenv

# 1. Environment setup
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify"

# 2. Create the FastAPI application (minimal configuration)
app = FastAPI(title="政务文本分类工具", version="1.0.0")

# 3. 定义请求体模型(接收前端传入的文本列表)
class TextList(BaseModel):
    # Request body of the batch-classification endpoint.
    texts: list[str]  # required: the texts to classify
    batch_size: int = 10  # optional: batch size, defaults to 10

# 4. 整合工具类(复用例子4的核心逻辑)
class GovernmentTextClassifier:
    """Classify government texts concurrently via the LLM API and log them to Excel."""

    # Upper bound on simultaneous in-flight API requests (avoids rate limiting).
    MAX_CONCURRENCY = 10
    # Total time allowed for one API request, in seconds.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Create a new workbook with a header row and start row IDs at 1."""
        self.wb = Workbook()
        self.ws = self.wb.active
        self.ws.title = "政务文本分类结果"
        self.headers = ["文本ID", "原文", "分类标签", "置信度", "处理时间", "调用状态", "异常信息"]
        self.ws.append(self.headers)
        self.text_id = 1

    async def _request_label(self, session, text: str) -> dict:
        """POST one text to the API on *session*; raise on any HTTP or format error."""
        payload = {
            "api_key": LLM_API_KEY,
            "text": text,
            "task_type": "government_text_classify",
            "categories": ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
        }
        # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        async with session.post(url=LLM_API_URL, json=payload, timeout=timeout) as response:
            # Surface 4xx/5xx as the real HTTP error instead of letting an
            # error body be misreported as a response-format problem.
            response.raise_for_status()
            result = await response.json()
        if "category" not in result or "confidence" not in result:
            raise Exception("API返回格式异常")
        return {
            "text": text,
            "category": result["category"],
            "confidence": round(result["confidence"], 4),
            "status": "成功",
            "error_msg": ""
        }

    async def call_llm_async(self, text: str, session: "aiohttp.ClientSession | None" = None) -> dict:
        """Classify one text; never raises.

        Args:
            text: The text to classify.
            session: Optional shared aiohttp session. When omitted, a
                temporary session is opened for this single call.

        Returns:
            A dict with keys ``text``, ``category``, ``confidence``,
            ``status`` ("成功" or "失败") and ``error_msg``.
        """
        try:
            if session is not None:
                return await self._request_label(session, text)
            async with aiohttp.ClientSession() as own_session:
                return await self._request_label(own_session, text)
        except Exception as e:
            return {
                "text": text,
                "category": "",
                "confidence": 0.0,
                "status": "失败",
                "error_msg": str(e)
            }

    async def batch_classify(self, texts: list[str], batch_size: int = 10):
        """Classify *texts* batch by batch and save all results to Excel.

        Batches run one after another. Within a batch the requests run
        concurrently, capped at ``MAX_CONCURRENCY`` in flight. Results are
        returned in input order.

        Args:
            texts: Texts to classify.
            batch_size: Number of texts submitted per batch; must be >= 1.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. A non-positive
                step would otherwise skip every text without any error.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        start_time = time.perf_counter()
        semaphore = asyncio.Semaphore(self.MAX_CONCURRENCY)
        results = []

        # The session is created per call, so it always belongs to the event
        # loop that is running this coroutine.
        async with aiohttp.ClientSession() as session:

            async def bounded_task(text):
                async with semaphore:
                    return await self.call_llm_async(text, session)

            for i in range(0, len(texts), batch_size):
                batch_texts = texts[i:i + batch_size]
                batch_results = await asyncio.gather(
                    *(bounded_task(text) for text in batch_texts)
                )
                results.extend(batch_results)

        self.save_results(results)
        total_time = round(time.perf_counter() - start_time, 2)
        print(f"批量分类完成!共{len(texts)}篇,耗时{total_time}秒")
        return results

    def save_results(self, results: list[dict]):
        """Append one row per result and save the workbook.

        Best effort: on any failure the error is printed and whatever is in
        the workbook is written to a backup file instead.
        """
        try:
            # One timestamp per save call; every row of the batch shares it.
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            for res in results:
                self.ws.append([
                    self.text_id, res["text"], res["category"], res["confidence"],
                    current_time, res["status"], res["error_msg"]
                ])
                self.text_id += 1
            self.wb.save("济南政务文本分类结果.xlsx")
        except Exception as e:
            print(f"保存失败:{str(e)}")
            self.wb.save("济南政务文本分类结果_备份.xlsx")

# 5. Module-level classifier instance, referenced by the endpoints below
classifier = GovernmentTextClassifier()

# 6. FastAPI接口(批量分类接口,POST请求)
@app.post("/batch-classify")
async def batch_classify_api(text_list: TextList):
    """Classify the posted texts and return per-text results plus a summary.

    Raises:
        HTTPException: 400 when ``texts`` is empty or ``batch_size`` is not
            a positive integer.
    """
    if not text_list.texts:
        raise HTTPException(status_code=400, detail="文本列表不可为空")
    # Reject a bad batch size as a client error here; otherwise it would
    # surface as a server error or as an empty "successful" result.
    if text_list.batch_size < 1:
        raise HTTPException(status_code=400, detail="batch_size必须为正整数")

    results = await classifier.batch_classify(text_list.texts, text_list.batch_size)

    # A text counts as "达标" when the call succeeded and the reported
    # confidence reaches this threshold.
    confidence_threshold = 0.92
    success = [
        res for res in results
        if res["status"] == "成功" and res["confidence"] >= confidence_threshold
    ]
    # "准确率" is the share of texts that met the threshold; no ground-truth
    # labels are involved.
    accuracy = round(len(success) / len(results), 4) if results else 0.0
    return {
        "code": 200,
        "message": "分类成功",
        "data": {
            "总文本数": len(results),
            "达标文本数": len(success),
            "准确率": accuracy,
            "详细结果": results
        }
    }

# 7. 定时任务接口(启动定时分类)
@app.get("/start-scheduler")
def start_scheduler(cron_time: str = Query(default="0 9 * * *", description="Cron表达式,默认每天9点")):
    """Schedule the recurring classification job and return immediately.

    The job reads column A of "待分类政务文本.xlsx" (skipping the header row)
    and classifies every non-empty cell. Calling the endpoint again replaces
    the existing job with the new schedule.

    Args:
        cron_time: Five-field crontab expression; defaults to 09:00 every day.

    Raises:
        HTTPException: 400 for an invalid cron expression, 500 when the
            scheduler cannot be started.
    """
    # Imported locally so this endpoint does not depend on edits to the
    # file-level import block. BackgroundScheduler runs jobs in its own
    # threads; BlockingScheduler.start() would never return, so the request
    # would hang.
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.triggers.cron import CronTrigger

    # Parse the expression before the broad handler below so that a bad
    # expression is reported as a client error (400), not a server error.
    try:
        trigger = CronTrigger.from_crontab(cron_time)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Cron表达式无效:{str(e)}") from e

    def scheduled_task():
        print(f"定时任务执行:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
        try:
            # Input workbook: create "待分类政务文本.xlsx" yourself, texts in column A.
            wb = load_workbook("待分类政务文本.xlsx")
            ws = wb.active
            texts = [cell.value for cell in ws["A"][1:] if cell.value]  # skip header, drop empty cells
            if texts:
                # The job runs outside the server's event loop, so it starts its own.
                # NOTE(review): `classifier` is also used by the HTTP endpoint;
                # its workbook is not guarded against concurrent writes.
                asyncio.run(classifier.batch_classify(texts))
            else:
                print("无待分类文本")
        except Exception as e:
            print(f"定时任务失败:{str(e)}")

    try:
        # Keep one scheduler per application so replace_existing can find the
        # previous job instead of piling up schedulers.
        scheduler = getattr(app.state, "scheduler", None)
        if scheduler is None:
            scheduler = BackgroundScheduler()
            app.state.scheduler = scheduler

        scheduler.add_job(
            func=scheduled_task,
            trigger=trigger,
            id="gov_text_classify",
            replace_existing=True
        )
        if not scheduler.running:
            scheduler.start()
        print(f"定时任务启动,执行时间:{cron_time}")
        return {"code": 200, "message": "定时任务启动成功"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"启动失败:{str(e)}") from e

# 8. 启动FastAPI服务
if __name__ == "__main__":
    import uvicorn

    # Development server: listens on all interfaces, port 8000, with auto-reload.
    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("__main__:app", **server_options)

拆解说明(必看)

  • 学习顺序:例子1 → 例子2 → 例子3 → 例子4 → 例子5,每一步只新增1-2个知识点,避免混乱
  • 核心递进:基础配置 → 同步API → 异步API → 异步+Excel → 完整功能(接口+定时)
  • 简化点:去掉原代码中冗余的注释、复杂的异常捕获,保留核心逻辑;每个例子均可独立运行,方便测试
  • 补充:运行前需创建.env文件(写入LLM_API_KEY=你的密钥),并将代码中的LLM_API_URL替换为真实API地址;测试定时任务时,需创建“待分类政务文本.xlsx”(A列存放待分类文本,第1行为表头,从第2行开始读取)

(注:文档部分内容可能由 AI 生成)

添加新评论