济南政务文本分类代码拆解(由浅入深5个小例子)
例子1:基础铺垫——环境配置+简单工具类(入门级)
核心:掌握环境变量加载、Excel基础操作,搭建工具类雏形(无复杂逻辑,重点是“会用”)
import os
from openpyxl import Workbook
from dotenv import load_dotenv
# 1. 加载环境变量(基础:避免密钥硬编码)
load_dotenv() # 读取.env文件(需自行创建,写入LLM_API_KEY=你的密钥)
LLM_API_KEY = os.getenv("LLM_API_KEY")
# 2. 简单工具类(基础:Excel保存功能,无API调用、无异步)
class SimpleClassifier:
def __init__(self):
# 初始化Excel,写入表头
self.wb = Workbook()
self.ws = self.wb.active
self.ws.title = "分类结果"
self.headers = ["文本ID", "原文", "分类标签"]
self.ws.append(self.headers)
self.text_id = 1 # 自增ID
# 模拟分类(无真实API,仅手动返回标签,方便测试)
def simple_classify(self, text):
# 模拟逻辑:含“政策”→政策通知,含“咨询”→群众咨询,否则→其他
if "政策" in text:
label = "政策通知"
elif "咨询" in text:
label = "群众咨询"
else:
label = "其他"
# 写入Excel
self.ws.append([self.text_id, text, label])
self.text_id += 1
self.wb.save("简单分类结果.xlsx") # 保存文件
return {"text": text, "label": label}
# 测试代码(直接运行可看到效果)
if __name__ == "__main__":
classifier = SimpleClassifier()
# 测试2条文本
res1 = classifier.simple_classify("济南市2024年民生政策通知")
res2 = classifier.simple_classify("群众咨询社保缴费流程")
print("分类完成:", res1, res2)例子2:核心第一步——同步调用大模型API(理解API交互)
核心:去掉异步、批量、定时,只保留“单条文本→调用API→返回结果”,理解API调用逻辑
import requests
import os
from dotenv import load_dotenv
# 1. 环境配置(复用例子1,无需修改)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify" # 替换为真实API地址
# 2. 同步调用API(无异步,简单易懂)
def call_llm_sync(text: str) -> dict:
"""同步调用大模型API,单条文本分类"""
try:
# 1. 构造API请求参数(和原代码一致,贴合真实API格式)
payload = {
"api_key": LLM_API_KEY,
"text": text,
"task_type": "government_text_classify",
"categories": ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
}
# 2. 发送POST请求(同步请求,等待响应)
response = requests.post(url=LLM_API_URL, json=payload, timeout=30)
response.raise_for_status() # 抛出HTTP请求异常(如404、500)
result = response.json()
# 3. 校验响应格式,返回结果
if "category" not in result or "confidence" not in result:
raise Exception("API返回格式异常")
return {
"text": text,
"category": result["category"],
"confidence": round(result["confidence"], 4),
"status": "成功",
"error_msg": ""
}
except Exception as e:
# 捕获异常,返回错误信息
return {
"text": text,
"category": "",
"confidence": 0.0,
"status": "失败",
"error_msg": str(e)
}
# 测试代码(直接运行,测试单条文本API调用)
if __name__ == "__main__":
test_text = "群众投诉小区垃圾清运不及时,影响生活环境"
result = call_llm_sync(test_text)
print("API调用结果:", result)例子3:核心第二步——异步调用API(关键优化,提升效率)
核心:在例子2基础上,将“同步请求”改为“异步请求”,理解asyncio和aiohttp的基础使用
import asyncio
import aiohttp
import os
from dotenv import load_dotenv
# 1. 环境配置(复用之前的,无需修改)
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify"
# 2. 异步调用API(核心:async/await关键字,aiohttp发起异步请求)
async def call_llm_async(text: str) -> dict:
"""异步调用大模型API,单条文本分类"""
try:
# 异步会话复用(减少连接开销,比每次新建会话快)
async with aiohttp.ClientSession() as session:
payload = {
"api_key": LLM_API_KEY,
"text": text,
"task_type": "government_text_classify",
"categories": ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
}
# 异步POST请求,await等待响应(不阻塞其他任务)
async with session.post(url=LLM_API_URL, json=payload, timeout=30) as response:
result = await response.json() # 异步解析JSON
if "category" not in result or "confidence" not in result:
raise Exception("API返回格式异常")
return {
"text": text,
"category": result["category"],
"confidence": round(result["confidence"], 4),
"status": "成功",
"error_msg": ""
}
except Exception as e:
return {
"text": text,
"category": "",
"confidence": 0.0,
"status": "失败",
"error_msg": str(e)
}
# 3. 批量异步执行(多个文本同时调用API,提升效率)
async def batch_async_classify(texts: list[str]):
"""批量异步分类,同时发起多个API请求"""
# 把每个文本的API调用,包装成一个异步任务
tasks = [call_llm_async(text) for text in texts]
# 批量执行所有任务,等待全部完成(asyncio.gather是核心)
results = await asyncio.gather(*tasks)
return results
# 测试代码(异步代码需用asyncio.run()启动)
if __name__ == "__main__":
# 测试3条文本,异步同时调用API
test_texts = [
"济南市2024年民生政策通知",
"群众咨询社保缴费流程",
"投诉举报小区垃圾清运不及时"
]
# 启动异步任务,获取结果
results = asyncio.run(batch_async_classify(test_texts))
# 打印结果
for res in results:
print(res)例子4:整合功能——异步批量分类+Excel保存(完整核心功能)
核心:整合例子1(Excel保存)和例子3(异步API),实现“批量文本→异步分类→保存到Excel”,接近原代码核心逻辑
import asyncio
import aiohttp
import time
import os
from openpyxl import Workbook
from dotenv import load_dotenv
# 1. 环境配置
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify"
# 2. 整合工具类(异步API+Excel保存)
class AsyncClassifier:
def __init__(self):
# 初始化Excel(复用例子1的逻辑)
self.wb = Workbook()
self.ws = self.wb.active
self.ws.title = "政务文本分类结果"
self.headers = ["文本ID", "原文", "分类标签", "置信度", "处理时间", "调用状态", "异常信息"]
self.ws.append(self.headers)
self.text_id = 1
# 异步API调用(复用例子3)
async def call_llm_async(self, text: str) -> dict:
try:
async with aiohttp.ClientSession() as session:
payload = {
"api_key": LLM_API_KEY,
"text": text,
"task_type": "government_text_classify",
"categories": ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
}
async with session.post(url=LLM_API_URL, json=payload, timeout=30) as response:
result = await response.json()
if "category" not in result or "confidence" not in result:
raise Exception("API返回格式异常")
return {
"text": text,
"category": result["category"],
"confidence": round(result["confidence"], 4),
"status": "成功",
"error_msg": ""
}
except Exception as e:
return {
"text": text,
"category": "",
"confidence": 0.0,
"status": "失败",
"error_msg": str(e)
}
# 批量异步分类+保存结果(核心整合)
async def batch_classify_and_save(self, texts: list[str], batch_size: int = 10):
"""异步批量分类,结果保存到Excel"""
start_time = time.time()
# 并发控制:限制同时发起10个API请求(避免被限流)
semaphore = asyncio.Semaphore(10)
# 带并发控制的异步任务(避免请求过多)
async def bounded_task(text):
async with semaphore:
return await self.call_llm_async(text)
# 批量添加任务(按批次拆分,可选,提升可控性)
tasks = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
for text in batch_texts:
tasks.append(bounded_task(text))
# 执行任务,获取结果
results = await asyncio.gather(*tasks)
# 保存结果到Excel
self.save_results(results)
# 打印耗时
total_time = round(time.time() - start_time, 2)
print(f"批量分类完成!共{len(texts)}篇,耗时{total_time}秒")
return results
# Excel保存功能(复用例子1,优化异常处理)
def save_results(self, results: list[dict]):
try:
for res in results:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
row_data = [
self.text_id,
res["text"],
res["category"],
res["confidence"],
current_time,
res["status"],
res["error_msg"]
]
self.ws.append(row_data)
self.text_id += 1
self.wb.save("批量分类结果.xlsx")
print("结果已保存到Excel")
except Exception as e:
print(f"保存失败:{str(e)}")
self.wb.save("分类结果_备份.xlsx")
# 测试代码
if __name__ == "__main__":
# 测试10条文本(模拟批量场景)
test_texts = [
f"政务文本{i}:" + ("政策通知内容" if i%5==0 else "群众咨询内容" if i%5==1 else "投诉举报内容")
for i in range(1, 11)
]
# 初始化工具类,执行批量分类
classifier = AsyncClassifier()
asyncio.run(classifier.batch_classify_and_save(test_texts, batch_size=5))例子5:最终整合——FastAPI接口+定时任务(完整功能,接近原代码)
核心:在例子4基础上,添加FastAPI接口(供外部调用)和定时任务(自动执行),实现原代码全部功能
import asyncio
import aiohttp
import time
import os
from openpyxl import Workbook, load_workbook
from apscheduler.schedulers.blocking import BlockingScheduler
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from dotenv import load_dotenv
# 1. 环境配置
load_dotenv()
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_API_URL = "https://api.example.com/classify"
# 2. 实例化FastAPI(简单配置)
app = FastAPI(title="政务文本分类工具", version="1.0.0")
# 3. 定义请求体模型(接收前端传入的文本列表)
class TextList(BaseModel):
texts: list[str] # 必填:文本列表
batch_size: int = 10 # 可选:批次大小,默认10
# 4. 整合工具类(复用例子4的核心逻辑)
class GovernmentTextClassifier:
def __init__(self):
self.wb = Workbook()
self.ws = self.wb.active
self.ws.title = "政务文本分类结果"
self.headers = ["文本ID", "原文", "分类标签", "置信度", "处理时间", "调用状态", "异常信息"]
self.ws.append(self.headers)
self.text_id = 1
async def call_llm_async(self, text: str) -> dict:
try:
async with aiohttp.ClientSession() as session:
payload = {
"api_key": LLM_API_KEY,
"text": text,
"task_type": "government_text_classify",
"categories": ["政策通知", "群众咨询", "投诉举报", "政务公告", "其他"]
}
async with session.post(url=LLM_API_URL, json=payload, timeout=30) as response:
result = await response.json()
if "category" not in result or "confidence" not in result:
raise Exception("API返回格式异常")
return {
"text": text,
"category": result["category"],
"confidence": round(result["confidence"], 4),
"status": "成功",
"error_msg": ""
}
except Exception as e:
return {
"text": text,
"category": "",
"confidence": 0.0,
"status": "失败",
"error_msg": str(e)
}
async def batch_classify(self, texts: list[str], batch_size: int = 10):
start_time = time.time()
semaphore = asyncio.Semaphore(10)
async def bounded_task(text):
async with semaphore:
return await self.call_llm_async(text)
tasks = []
for i in range(0, len(texts), batch_size):
for text in texts[i:i+batch_size]:
tasks.append(bounded_task(text))
results = await asyncio.gather(*tasks)
self.save_results(results)
total_time = round(time.time() - start_time, 2)
print(f"批量分类完成!共{len(texts)}篇,耗时{total_time}秒")
return results
def save_results(self, results: list[dict]):
try:
for res in results:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.ws.append([
self.text_id, res["text"], res["category"], res["confidence"],
current_time, res["status"], res["error_msg"]
])
self.text_id += 1
self.wb.save("济南政务文本分类结果.xlsx")
except Exception as e:
print(f"保存失败:{str(e)}")
self.wb.save("济南政务文本分类结果_备份.xlsx")
# 5. 实例化工具类(全局可用)
classifier = GovernmentTextClassifier()
# 6. FastAPI接口(批量分类接口,POST请求)
@app.post("/batch-classify")
async def batch_classify_api(text_list: TextList):
if not text_list.texts:
raise HTTPException(status_code=400, detail="文本列表不可为空")
results = await classifier.batch_classify(text_list.texts, text_list.batch_size)
# 计算准确率(可选,和原代码一致)
success = [res for res in results if res["status"] == "成功" and res["confidence"] >= 0.92]
accuracy = round(len(success)/len(results), 4) if results else 0.0
return {
"code": 200,
"message": "分类成功",
"data": {
"总文本数": len(results),
"达标文本数": len(success),
"准确率": accuracy,
"详细结果": results
}
}
# 7. 定时任务接口(启动定时分类)
@app.get("/start-scheduler")
def start_scheduler(cron_time: str = Query(default="0 9 * * *", description="Cron表达式,默认每天9点")):
try:
scheduler = BlockingScheduler()
# 定时任务逻辑:读取待分类文本,执行分类
def scheduled_task():
print(f"定时任务执行:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
try:
# 读取待分类Excel(需自行创建“待分类政务文本.xlsx”,A列存文本)
wb = load_workbook("待分类政务文本.xlsx")
ws = wb.active
texts = [cell.value for cell in ws["A"][1:] if cell.value] # 跳过表头,过滤空值
if texts:
asyncio.run(classifier.batch_classify(texts))
else:
print("无待分类文本")
except Exception as e:
print(f"定时任务失败:{str(e)}")
# 添加定时任务
scheduler.add_job(
func=scheduled_task,
trigger="cron",
cron_expression=cron_time,
id="gov_text_classify",
replace_existing=True
)
print(f"定时任务启动,执行时间:{cron_time}")
scheduler.start()
return {"code": 200, "message": "定时任务启动成功"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"启动失败:{str(e)}")
# 8. 启动FastAPI服务
if __name__ == "__main__":
import uvicorn
uvicorn.run(app="__main__:app", host="0.0.0.0", port=8000, reload=True)拆解说明(必看)
- 学习顺序:例子1 → 例子2 → 例子3 → 例子4 → 例子5,每一步只新增1-2个知识点,避免混乱
- 核心递进:基础配置 → 同步API → 异步API → 异步+Excel → 完整功能(接口+定时)
- 简化点:去掉原代码中冗余的注释、复杂的异常捕获,保留核心逻辑;每个例子均可独立运行,方便测试
- 补充:运行前需创建.env文件(存LLM_API_KEY),替换真实API地址;测试定时任务时,需创建“待分类政务文本.xlsx”
(注:文档部分内容可能由 AI 生成)