aiohttp 从基础到进阶学习指南(适配 FastAPI 异步开发)
适用人群:已掌握 Python 基础、FastAPI 路由基本用法,无 asyncio 实战经验,计划用 FastAPI + aiohttp 进行异步 Web 开发的自学开发者。
核心定位:aiohttp 是 Python 主流异步 HTTP 客户端/服务器框架,本文重点聚焦 aiohttp 客户端(适配 FastAPI 路由中调用第三方 API 的场景),全程贴合 FastAPI 异步开发最佳实践,规避实战陷阱。
基础篇:async/await 核心概念 + aiohttp 客户端最简用法
一、async/await 核心原理(极简版)
原理说明
Python 异步编程的核心是“非阻塞等待”,解决 CPU 等待 I/O(如 HTTP 请求、数据库操作)时的资源浪费问题,关键语法就是 async 和await:
async def:定义“异步函数”(协程函数),该函数调用后不会立即执行,而是返回一个“协程对象”(coroutine)。await:只能用在异步函数内部,用于“等待”另一个协程完成(如等待 HTTP 请求响应),等待期间 CPU 可执行其他任务,实现“并发”。- 补充:异步函数不能直接调用执行(如
func()无效),必须通过asyncio.run(func())(Python 3.7+)启动,或交给事件循环(event loop)调度。
极简示例(先理解语法,再结合 aiohttp)
import asyncio
# 1. 用 async def 定义异步函数(协程函数)
async def async_task():
print("异步任务开始")
# 模拟 I/O 等待(如 HTTP 请求、数据库查询),用 asyncio.sleep 替代
# 注意:不能用 time.sleep(),它是阻塞的,会卡住整个事件循环
await asyncio.sleep(1) # 等待 1 秒,期间 CPU 可做其他事
print("异步任务结束")
# 2. 用 asyncio.run() 启动异步函数(Python 3.7+ 推荐写法)
if __name__ == "__main__":
asyncio.run(async_task()) # 自动创建事件循环、运行协程、关闭循环
常见陷阱
- 陷阱1:在异步函数内用阻塞函数(如
time.sleep()、requests.get()),会导致整个事件循环卡住,所有异步任务都无法执行。 - 陷阱2:直接调用异步函数(如
async_task()),不会执行任务,只会返回协程对象(无任何效果)。
二、aiohttp 客户端最简用法(GET/POST)
原理说明
aiohttp 是基于 asyncio 实现的异步 HTTP 客户端,核心优势是“异步发送 HTTP 请求”,与 FastAPI 配合时,能让整个 Web 应用保持非阻塞,提升并发处理能力(对比 requests:requests 是阻塞式,会卡住 FastAPI 的事件循环)。
aiohttp 客户端的核心是 ClientSession:相当于一个“异步请求会话”,所有 HTTP 请求(GET/POST 等)都通过它发送,会话内可复用连接,提升效率。
1. GET 请求(最简可运行示例)
import asyncio
import aiohttp
# 异步函数:用 aiohttp 发送 GET 请求
async def fetch_get(url: str):
# 1. 创建 ClientSession 实例(会话),推荐用 async with 自动关闭会话(避免资源泄漏)
async with aiohttp.ClientSession() as session:
# 2. 用 session.get() 发送 GET 请求,await 等待响应
# session.get() 是异步方法,必须用 await 等待其完成
async with session.get(url) as response:
# 3. 解析响应:text() 异步获取文本内容,json() 异步获取 JSON 内容(按需选择)
# 注意:response.text()/json() 也是异步方法,必须加 await
response_text = await response.text()
response_json = await response.json() # 若接口返回 JSON 格式,可用这个
print(f"GET 请求状态码:{response.status}")
print(f"GET 请求响应文本(前100字符):{response_text[:100]}")
return response_json # 返回 JSON 格式响应(方便后续使用)
# 启动异步任务
if __name__ == "__main__":
# 调用异步函数,获取协程对象,用 asyncio.run() 运行
result = asyncio.run(fetch_get("https://api.github.com/users/octocat"))
print("GET 请求返回 JSON(简化):", result["login"], result["html_url"])
2. POST 请求(带请求体,最简可运行示例)
import asyncio
import aiohttp
# 异步函数:用 aiohttp 发送 POST 请求(带 JSON 请求体)
async def fetch_post(url: str, data: dict):
# async with 自动管理 ClientSession 生命周期,请求结束后自动关闭
async with aiohttp.ClientSession() as session:
# 发送 POST 请求,json=data 自动将字典转为 JSON 请求体,设置 Content-Type: application/json
async with session.post(url, json=data) as response:
# 解析 JSON 响应
result = await response.json()
print(f"POST 请求状态码:{response.status}")
return result
# 启动异步任务
if __name__ == "__main__":
# 模拟 POST 请求体(比如调用第三方 API 提交数据)
post_data = {"name": "FastAPI", "type": "async", "version": "0.104.1"}
# 调用 POST 异步函数(示例接口为 JSONPlaceholder 测试接口,可直接运行)
result = asyncio.run(fetch_post("https://jsonplaceholder.typicode.com/posts", post_data))
print("POST 请求返回结果:", result)
常见陷阱(重点,贴合 FastAPI 后续使用)
- 陷阱1:忘记用
async with管理ClientSession,或手动创建 session 后未关闭(如session = aiohttp.ClientSession()后不调用await session.close()),会导致连接泄漏、资源浪费,长期运行会让 FastAPI 应用卡死。 - 陷阱2:在异步函数内调用 requests 库(如
requests.get(url)),requests 是阻塞式的,会卡住整个 FastAPI 的事件循环,导致所有异步任务无法执行(这是 FastAPI 中必须用 aiohttp 而非 requests 的核心原因之一)。 - 陷阱3:忘记给 aiohttp 的异步方法加
await(如session.get(url)不加await),会返回协程对象,无法获取响应,还会导致任务无法正常完成。
中级篇:ClientSession 生命周期管理、连接池、超时、重试、错误处理
核心目标:脱离“最简用法”,适配 FastAPI 生产环境需求——避免资源泄漏、提升请求效率、应对网络异常。
一、ClientSession 生命周期管理(重点,FastAPI 必用)
原理说明
ClientSession 是 aiohttp 客户端的核心,其生命周期直接影响资源占用和请求效率:
- 生命周期:创建 → 发送请求 → 关闭,关闭后无法再发送请求。
- 核心原则:避免频繁创建/关闭 ClientSession(每次创建会话都会建立新的连接池,频繁创建会浪费资源);在 FastAPI 中,应全局复用一个 ClientSession(或按需求复用),而非在每个路由中重复创建。
- 管理方式:推荐用
async with自动管理(适合短期任务),或手动创建会话、在程序退出前关闭(适合长期运行的 FastAPI 应用)。
可运行示例(两种管理方式)
import asyncio
import aiohttp
# 方式1:async with 自动管理(适合单次/短期任务,推荐日常调试、简单场景)
async def auto_manage_session():
# 自动创建会话,代码块结束后自动关闭会话(无需手动调用 close())
async with aiohttp.ClientSession() as session:
async with session.get("https://api.github.com/rate_limit") as response:
print("自动管理会话 - 状态码:", response.status)
# 方式2:手动管理会话(适合长期运行的应用,如 FastAPI,需手动关闭)
async def manual_manage_session():
# 手动创建会话
session = aiohttp.ClientSession()
try:
# 发送请求
async with session.get("https://api.github.com/rate_limit") as response:
print("手动管理会话 - 状态码:", response.status)
finally:
# 关键:无论请求成功/失败,都必须关闭会话,释放资源
await session.close()
print("手动管理会话 - 已关闭")
# 启动两个任务,验证两种管理方式
if __name__ == "__main__":
# 用 asyncio.gather 并发运行两个协程(后续进阶篇会详细讲)
asyncio.run(asyncio.gather(auto_manage_session(), manual_manage_session()))
常见陷阱(FastAPI 重点规避)
- 陷阱1:在 FastAPI 路由中,每个请求都创建一个 ClientSession(如在路由函数内写
async with aiohttp.ClientSession() as session),高并发场景下会创建大量会话和连接,导致服务器资源耗尽、响应变慢。 - 陷阱2:手动创建 ClientSession 后,未在 finally 块中关闭(如请求过程中抛出异常,导致
await session.close()未执行),造成连接泄漏。
二、连接池配置(提升请求效率)
原理说明
aiohttp 的 ClientSession 内部默认维护一个“连接池”,用于复用 HTTP 连接(避免每次请求都建立新的 TCP 连接,减少握手开销),适合 FastAPI 高并发场景。
核心配置参数(通过 TCPConnector 设置):
limit:连接池最大连接数(默认 100),超过则排队等待;可根据服务器性能和第三方 API 并发限制调整。limit_per_host:单个主机(如 github.com)的最大连接数(默认 0,即无限制),避免单个主机占用过多连接。
可运行示例(配置连接池)
import asyncio
import aiohttp
async def use_connection_pool():
# 1. 配置连接池
connector = aiohttp.TCPConnector(
limit=5, # 全局连接池最大连接数:5个
limit_per_host=2, # 单个主机最大连接数:2个(比如对 github.com 最多同时发2个请求)
ssl=False # 若不需要 SSL 验证(如测试环境),可设为 False,避免证书问题
)
# 2. 将连接池传入 ClientSession,会话内所有请求复用该连接池
async with aiohttp.ClientSession(connector=connector) as session:
# 模拟并发发送3个请求到同一个主机(github.com)
async def fetch(url):
async with session.get(url) as response:
return response.status
# 并发运行3个请求(单个主机最大连接数2,会有1个请求排队)
urls = ["https://api.github.com/rate_limit"] * 3
results = await asyncio.gather(*[fetch(url) for url in urls])
print("连接池请求结果(状态码):", results) # 输出 [200, 200, 200]
if __name__ == "__main__":
asyncio.run(use_connection_pool())
常见陷阱
- 陷阱:连接池配置过大(如 limit=1000),超过服务器或第三方 API 的承受能力,导致请求被拒绝、服务器卡顿;配置过小,会导致并发请求排队,响应变慢。建议根据 FastAPI 预期 QPS 和第三方 API 限制调整(如第三方 API 每秒允许 10 个请求,limit 可设为 10)。
三、超时配置(避免请求卡死)
原理说明
网络请求可能因网络异常、第三方 API 卡顿而无限等待,导致协程阻塞、资源浪费,甚至影响 FastAPI 整个应用的稳定性。aiohttp 支持设置超时时间,超过时间未响应则抛出异常,终止请求。
核心配置方式:
- 全局超时:给 ClientSession 设置超时,所有通过该会话发送的请求都生效。
- 局部超时:给单个请求(get/post)设置超时,覆盖全局超时(适合部分请求需要更长/更短超时的场景)。
可运行示例(全局+局部超时)
import asyncio
import aiohttp
from aiohttp import ClientTimeout # 导入超时类
async def use_timeout():
# 1. 配置全局超时(所有请求默认生效):总超时 5 秒
# ClientTimeout(total=5) 表示从请求发送到获取完整响应的总时间不超过 5 秒
timeout = ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
# 示例1:正常请求(响应快,不会超时)
async with session.get("https://api.github.com/rate_limit") as response:
print("正常请求 - 状态码:", response.status)
# 示例2:局部超时(覆盖全局,设置 1 秒超时,故意请求一个慢接口)
try:
# 用 timeout 参数覆盖全局超时,只给这个请求设 1 秒超时
async with session.get("https://httpbin.org/delay/2", timeout=ClientTimeout(total=1)) as response:
print("慢请求 - 状态码:", response.status)
except asyncio.TimeoutError:
# 捕获超时异常,进行异常处理(如重试、返回错误信息)
print("慢请求 - 超时了!(预期异常)")
if __name__ == "__main__":
asyncio.run(use_timeout())
常见陷阱
- 陷阱1:未设置超时,第三方 API 卡顿或网络中断时,请求会无限等待,导致协程阻塞,长期积累会让 FastAPI 事件循环挂满阻塞协程,应用无响应。
- 陷阱2:超时设置过短(如 0.5 秒),正常请求也会频繁超时;设置过长(如 60 秒),无法及时释放阻塞协程,失去超时的意义。建议根据第三方 API 的平均响应时间调整(如 API 平均响应 1 秒,可设 total=3 秒)。
四、重试机制(应对临时网络异常)
原理说明
网络请求可能因临时网络波动(如断网重连、第三方 API 临时不可用)而失败,重试机制可自动重新发送请求,提升请求成功率(适合 FastAPI 中依赖第三方 API 的场景)。
aiohttp 本身不内置重试功能,需结合 tenacity 库(Python 主流重试库,支持异步)实现,核心是:捕获指定异常(如超时、连接错误),按配置的次数和间隔重试。
可运行示例(结合 tenacity 实现异步重试)
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
# 先安装 tenacity:pip install tenacity
# 定义重试规则(装饰器形式,作用于异步函数)
@retry(
stop=stop_after_attempt(3), # 最多重试 3 次(加上第一次请求,共 4 次)
wait=wait_exponential(multiplier=1, min=2, max=5), # 重试间隔:2^n 秒(n 是重试次数),最小 2 秒,最大 5 秒
retry=retry_if_exception_type(( # 只有捕获到以下异常才重试
asyncio.TimeoutError, # 超时异常
aiohttp.ClientConnectionError, # 连接错误(如网络中断)
aiohttp.ClientResponseError # 响应错误(如 500 服务器错误)
)),
reraise=True # 重试次数用完仍失败,重新抛出异常,供上层处理
)
async def fetch_with_retry(session: aiohttp.ClientSession, url: str):
async with session.get(url) as response:
# 若响应状态码 >= 400,抛出 ClientResponseError 异常(触发重试)
response.raise_for_status()
return await response.json()
async def main():
async with aiohttp.ClientSession(timeout=ClientTimeout(total=3)) as session:
try:
# 故意请求一个可能失败的接口(模拟网络波动)
result = await fetch_with_retry(session, "https://httpbin.org/status/500")
print("请求成功:", result)
except Exception as e:
# 重试 3 次后仍失败,捕获异常并处理
print(f"请求失败(重试 3 次后):{type(e).__name__}: {e}")
if __name__ == "__main__":
asyncio.run(main())
常见陷阱
- 陷阱1:重试无限制(如未设置 stop_after_attempt),第三方 API 长期不可用时,会无限重试,导致大量无效请求,浪费资源,甚至被第三方 API 封禁 IP。
- 陷阱2:重试间隔过短(如 0.1 秒),频繁重试会给第三方 API 造成压力,可能触发反爬或限流;间隔过长,会导致 FastAPI 请求响应变慢。
- 陷阱3:对所有异常都重试(如 404 资源不存在),404 是业务异常,重试无意义,还会浪费资源,需精准指定重试异常类型。
五、错误处理(全面应对异常场景)
原理说明
aiohttp 客户端请求过程中可能出现多种异常(连接错误、超时、响应错误、JSON 解析错误等),在 FastAPI 中,若未处理异常,会导致接口返回 500 错误,影响用户体验。需全面捕获异常,并返回友好的错误信息。
核心异常类型(常用):
aiohttp.ClientConnectionError:连接错误(如网络中断、域名不存在)。asyncio.TimeoutError:请求超时。aiohttp.ClientResponseError:响应错误(如 400 Bad Request、500 Internal Server Error)。ValueError:JSON 解析错误(如接口返回非 JSON 格式,调用await response.json()会抛出)。
可运行示例(全面错误处理)
import asyncio
import aiohttp
from aiohttp import ClientTimeout
async def fetch_with_error_handling(url: str):
async with aiohttp.ClientSession(timeout=ClientTimeout(total=3)) as session:
try:
# 发送请求
async with session.get(url) as response:
# 若状态码 >=400,抛出 ClientResponseError
response.raise_for_status()
try:
# 尝试解析 JSON 响应
return {"status": "success", "data": await response.json()}
except ValueError:
# 解析 JSON 失败(如接口返回文本)
return {"status": "fail", "error": "响应格式错误,非 JSON", "response_text": await response.text()}
except aiohttp.ClientConnectionError as e:
# 连接错误(如网络断了、域名错误)
return {"status": "fail", "error": f"连接失败:{str(e)}"}
except asyncio.TimeoutError:
# 请求超时
return {"status": "fail", "error": "请求超时(超过 3 秒)"}
except aiohttp.ClientResponseError as e:
# 响应错误(如 404、500)
return {"status": "fail", "error": f"响应错误:状态码 {e.status},原因 {e.message}"}
except Exception as e:
# 捕获其他未预料到的异常(兜底处理)
return {"status": "fail", "error": f"未知错误:{type(e).__name__}: {str(e)}"}
async def main():
# 测试 4 种异常场景 + 1 种正常场景
urls = [
"https://api.github.com/users/octocat", # 正常(JSON 响应)
"https://httpbin.org/status/404", # 404 响应错误
"https://httpbin.org/delay/4", # 超时
"https://invalid-domain-12345.com", # 连接错误(域名不存在)
"https://httpbin.org/text" # 响应非 JSON(文本)
]
for url in urls:
print(f"\n请求 URL:{url}")
result = await fetch_with_error_handling(url)
print("请求结果:", result)
if __name__ == "__main__":
asyncio.run(main())
常见陷阱
- 陷阱1:只捕获部分异常(如只捕获超时,不捕获连接错误),未预料到的异常会导致 FastAPI 接口返回 500 错误,不利于问题排查。
- 陷阱2:异常处理过于简单(如只打印错误信息,不返回具体错误原因),FastAPI 接口调用者无法得知失败原因(如“连接失败”还是“超时”),不利于调试。
- 陷阱3:使用
response.raise_for_status()后,未捕获ClientResponseError,导致 400/500 响应直接抛出异常,未做友好处理。
进阶篇:并发控制、流式响应、代理与认证
核心目标:应对 FastAPI 高并发场景、特殊响应需求(如大文件下载)、第三方 API 访问限制(如需代理、认证)。
一、并发控制(asyncio.gather / Semaphore)
原理说明
FastAPI 是异步框架,支持高并发请求,当路由中需要同时调用多个第三方 API 时(如一次请求需要获取 A、B、C 三个 API 的数据),需用并发控制提升效率,同时避免并发过高导致第三方 API 限流或自身服务器压力过大。
核心工具:
asyncio.gather:并发运行多个协程,等待所有协程完成,返回所有结果(按协程顺序排列),适合“多个独立请求,需要全部完成后再处理”的场景。asyncio.Semaphore:信号量,限制同时运行的协程数量(如限制最多同时发送 5 个请求),适合“控制并发量,避免触发第三方 API 限流”的场景。
可运行示例1:asyncio.gather 并发请求(无限制)
import asyncio
import aiohttp
# 异步请求函数(复用之前的错误处理逻辑简化版)
async def fetch(session: aiohttp.ClientSession, url: str):
try:
async with session.get(url) as response:
response.raise_for_status()
return {"url": url, "status": "success", "data": await response.json()}
except Exception as e:
return {"url": url, "status": "fail", "error": str(e)}
async def main():
# 复用一个 ClientSession 和连接池,提升并发效率
async with aiohttp.ClientSession(timeout=ClientTimeout(total=3)) as session:
# 准备 5 个需要并发请求的 URL(都是测试接口,可直接运行)
urls = [
"https://api.github.com/users/octocat",
"https://jsonplaceholder.typicode.com/posts/1",
"https://httpbin.org/get",
"https://api.github.com/rate_limit",
"https://jsonplaceholder.typicode.com/comments/1"
]
# 1. 创建协程列表(每个 URL 对应一个协程)
tasks = [fetch(session, url) for url in urls]
# 2. 用 asyncio.gather 并发运行所有协程,等待全部完成
results = await asyncio.gather(*tasks) # * 解包协程列表
# 打印结果
for result in results:
print(result)
if __name__ == "__main__":
asyncio.run(main())
可运行示例2:Semaphore 限制并发量(重点,FastAPI 高并发必用)
import asyncio
import aiohttp
# 异步请求函数(加入信号量控制)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore):
# 用 semaphore 限制并发:每次只有 semaphore 允许的数量能进入代码块
async with semaphore:
print(f"开始请求:{url}")
try:
async with session.get(url) as response:
await asyncio.sleep(1) # 模拟请求耗时(方便观察并发控制效果)
response.raise_for_status()
result = {"url": url, "status": "success", "status_code": response.status}
except Exception as e:
result = {"url": url, "status": "fail", "error": str(e)}
print(f"结束请求:{url}")
return result
async def main():
# 1. 配置信号量:限制最多同时运行 2 个协程(即同时发送 2 个请求)
semaphore = asyncio.Semaphore(2)
# 2. 复用会话和连接池
async with aiohttp.ClientSession(timeout=ClientTimeout(total=5)) as session:
# 准备 5 个 URL(测试并发限制效果)
urls = [
"https://httpbin.org/get?num=1",
"https://httpbin.org/get?num=2",
"https://httpbin.org/get?num=3",
"https://httpbin.org/get?num=4",
"https://httpbin.org/get?num=5"
]
# 3. 创建协程列表,每个协程传入信号量
tasks = [fetch(session, url, semaphore) for url in urls]
# 4. 并发运行,限制并发量为 2
results = await asyncio.gather(*tasks)
print("\n最终结果:")
for res in results:
print(res)
if __name__ == "__main__":
asyncio.run(main())
运行效果说明
上述示例中,信号量限制并发量为 2,会看到“同时开始 2 个请求 → 2 个请求结束后,再开始下 2 个 → 最后 1 个请求”的效果,避免 5 个请求同时发送,触发第三方 API 限流。
常见陷阱(FastAPI 重点)
- 陷阱1:无限制并发(不用 Semaphore),高并发场景下(如 FastAPI 同时接收 100 个请求,每个请求都并发调用 5 个第三方 API),会导致瞬间发送 500 个请求,触发第三方 API 限流,或导致自身服务器端口耗尽。
- 陷阱2:信号量配置过小(如 1),失去并发优势,和同步请求效率一致;配置过大(如 100),达不到限制并发的目的。建议根据第三方 API 限流规则配置(如第三方 API 每秒允许 10 个请求,信号量可设为 10)。
- 陷阱3:每个协程都创建一个 ClientSession,即使使用 Semaphore,也会浪费连接资源(连接池无法复用),必须复用同一个 ClientSession。
二、流式响应(处理大文件/大响应)
原理说明
当第三方 API 返回大文件(如图片、视频)或超大 JSON 数据时,若用 await response.json() 或 await response.text(),会将整个响应内容加载到内存中,导致内存占用过高(甚至 OOM 崩溃),适合 FastAPI 中下载文件、处理超大响应的场景。
aiohttp 流式响应核心:通过 response.content(异步迭代器)逐块读取响应内容,按需处理(如逐块保存到文件、逐块返回给 FastAPI 前端),无需将整个内容加载到内存。
可运行示例(流式下载大文件)
import asyncio
import aiohttp
# 异步流式下载大文件(如图片、视频)
async def stream_download(session: aiohttp.ClientSession, url: str, save_path: str):
async with session.get(url) as response:
response.raise_for_status()
# 获取文件总大小(从响应头获取,单位:字节)
total_size = int(response.headers.get("Content-Length", 0))
downloaded_size = 0
# 以二进制写入模式打开文件,逐块写入(流式保存)
with open(save_path, "wb") as f:
# 逐块读取响应内容,chunk_size 为每次读取的块大小(如 1024 字节 = 1KB)
async for chunk in response.content.iter_chunked(chunk_size=1024):
if chunk: # 确保块不为空
f.write(chunk)
downloaded_size += len(chunk)
# 打印下载进度(可选)
progress = (downloaded_size / total_size) * 100 if total_size > 0 else 0
print(f"\r下载进度:{progress:.2f}%({downloaded_size}/{total_size} 字节)", end="")
print(f"\n文件下载完成,保存路径:{save_path}")
async def main():
# 示例:下载一张大图片(可替换为任意大文件 URL)
file_url = "https://picsum.photos/2000/2000" # 2000x2000 的图片,约 1-2MB
save_path = "large_image.jpg"
async with aiohttp.ClientSession(timeout=ClientTimeout(total=30)) as session:
await stream_download(session, file_url, save_path)
if __name__ == "__main__":
asyncio.run(main())
可运行示例(FastAPI 中流式返回大文件)
from fastapi import FastAPI, Response, status
from fastapi.responses import StreamingResponse
import aiohttp
from contextlib import asynccontextmanager
# FastAPI 应用,用 lifespan 管理全局 ClientSession(后续集成篇会详细讲)
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时创建全局会话
app.state.session = aiohttp.ClientSession(timeout=ClientTimeout(total=30))
yield
# 关闭时关闭会话
await app.state.session.close()
app = FastAPI(lifespan=lifespan)
# FastAPI 路由:流式返回第三方 API 的大文件(避免加载到内存)
@app.get("/stream-file", summary="流式返回大文件")
async def stream_file(response: Response):
file_url = "https://picsum.photos/2000/2000" # 第三方大文件 URL
session = app.state.session # 复用全局会话
try:
async with session.get(file_url) as resp:
resp.raise_for_status()
# 获取文件类型(从响应头获取,用于设置返回头)
content_type = resp.headers.get("Content-Type", "application/octet-stream")
# 流式返回:将 aiohttp 的响应流,直接返回给 FastAPI 前端
return StreamingResponse(
resp.content.iter_chunked(1024), # 逐块读取
media_type=content_type,
status_code=status.HTTP_200_OK
)
except Exception as e:
return {"status": "fail", "error": f"流式返回失败:{str(e)}"}
# 运行方式:uvicorn main:app --reload(main 是当前文件名)
# 访问:http://127.0.0.1:8000/stream-file,会直接下载/预览大图片
常见陷阱
- 陷阱1:处理大响应时,仍用
await response.text()或await response.json(),将整个内容加载到内存,导致内存占用过高,FastAPI 应用崩溃。 - 陷阱2:流式读取时,未设置足够的超时时间(如大文件下载需要 30 秒,超时设为 5 秒),会导致下载中断、超时异常。
- 陷阱3:FastAPI 中返回流式响应时,未复用全局 ClientSession,在路由中重复创建会话,导致资源泄漏。
三、代理与认证(访问受限第三方 API)
原理说明
在 FastAPI 中调用第三方 API 时,可能遇到两种限制:
- IP 限制:第三方 API 只允许特定 IP 访问,需通过代理服务器转发请求(隐藏真实 IP)。
- 认证限制:第三方 API 需身份认证(如 Basic 认证、Bearer Token 认证),需在请求头中携带认证信息。
aiohttp 支持配置 HTTP/HTTPS 代理、多种认证方式,适配上述场景。
可运行示例1:配置代理(HTTP/HTTPS 代理)
import asyncio
import aiohttp
async def fetch_with_proxy():
# 配置代理(替换为你的代理地址和端口,若有账号密码,格式:http://user:pass@proxy_ip:proxy_port)
proxy = "http://127.0.0.1:7890" # 示例代理(如 Clash 本地代理)
# 方式1:全局代理(所有请求都通过该代理)
connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
try:
# 发送请求时,通过 proxy 参数设置代理
async with session.get("https://httpbin.org/ip", proxy=proxy) as response:
# httpbin.org/ip 会返回当前请求的 IP,可验证代理是否生效
result = await response.json()
print("代理请求成功,当前 IP:", result["origin"])
except Exception as e:
print("代理请求失败:", str(e))
if __name__ == "__main__":
asyncio.run(fetch_with_proxy())
可运行示例2:认证(Basic 认证 + Bearer Token 认证)
import asyncio
import aiohttp
from aiohttp import BasicAuth
async def fetch_with_auth():
async with aiohttp.ClientSession() as session:
# 方式1:Basic 认证(账号密码认证,如部分旧版 API)
print("=== Basic 认证 ===")
# BasicAuth 会自动将账号密码编码为 Base64,添加到请求头 Authorization: Basic xxxx
auth = BasicAuth(login="test_user", password="test_pass")
try:
# 发送请求时,通过 auth 参数设置 Basic 认证
async with session.get("https://httpbin.org/basic-auth/test_user/test_pass", auth=auth) as response:
print("Basic 认证结果:", await response.json())
except Exception as e:
print("Basic 认证失败:", str(e))
# 方式2:Bearer Token 认证(常用,如 OAuth2 认证,GitHub API 等)
print("\n=== Bearer Token 认证 ===")
token = "your_bearer_token" # 替换为你的真实 Token(如 GitHub Personal Access Token)
# 手动设置请求头:Authorization: Bearer {token}
headers = {"Authorization": f"Bearer {token}"}
try:
# 发送请求时,通过 headers 参数设置认证头(示例:GitHub API 查看自身信息)
async with session.get("https://api.github.com/user", headers=headers) as response:
if response.status == 401:
print("Bearer Token 无效或过期")
else:
print("Bearer Token 认证结果(简化):", (await response.json())["login"])
except Exception as e:
print("Bearer Token 认证失败:", str(e))
if __name__ == "__main__":
asyncio.run(fetch_with_auth())
常见陷阱(FastAPI 重点)
- 陷阱1:代理配置错误(如代理地址错误、端口错误),导致请求连接失败,未做代理异常处理,FastAPI 接口返回 500 错误。
- 陷阱2:认证信息泄露(如将 Bearer Token、Basic 认证账号密码硬编码到代码中),存在安全风险,建议通过环境变量(如 python-dotenv)管理认证信息。
- 陷阱3:Bearer Token 过期未处理,导致请求频繁失败,未添加 Token 过期检测和重试逻辑。
FastAPI 集成篇:安全、高效使用 aiohttp(核心重点)
核心目标:解决“FastAPI 中如何正确使用 aiohttp”的核心问题,规避生产环境中的资源泄漏、性能瓶颈,实现最佳协作。
一、核心对比:为什么 FastAPI 中不用 requests 而用 aiohttp?
核心原因(异步 vs 阻塞,决定 FastAPI 性能)
FastAPI 是基于 asyncio 的异步 Web 框架,其事件循环(event loop)是“单线程+非阻塞”的,所有请求都在同一个事件循环中处理,关键差异如下:
| 特性 | requests(阻塞式) | aiohttp(异步式) | 对 FastAPI 的影响 |
|---|---|---|---|
| 执行方式 | 阻塞式,请求期间 CPU 空闲但无法做其他事 | 非阻塞式,请求等待期间 CPU 可处理其他请求 | requests 会卡住事件循环,aiohttp 不影响事件循环 |
| 并发能力 | 低,需通过多线程/多进程弥补,资源消耗高 | 高,依托 asyncio 事件循环,资源消耗低 | aiohttp 能充分发挥 FastAPI 的高并发优势 |
| 协作性 | 不支持 async/await,无法与 FastAPI 异步路由无缝协作 | 原生支持 async/await,与 FastAPI 异步路由完美契合 | aiohttp 可直接在 FastAPI 异步路由中使用,代码简洁 |
直观示例(对比 requests 和 aiohttp 在 FastAPI 中的差异)
from fastapi import FastAPI
import requests
import aiohttp
import asyncio
app = FastAPI()
# 示例1:FastAPI 路由中使用 requests(阻塞式,不推荐)
@app.get("/requests-test", summary="使用 requests(阻塞)")
def requests_test():
# requests.get() 是阻塞式,会卡住 FastAPI 的事件循环
# 此时,其他所有请求都无法被处理,直到这个请求完成
response = requests.get("https://api.github.com/users/octocat")
return response.json()
# 示例2:FastAPI 路由中使用 aiohttp(异步式,推荐)
@app.get("/aiohttp-test", summary="使用 aiohttp(异步)")
async def aiohttp_test():
# aiohttp 是异步式,请求等待期间,事件循环可处理其他请求
async with aiohttp.ClientSession() as session:
async with session.get("https://api.github.com/users/octocat") as response:
return await response.json()
# 测试效果:
# 1. 启动应用:uvicorn main:app --reload
# 2. 同时访问两个路由:/requests-test 和 /aiohttp-test
# 3. 现象:访问 /requests-test 时,再访问 /aiohttp-test 会卡住,直到前者完成;
# 访问 /aiohttp-test 时,再访问其他路由可正常响应。
结论
在 FastAPI 中,只要涉及 HTTP 请求调用(第三方 API),必须用 aiohttp,绝对不能用 requests——requests 会彻底扼杀 FastAPI 的高并发优势,导致应用性能急剧下降。
二、最佳实践:FastAPI 中管理 aiohttp ClientSession(核心,规避资源泄漏)
核心问题
FastAPI 应用是长期运行的,若在每个路由中重复创建 ClientSession(如async with aiohttp.ClientSession() as session),会导致:
- 资源泄漏:每次创建会话都会建立新的连接池,频繁创建/关闭会浪费 TCP 连接、端口资源。
- 性能下降:连接池无法复用,每次请求都需要重新建立 TCP 连接,增加握手开销。
两种最佳管理方式(推荐第二种,更灵活)
方式1:通过 lifespan 管理全局 ClientSession(FastAPI 1.0+ 推荐)
原理说明
FastAPI 的 lifespan 上下文管理器,用于管理应用的生命周期(启动 → 运行 → 关闭),适合管理全局唯一的资源(如 ClientSession、数据库连接):
- 启动时(yield 之前):创建 ClientSession,存储到 app.state 中(app 的全局状态)。
- 运行时:所有路由通过
app.state.session复用同一个 ClientSession。 - 关闭时(yield 之后):关闭 ClientSession,释放资源。
可运行示例
from fastapi import FastAPI
import aiohttp
from aiohttp import ClientTimeout
from contextlib import asynccontextmanager
# 1. 定义 lifespan 上下文管理器,管理全局 ClientSession
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时:创建全局 ClientSession(配置超时、连接池)
app.state.session = aiohttp.ClientSession(
timeout=ClientTimeout(total=5), # 全局超时 5 秒
connector=aiohttp.TCPConnector(limit=10) # 连接池最大 10 个连接
)
yield # 应用运行期间,会停在这里,直到应用关闭
# 关闭时:关闭全局 ClientSession,释放资源
await app.state.session.close()
print("全局 ClientSession 已关闭")
# 2. 创建 FastAPI 应用,传入 lifespan
app = FastAPI(lifespan=lifespan, title="FastAPI + aiohttp 全局会话管理")
# 3. 路由中复用全局 ClientSession(所有路由共用一个会话)
@app.get("/github-user/{username}", summary="获取GitHub用户信息", response_description="GitHub用户基础信息")
async def get_github_user(username: str):
# 从 app.state 中获取全局会话,无需重复创建
session = app.state.session
url = f"https://api.github.com/users/{username}"
try:
# 复用全局会话发送请求,无需手动关闭
async with session.get(url) as response:
response.raise_for_status()
user_info = await response.json()
# 简化返回结果,只返回关键信息
return {
"status": "success",
"data": {
"username": user_info.get("login"),
"avatar_url": user_info.get("avatar_url"),
"html_url": user_info.get("html_url"),
"bio": user_info.get("bio", "无简介"),
"public_repos": user_info.get("public_repos", 0)
}
}
except aiohttp.ClientConnectionError:
return {"status": "fail", "error": "连接失败,请检查网络或GitHub API可用性"}
except asyncio.TimeoutError:
return {"status": "fail", "error": "请求超时,建议稍后重试"}
except aiohttp.ClientResponseError as e:
if e.status == 404:
return {"status": "fail", "error": f"用户 {username} 不存在"}
return {"status": "fail", "error": f"响应错误:状态码 {e.status}"}
except Exception as e:
return {"status": "fail", "error": f"未知错误:{str(e)}"}
# 4. 新增多个路由示例,均复用全局 ClientSession
@app.get("/github-rate-limit", summary="获取GitHub API速率限制", response_description="API速率限制详情")
async def get_github_rate_limit():
session = app.state.session
url = "https://api.github.com/rate_limit"
try:
async with session.get(url) as response:
response.raise_for_status()
rate_limit = await response.json()
return {
"status": "success",
"data": {
"core": rate_limit.get("resources", {}).get("core", {}),
"search": rate_limit.get("resources", {}).get("search", {})
}
}
except Exception as e:
return {"status": "fail", "error": f"获取速率限制失败:{str(e)}"}
# 运行方式:uvicorn main:app --reload(main 是当前文件名)
# 测试访问:http://127.0.0.1:8000/github-user/octocat
# 核心优势:所有路由共用一个全局 ClientSession,连接池复用,无资源泄漏风险
# 方式2:通过依赖项管理 ClientSession(更灵活,支持局部配置)
# 原理说明:FastAPI 的依赖项可实现“按需注入”,适合不同路由需要不同会话配置的场景
# 同时仍能保证会话复用,避免重复创建
from fastapi import Depends
# 定义依赖项:创建并复用 ClientSession(单例模式,整个应用生命周期内只创建一次)
async def get_session():
# 局部配置(可根据需求调整,与全局配置区分)
session = aiohttp.ClientSession(
timeout=ClientTimeout(total=6),
connector=aiohttp.TCPConnector(limit=8)
)
try:
yield session # 路由函数使用完会话后,执行后续关闭操作
finally:
await session.close()
# 路由中通过 Depends 注入会话,无需手动管理生命周期
@app.get("/jsonplaceholder/post/{post_id}", summary="获取JSONPlaceholder文章", dependencies=[Depends(get_session)])
async def get_post(post_id: int, session: aiohttp.ClientSession = Depends(get_session)):
url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
try:
async with session.get(url) as response:
response.raise_for_status()
post = await response.json()
return {"status": "success", "data": post}
except Exception as e:
return {"status": "fail", "error": f"获取文章失败:{str(e)}"}
# 两种方式对比总结
# 方式1(lifespan):适合全局统一配置,所有路由复用同一个会话,配置简单,推荐大多数场景
# 方式2(依赖项):适合局部配置差异化,不同路由可使用不同会话参数,灵活性更高,适合复杂场景
h2 id="247">三、FastAPI + aiohttp 生产环境避坑清单(必看)### 核心避坑点(生产环境高频问题)1. 坚决不用 requests:任何情况下,FastAPI 异步路由中都不能使用 requests 库,必须用 aiohttp,避免卡住事件循环。2. 会话必须复用:禁止在路由函数内重复创建 ClientSession,优先用 lifespan 管理全局会话,复杂场景用依赖项注入。3. 必设超时和错误处理:所有 aiohttp 请求必须配置超时,且全面捕获异常,避免协程阻塞或返回 500 错误。4. 并发需控制:高并发场景下,必须用 Semaphore 限制请求并发量,结合连接池配置,避免触发第三方 API 限流。5. 敏感信息不硬编码:代理账号密码、Bearer Token 等敏感信息,用环境变量(python-dotenv)管理,禁止硬编码到代码中。6. 流式处理大响应:返回大文件、超大 JSON 时,必须用流式响应(response.content.iter_chunked),避免内存溢出。### 生产环境可直接复用的封装示例from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
import aiohttp
from aiohttp import ClientTimeout, ClientResponseError
import asyncio
from contextlib import asynccontextmanager
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import os
from dotenv import load_dotenv # 需安装:pip install python-dotenv
# 加载环境变量(管理敏感信息)
load_dotenv()
# 1. 全局会话管理(lifespan)
@asynccontextmanager
async def lifespan(app: FastAPI):
# 从环境变量获取配置,避免硬编码
timeout_total = int(os.getenv("AIOHTTP_TIMEOUT", 5))
connector_limit = int(os.getenv("AIOHTTP_CONNECTOR_LIMIT", 10))
app.state.session = aiohttp.ClientSession(
timeout=ClientTimeout(total=timeout_total),
connector=aiohttp.TCPConnector(limit=connector_limit, ssl=True)
)
yield
await app.state.session.close()
app = FastAPI(lifespan=lifespan, title="FastAPI + aiohttp 生产环境封装")
# 2. 通用重试装饰器(可复用)
def async_retry_decorator():
return retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=5),
retry=retry_if_exception_type((
asyncio.TimeoutError,
aiohttp.ClientConnectionError,
ClientResponseError # 仅重试 5xx 错误,4xx 业务错误不重试
)),
reraise=True
)
# 3. 通用请求封装(GET/POST 通用,集成重试、错误处理)
@async_retry_decorator()
async def async_request(
session: aiohttp.ClientSession,
url: str,
method: str = "GET",
params: dict = None,
json: dict = None,
headers: dict = None
):
method = method.upper()
if method not in ["GET", "POST"]:
raise ValueError("仅支持 GET、POST 方法")
headers = headers or {}
# 从环境变量获取认证信息,注入请求头(示例:Bearer Token)
bearer_token = os.getenv("BEARER_TOKEN")
if bearer_token:
headers["Authorization"] = f"Bearer {bearer_token}"
async with getattr(session, method)(
url, params=params, json=json, headers=headers
) as response:
# 仅重试 5xx 错误,4xx 错误直接抛出,不重试
if response.status >= 500:
response.raise_for_status()
# 解析响应(自动适配 JSON/文本)
try:
return await response.json()
except ValueError:
return await response.text()
# 4. 路由示例(复用封装好的请求方法)
@app.get("/api/get", summary="通用GET请求示例")
async def api_get(url: str, params: dict = None):
session = app.state.session
try:
result = await async_request(session, url, method="GET", params=params)
return {"status": "success", "data": result}
except ClientResponseError as e:
raise HTTPException(
status_code=e.status,
detail=f"请求失败:{e.message}"
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"服务器错误:{str(e)}"
)
@app.post("/api/post", summary="通用POST请求示例")
async def api_post(url: str, json: dict = None):
session = app.state.session
try:
result = await async_request(session, url, method="POST", json=json)
return {"status": "success", "data": result}
except ClientResponseError as e:
raise HTTPException(
status_code=e.status,
detail=f"请求失败:{e.message}"
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"服务器错误:{str(e)}"
)
# 5. 流式下载示例(生产环境版)
@app.get("/download/file", summary="流式下载文件")
async def download_file(file_url: str):
session = app.state.session
try:
async with session.get(file_url, timeout=ClientTimeout(total=60)) as resp:
resp.raise_for_status()
content_type = resp.headers.get("Content-Type", "application/octet-stream")
# 从响应头获取文件名(可选)
disposition = resp.headers.get("Content-Disposition", "")
filename = "downloaded_file"
if "filename=" in disposition:
filename = disposition.split("filename=")[-1].strip('"')
return StreamingResponse(
resp.content.iter_chunked(4096), # 4KB 为一块,优化内存占用
media_type=content_type,
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"文件下载失败:{str(e)}"
)
h1 id="259">总结篇:aiohttp + FastAPI 学习路径与实战建议### 学习路径(从基础到生产)基础阶段:掌握 async/await 语法,理解异步非阻塞原理,能写出 aiohttp 最简 GET/POST 请求。中级阶段:重点掌握 ClientSession 生命周期、连接池、超时、重试、错误处理,规避基础陷阱。进阶阶段:学习并发控制(Semaphore)、流式响应、代理与认证,适配复杂场景。集成阶段:掌握 FastAPI 中会话管理的两种方式,封装通用请求方法,适配生产环境。### 实战建议1. 多练示例代码:本文所有示例均可直接运行,建议逐段调试,观察异步执行效果,理解每一步配置的意义。2. 优先复用封装:生产环境中,不要重复编写请求逻辑,封装通用请求方法,统一管理超时、重试、错误处理。3. 关注资源占用:定期排查会话是否正常关闭、连接池配置是否合理,避免资源泄漏导致应用卡顿。4. 结合官方文档:aiohttp 和 FastAPI 官方文档更新及时,遇到问题可优先查阅官方文档(附地址):
- aiohttp 官方文档:https:/docs.aiohttp.org/
- FastAPI 官方文档:https:/fastapi.tiangolo.com/
### 核心总结aiohttp 是 FastAPI 异步开发中调用第三方 API 的最佳选择,其核心价值是“保持异步非阻塞,充分发挥 FastAPI 高并发优势”。学习的关键不在于记住所有 API,而在于理解异步原理、会话管理、异常处理的核心逻辑,规避资源泄漏、协程阻塞等高频陷阱,最终实现安全、高效、可维护的生产环境代码。(注:文档部分内容可能由 AI 生成)