from csv import Error
from fastapi import FastAPI, Query, Body, HTTPException
from pydantic import BaseModel, Field
import requests
import json
import time
import os
from openai import OpenAI
from openai import OpenAI, APIError, APIConnectionError, RateLimitError
import uvicorn
app = FastAPI(
title="千问API调用",
description="调用阿里云qwen-turbo-latest模型实现文本摘要功能",
version="1.0.0",)
初始化openAI客户端
client = OpenAI(
api_key="sk-1308279658e64f2180e380c0cda8302e",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",)
定义响应体数据模型(结构化返回)
class SummaryResponse(BaseModel):
code: int = Field(200, description="响应状态码,200为成功")
msg: str = Field("success", description="响应信息")
data: dict = Field(..., description="摘要结果及模型原始信息")
定义请求体数据模型(自动校验请求参数)
class SummaryRequest(BaseModel):
content: str = Field(
..., min_length=10, description="需要摘要的文本内容,长度不少于10个字符"
)
prompt: str = Field(
default="请摘要以下文本:",
description="自定义摘要提示词,默认为'请摘要以下文本:'",
)
@app.post("/api/summary", response_model=SummaryResponse, summary="文本摘要接口")
async def text_summary(request: SummaryRequest):
try:
# 调用百炼大模型
completion = client.chat.completions.create(
model="qwen-turbo-latest", # 阿里云百炼轻量版模型
messages=[
{"role": "user", "content": f"{request.prompt}{request.content}"}
],
temperature=0.3, # 摘要任务建议低温度,保证结果稳定
max_tokens=500, # 限制生成文本长度,避免冗余
)
# 解析模型返回结果
summary_result = completion.choices[0].message.content.strip()
# 构造返回数据
return {
"code": 200,
"msg": "success",
"data": {
"summary": summary_result, # 提取后的纯摘要内容
"model": completion.model, # 所用模型名称
"finish_reason": completion.choices[0].finish_reason, # 完成原因
},
}
# 捕获API调用错误(密钥无效、接口不可达等)
except Error as e:
raise HTTPException(status_code=500, detail=f"大模型API调用失败:{str(e)}")
根路径健康检查
@app.get("/", summary="服务健康检查")
async def health_check():
return {"status": "running", "service": "百炼文本摘要API"}
本地运行入口
if name == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)