Python 框架 - APScheduler (定时任务调度框架 )

APScheduler 从零到实战(FastAPI 定时任务全指南)

本文面向已掌握 Python 异步基础和 FastAPI 路由用法、无 APScheduler 经验的自学开发者,从零开始、由浅入深讲解 APScheduler 的使用,重点聚焦 FastAPI 异步环境下的定时任务实现,覆盖基础、中级、进阶及集成全流程,每个知识点均包含「原理说明+最小可运行代码+常见陷阱」,解决实际开发中后台定时任务(日志清理、数据同步、自定义提醒等)的核心需求。

前言:为什么选择 APScheduler?

在 FastAPI 中实现定时任务,常见方案有 APScheduler、Celery、BackgroundTasks 等,其中 APScheduler 具备以下优势,适配大多数中小规模定时任务场景:

  • 轻量无依赖(核心功能无需额外组件),部署简单;
  • 支持同步/异步任务,完美适配 FastAPI 异步环境;
  • 灵活的触发器(interval/cron/date),满足各类定时需求;
  • 支持任务持久化、动态管理,可与数据库无缝集成;
  • 无需像 Celery 那样依赖 Redis/RabbitMQ 作为消息队列,降低架构复杂度。

本文将围绕 APScheduler 展开,全程结合 FastAPI 实战,解决「怎么用」「怎么用好」「怎么避坑」三个核心问题。

基础篇:APScheduler 核心入门

基础篇目标:掌握 APScheduler 三大核心组件,实现简单的内存级定时任务,理解不同触发器的用法。

1.1 核心概念(Job、Trigger、Scheduler)

原理说明

APScheduler 的核心由 3 个组件构成,三者协同工作完成定时任务的调度与执行:

  1. Job(任务):最基本的执行单元,封装了要执行的函数(同步/异步)、参数、执行策略(如重试次数、执行超时时间)。
  2. Trigger(触发器):定义 Job 的执行时机,决定「什么时候执行任务」,APScheduler 提供 3 种内置触发器(覆盖所有常见场景)。
  3. Scheduler(调度器):核心控制器,负责将 Job 与 Trigger 绑定,管理 Job 的生命周期(添加、删除、暂停、执行),同时负责任务的存储、线程/进程管理。

补充:基础篇使用「内存存储」(默认),即任务信息仅保存在内存中,服务重启后任务丢失,适合临时测试。

最小可运行代码(核心组件演示)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 基础篇:核心组件(Job + Trigger + Scheduler)演示
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger

# 1. 定义要执行的任务函数(同步函数,基础篇默认用同步)
def print_hello(name: str = "默认用户"):
    """简单的任务函数,打印问候语"""
    print(f"Hello, {name}! 这是定时任务的输出")

# 2. 初始化调度器(基础篇用 BlockingScheduler,阻塞式调度器,适合单独运行的定时任务脚本)
# BlockingScheduler 会阻塞当前线程,直到手动停止(Ctrl+C)
scheduler = BlockingScheduler()

# 3. 创建 Trigger(触发器):每 5 秒执行一次
trigger = IntervalTrigger(seconds=5)

# 4. 创建 Job(任务),并绑定到调度器
# add_job 方法:将任务、触发器绑定到调度器,返回 Job 对象(可用于后续管理)
job = scheduler.add_job(
    func=print_hello,          # 要执行的任务函数
    trigger=trigger,           # 触发器(执行时机)
    args=["APScheduler 新手"], # 任务函数的位置参数(可选)
    kwargs={"name": "APScheduler 新手"}, # 任务函数的关键字参数(可选)
    id="hello_job",            # 任务唯一ID(必填,用于后续动态管理)
    replace_existing=True      # 如果存在相同id的任务,替换它(避免重复添加)
)

# 5. 启动调度器(阻塞当前线程,开始执行任务)
if __name__ == "__main__":
    try:
        print("调度器已启动,按 Ctrl+C 停止...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # 捕获中断信号,优雅停止调度器
        print("调度器正在停止...")
        scheduler.shutdown()
        print("调度器已停止")
    

常见陷阱

  • 陷阱1:忘记给 Job 设唯一 id,或重复添加相同 id 的任务 → 报错 Job identifier (xxx) already exists,解决方案:添加 id="唯一标识"replace_existing=True
  • 陷阱2:调度器启动后,代码无法继续执行 → BlockingScheduler 是阻塞式的,会占用当前线程,适合单独作为定时任务脚本运行(后续 FastAPI 中会用非阻塞的 AsyncIOScheduler)。
  • 陷阱3:任务函数有参数,但未通过 args/kwargs 传递 → 任务执行时会报错「缺少位置参数」,解决方案:通过 add_job 的 args(元组)、kwargs(字典)传递参数。

1.2 三种内置触发器(interval/cron/date)

触发器是定时任务的「时间规则」,APScheduler 内置 3 种触发器,覆盖所有常见定时场景,按需选择即可。

1.2.1 IntervalTrigger(固定时间间隔触发器)

原理说明

按「固定时间间隔」重复执行任务,比如每 5 秒、每 1 小时、每 2 天,适合「周期性重复、间隔固定」的场景(如每小时同步第三方数据)。

可运行代码

from apscheduler.schedulers.blocking import BlockingScheduler

# 任务函数
def sync_third_party_data():
    """模拟:每小时同步第三方数据"""
    print("正在同步第三方数据...")
    # 实际场景:请求第三方API、解析数据、存入数据库

scheduler = BlockingScheduler()

# 添加任务:每 1 小时执行一次(interval 触发器核心参数:hours=1)
scheduler.add_job(
    func=sync_third_party_data,
    trigger="interval",  # 简写:直接指定触发器类型字符串,无需手动创建Trigger对象
    hours=1,             # 核心参数:时间间隔(可选 seconds/minutes/hours/days/weeks)
    id="sync_data_job",
    replace_existing=True
)

if __name__ == "__main__":
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
    

1.2.2 CronTrigger( cron 表达式触发器)

原理说明

按 cron 表达式定义的时间执行任务,支持复杂的时间规则(如每天凌晨 2 点、每周一上午 10 点、每月 1 号凌晨 3 点),适合「固定时间点执行」的场景(如每天凌晨清理日志)。

Cron 表达式格式(7个字段,空格分隔):秒 分 时 日 月 周 年(可选)

常用示例:

  • 0 0 2 * * *:每天凌晨 2 点(秒0、分0、时2,其余任意)
  • 0 */30 * * * *:每 30 分钟执行一次
  • 0 0 1 * * 1:每周一凌晨 1 点

可运行代码

from apscheduler.schedulers.blocking import BlockingScheduler

# 任务函数
def clean_logs():
    """模拟:每天凌晨 2 点清理日志"""
    print("正在清理日志文件...")
    # 实际场景:删除过期日志、压缩日志文件、记录清理日志

scheduler = BlockingScheduler()

# 添加任务:每天凌晨 2 点执行(cron 触发器)
# 方式1:用 cron 表达式字符串
scheduler.add_job(
    func=clean_logs,
    trigger="cron",
    expr="0 0 2 * * *",  # cron 表达式:每天凌晨2点
    id="clean_logs_job",
    replace_existing=True
)

# 方式2:用关键字参数(更直观,无需记 cron 表达式)
# scheduler.add_job(
#     func=clean_logs,
#     trigger="cron",
#     hour=2,  # 时:2点
#     minute=0, # 分:0分
#     second=0, # 秒:0秒
#     id="clean_logs_job",
#     replace_existing=True
# )

if __name__ == "__main__":
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
    

1.2.3 DateTrigger(固定日期触发器)

原理说明

只在「指定的具体日期时间」执行一次任务,适合「一次性定时任务」的场景(如用户自定义的某个时间点的提醒)。

可运行代码

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta

# 任务函数
def user_reminder(user_id: int, content: str):
    """模拟:用户自定义定时提醒"""
    print(f"用户 {user_id} 的提醒:{content},提醒时间:{datetime.now()}")

scheduler = BlockingScheduler()

# 定义提醒时间:当前时间 + 10 秒(模拟10秒后的提醒)
reminder_time = datetime.now() + timedelta(seconds=10)

# 添加任务:只在指定时间执行一次(date 触发器)
scheduler.add_job(
    func=user_reminder,
    trigger="date",
    run_date=reminder_time,  # 固定执行时间(datetime对象)
    args=[1001, "该喝水啦!"], # 传递用户ID和提醒内容
    id="user_reminder_job",
    replace_existing=True
)

if __name__ == "__main__":
    print(f"提醒任务已添加,将在 {reminder_time} 执行")
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
    

常见陷阱

  • 陷阱1:cron 表达式格式错误 → 任务不执行,无报错提示,解决方案:用在线 cron 表达式校验工具(如 cron.qqe2.com)校验,或用关键字参数(hour/minute)替代表达式。
  • 陷阱2:date 触发器的 run_date 设为过去的时间 → 任务会立即执行一次(如果未设置 misfire_grace_time),解决方案:确保 run_date 是未来的时间,或添加 misfire_grace_time=None(过去的任务不执行)。
  • 陷阱3:interval 触发器的间隔设置过小(如1秒),任务执行时间超过间隔 → 任务会堆积,解决方案:合理设置间隔,或添加 coalesce=True(堆积的任务只执行一次)。

中级篇:任务持久化与动态管理

中级篇目标:解决基础篇「内存存储任务丢失」的问题,实现任务持久化,掌握任务的动态添加/删除/暂停,以及错误处理,为 FastAPI 集成打下基础。

2.1 任务持久化(SQLite / PostgreSQL)

原理说明

基础篇的任务存储在内存中,服务重启后所有任务丢失,实际开发中需要「持久化存储」任务信息(如任务ID、执行时间、状态)。

APScheduler 支持多种持久化存储后端,核心是「JobStore(任务存储)」,本文重点讲解两种常用数据库:

  • SQLite:轻量、文件型数据库,无需单独部署,适合开发环境或小规模生产环境;
  • PostgreSQL:关系型数据库,支持高并发、事务,适合大规模生产环境。

使用方法:初始化调度器时,通过 jobstores 参数配置 JobStore,将任务存储到数据库中。

2.1.1 SQLite 持久化(推荐开发环境)

可运行代码

from apscheduler.schedulers.blocking import BlockingScheduler

# 初始化调度器,配置 SQLite 作为任务存储
# jobstores:字典,key 是 JobStore 名称,value 是配置(类型、数据库路径等)
scheduler = BlockingScheduler(
    jobstores={
        "default": {  # 默认 JobStore,所有任务默认存储到这里
            "type": "sqlalchemy",  # 存储类型:sqlalchemy(支持多种关系型数据库)
            "url": "sqlite:///apscheduler_jobs.db"  # SQLite 数据库路径(当前目录下生成文件)
        }
    }
)

# 定义任务(与基础篇一致)
def clean_logs():
    print("正在清理日志(SQLite 持久化)...")

# 添加任务(任务会自动存储到 SQLite 数据库中)
scheduler.add_job(
    func=clean_logs,
    trigger="cron",
    hour=2,
    id="clean_logs_job",
    replace_existing=True,
    jobstore="default"  # 指定存储到 default JobStore(可省略,默认就是)
)

if __name__ == "__main__":
    try:
        print("调度器已启动(SQLite 持久化),按 Ctrl+C 停止...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        print("调度器已停止,任务信息已保存到 SQLite")
    

2.1.2 PostgreSQL 持久化(推荐生产环境)

前置准备

  1. 安装依赖(需要 psycopg2 驱动):pip install psycopg2-binary sqlalchemy
  2. 提前创建 PostgreSQL 数据库(如 apscheduler_db),并配置好用户名、密码、端口。

可运行代码

from apscheduler.schedulers.blocking import BlockingScheduler

# 初始化调度器,配置 PostgreSQL 作为任务存储
# url 格式:postgresql+psycopg2://用户名:密码@主机:端口/数据库名
scheduler = BlockingScheduler(
    jobstores={
        "default": {
            "type": "sqlalchemy",
            "url": "postgresql+psycopg2://postgres:123456@localhost:5432/apscheduler_db"
        }
    }
)

# 定义任务
def sync_third_party_data():
    print("正在同步第三方数据(PostgreSQL 持久化)...")

# 添加任务(任务会存储到 PostgreSQL 中,服务重启后可恢复)
scheduler.add_job(
    func=sync_third_party_data,
    trigger="interval",
    hours=1,
    id="sync_data_job",
    replace_existing=True
)

if __name__ == "__main__":
    try:
        print("调度器已启动(PostgreSQL 持久化),按 Ctrl+C 停止...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        print("调度器已停止,任务信息已保存到 PostgreSQL")
    

常见陷阱

  • 陷阱1:未安装 sqlalchemy 依赖 → 报错 ImportError: No module named 'sqlalchemy',解决方案:pip install sqlalchemy
  • 陷阱2:PostgreSQL 连接信息错误 → 报错 OperationalError: could not connect to server,解决方案:检查用户名、密码、主机、端口是否正确,确保 PostgreSQL 服务已启动。
  • 陷阱3:数据库中已存在相同 id 的任务,且未设置 replace_existing=True → 报错「任务已存在」,解决方案:要么删除数据库中原有任务,要么添加 replace_existing=True
  • 陷阱4:持久化后,修改任务函数代码,重启服务后任务不生效 → 因为任务的「函数引用」存储在数据库中,但函数代码修改后,数据库中的引用不会自动更新,解决方案:删除数据库中对应的任务,重新添加。

2.2 动态添加/删除/暂停/恢复任务

原理说明

实际开发中,需要通过代码(或 API)动态管理任务(如用户自定义提醒时添加任务、取消提醒时删除任务),APScheduler 提供了丰富的 API 用于任务管理,核心是通过「调度器对象」操作 Job。

核心 API:

  • 添加任务:scheduler.add_job()(返回 Job 对象);
  • 删除任务:scheduler.remove_job(job_id, jobstore="default")
  • 暂停任务:scheduler.pause_job(job_id)(暂停后任务不再执行,重启服务后仍处于暂停状态,持久化生效);
  • 恢复任务:scheduler.resume_job(job_id)(恢复暂停的任务);
  • 查询任务:scheduler.get_job(job_id)(获取单个任务)、scheduler.get_jobs()(获取所有任务);
  • 修改任务:scheduler.modify_job(job_id, **kwargs)(修改触发器、参数等)。

可运行代码(动态管理演示)

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
import time

# 初始化调度器(使用 SQLite 持久化,确保动态操作的任务不会丢失)
scheduler = BlockingScheduler(
    jobstores={
        "default": {"type": "sqlalchemy", "url": "sqlite:///apscheduler_jobs.db"}
    }
)

# 1. 定义任务函数
def dynamic_task(task_name: str):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 动态任务执行:{task_name}")

# 2. 定义动态管理任务的函数(模拟 API 调用场景)
def add_dynamic_job(task_id: str, task_name: str, interval_seconds: int):
    """动态添加任务"""
    # 先检查任务是否已存在,存在则删除
    if scheduler.get_job(task_id):
        scheduler.remove_job(task_id)
    # 添加新任务
    scheduler.add_job(
        func=dynamic_task,
        trigger="interval",
        seconds=interval_seconds,
        args=[task_name],
        id=task_id,
        replace_existing=True
    )
    print(f"已添加动态任务:id={task_id},名称={task_name},间隔={interval_seconds}秒")

def delete_dynamic_job(task_id: str):
    """动态删除任务"""
    if scheduler.get_job(task_id):
        scheduler.remove_job(task_id)
        print(f"已删除动态任务:id={task_id}")
    else:
        print(f"未找到任务:id={task_id}")

def pause_dynamic_job(task_id: str):
    """动态暂停任务"""
    if scheduler.get_job(task_id):
        scheduler.pause_job(task_id)
        print(f"已暂停动态任务:id={task_id}")
    else:
        print(f"未找到任务:id={task_id}")

def resume_dynamic_job(task_id: str):
    """动态恢复任务"""
    if scheduler.get_job(task_id):
        scheduler.resume_job(task_id)
        print(f"已恢复动态任务:id={task_id}")
    else:
        print(f"未找到任务:id={task_id}")

# 3. 启动调度器(用线程启动,避免阻塞后续代码,模拟 FastAPI 异步环境)
import threading
def start_scheduler():
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

# 启动调度器线程
scheduler_thread = threading.Thread(target=start_scheduler, daemon=True)
scheduler_thread.start()

# 4. 模拟动态操作任务(模拟用户通过 API 调用)
if __name__ == "__main__":
    print("调度器线程已启动,开始动态管理任务...")
    
    # 模拟1:添加任务(id=task1,每3秒执行一次)
    add_dynamic_job("task1", "测试任务1", 3)
    time.sleep(10)  # 等待10秒,观察任务执行
    
    # 模拟2:暂停任务
    pause_dynamic_job("task1")
    time.sleep(10)  # 等待10秒,观察任务是否暂停
    
    # 模拟3:恢复任务
    resume_dynamic_job("task1")
    time.sleep(10)  # 等待10秒,观察任务是否恢复
    
    # 模拟4:删除任务
    delete_dynamic_job("task1")
    time.sleep(5)   # 等待5秒,观察任务是否停止
    
    print("动态任务管理演示结束,退出程序...")
    # 停止调度器
    scheduler.shutdown()
    

常见陷阱

  • 陷阱1:在调度器未启动的情况下,执行动态任务操作 → 报错 SchedulerNotRunningError,解决方案:确保调度器已启动(或在启动后执行动态操作)。
  • 陷阱2:删除/暂停任务时,任务 id 错误或 JobStore 不匹配 → 无报错,但任务未被删除/暂停,解决方案:检查 task_id 是否正确,确保 jobstore 参数与添加任务时一致。
  • 陷阱3:动态添加任务后,服务重启,任务丢失 → 未配置持久化 JobStore,解决方案:使用 SQLAlchemy JobStore(SQLite/PostgreSQL),确保任务持久化。

2.3 任务参数传递与错误处理

2.3.1 任务参数传递

原理说明

任务函数往往需要接收参数(如用户ID、提醒内容、API地址等),APScheduler 支持通过 args(位置参数)和 kwargs(关键字参数)传递参数,参数会与任务一起持久化(如果配置了 JobStore)。

可运行代码

from apscheduler.schedulers.blocking import BlockingScheduler

# 任务函数(接收位置参数和关键字参数)
def send_reminder(user_id: int, content: str, priority: str = "normal"):
    """模拟发送用户提醒,接收3个参数"""
    print(f"给用户 {user_id} 发送提醒(优先级:{priority}):{content}")

scheduler = BlockingScheduler()

# 添加任务,传递参数
scheduler.add_job(
    func=send_reminder,
    trigger="interval",
    seconds=5,
    args=[1001, "该吃药了!"],  # 位置参数:对应 user_id=1001,content="该吃药了!"
    kwargs={"priority": "high"},  # 关键字参数:对应 priority="high"
    id="reminder_job",
    replace_existing=True
)

if __name__ == "__main__":
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
    

2.3.2 任务错误处理

原理说明

任务执行过程中可能出现错误(如API调用失败、数据库连接异常),如果不处理错误,任务会直接失败,且调度器会继续执行下一次任务,错误信息会被忽略。

APScheduler 提供两种错误处理方式:

  1. 在任务函数内部使用 try-except 捕获错误(推荐,灵活可控);
  2. 通过调度器的 add_listener 注册全局错误监听器,捕获所有任务的错误。

可运行代码

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_ERROR  # 导入任务错误事件

# 方式1:在任务函数内部捕获错误(推荐)
def sync_third_party_data(api_url: str):
    try:
        print(f"正在调用第三方API:{api_url}")
        # 模拟API调用失败
        raise Exception("第三方API服务不可用")  # 模拟错误
    except Exception as e:
        # 捕获错误,做自定义处理(如记录日志、重试)
        print(f"任务执行失败:{str(e)},已记录错误日志,将在下一次重试")
        # 可选:主动重试(或配置调度器的重试策略)
        # raise  # 抛出错误,让调度器处理(如触发全局错误监听器)

# 方式2:注册全局错误监听器(捕获所有任务的错误)
def job_error_listener(event):
    """全局任务错误监听器"""
    if event.exception:
        print(f"\n全局捕获任务错误:")
        print(f"任务ID:{event.job_id}")
        print(f"错误信息:{str(event.exception)}")
        print(f"错误堆栈:{event.traceback}")

# 初始化调度器
scheduler = BlockingScheduler()

# 注册全局错误监听器(监听 EVENT_JOB_ERROR 事件)
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)

# 添加任务(任务执行会出错,会被两种方式捕获)
scheduler.add_job(
    func=sync_third_party_data,
    trigger="interval",
    seconds=5,
    args=["https://api.example.com/data"],
    id="sync_data_job",
    replace_existing=True,
    max_instances=1,  # 最大并发实例数(避免任务并发执行)
    misfire_grace_time=10,  # 任务错过执行时间后,允许延迟10秒执行(超过则放弃)
)

if __name__ == "__main__":
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
    

常见陷阱

  • 陷阱1:参数类型不匹配 → 任务执行报错「TypeError: 函数缺少参数/参数类型错误」,解决方案:确保 args/kwargs 的参数类型、数量与任务函数一致。
  • 陷阱2:参数包含不可序列化的对象 → 任务无法持久化(报错「PicklingError」),解决方案:传递可序列化的参数(如字符串、数字、字典),避免传递类实例、函数等不可序列化对象。
  • 陷阱3:未处理错误,导致任务失败后无任何提示 → 难以排查问题,解决方案:要么在任务内部捕获错误,要么注册全局错误监听器,记录错误日志。
  • 陷阱4:任务错误后,调度器停止运行 → 默认情况下,调度器不会因单个任务错误而停止,若出现此问题,大概率是错误未捕获,导致触发了未处理的异常,解决方案:完善错误处理逻辑。

进阶篇:FastAPI 异步环境适配

进阶篇目标:解决「基础篇/中级篇的同步调度器不适合 FastAPI 异步环境」的问题,掌握 AsyncIOScheduler 的使用,实现与 FastAPI、数据库的无缝集成,解决分布式部署下的任务去重问题。

核心重点:FastAPI 是异步框架,基于 asyncio 事件循环,而基础篇的 BlockingScheduler 是同步调度器,会阻塞事件循环,导致 FastAPI 无法处理请求,因此必须使用 AsyncIOScheduler(异步调度器)。

3.1 为什么不能直接用 BackgroundTasks 做长期定时任务?

原理说明

FastAPI 提供的 BackgroundTasks 用于「处理单个请求的后台任务」(如请求结束后发送邮件、记录日志),但不适合做「长期定时任务」,原因如下:

  1. BackgroundTasks 是「请求级别的」,仅在当前请求处理完成后执行一次,无法实现周期性执行(如每小时同步数据);
  2. BackgroundTasks 不支持持久化,服务重启后任务丢失;
  3. BackgroundTasks 不支持动态管理(无法添加、删除、暂停任务);
  4. BackgroundTasks 无错误重试机制,任务失败后无法自动重试;
  5. 长期运行的 BackgroundTasks 会阻塞请求的响应(虽然是后台执行,但仍占用事件循环资源)。

结论:BackgroundTasks 适合「单次、短期」的后台任务,长期定时任务必须用 APScheduler。

3.2 AsyncIOScheduler vs Celery:何时用哪个?

原理说明

两者都是 Python 中常用的任务调度/异步任务工具,但适用场景不同,结合 FastAPI 实战选择:

AsyncIOScheduler(APScheduler 异步调度器)

优势:

  • 轻量、无依赖,无需额外部署消息队列(如 Redis/RabbitMQ);
  • 完美适配 FastAPI 异步环境,基于 asyncio 事件循环,不阻塞请求;
  • 配置简单,学习成本低,适合中小规模定时任务场景;
  • 支持任务持久化、动态管理,满足大多数定时任务需求。

缺点:

  • 不适合处理「CPU 密集型任务」(异步任务适合 IO 密集型,如 API 调用、数据库操作);
  • 分布式部署下的任务去重需要额外实现(无内置解决方案);
  • 不支持任务优先级的精细控制(相比 Celery 较弱)。

Celery

优势:

  • 功能强大,支持任务队列、优先级、重试、定时任务、分布式部署等;
  • 适合处理「CPU 密集型任务」(可通过多进程/多线程执行);
  • 内置分布式支持,多实例部署下任务去重、负载均衡更简单;
  • 生态完善,可与 Redis、RabbitMQ 等消息队列深度集成。

缺点:

  • 架构复杂,需要额外部署消息队列(Redis/RabbitMQ)和 Worker 进程,部署成本高;
  • 学习成本高,配置繁琐;
  • 与 FastAPI 异步环境的适配需要额外处理(如 Celery 异步任务与 asyncio 事件循环的协同)。

选择建议(结合 FastAPI)

  • 如果是「IO 密集型定时任务」(如 API 调用、数据同步、日志清理、用户提醒),且架构简单(中小规模),优先选择 AsyncIOScheduler
  • 如果是「CPU 密集型任务」(如数据计算、模型训练),或需要大规模分布式部署、精细的任务优先级控制,选择 Celery
  • 本文重点讲解 AsyncIOScheduler(贴合大多数 FastAPI 定时任务场景)。

3.3 AsyncIOScheduler 基础使用(异步调度器)

原理说明

AsyncIOScheduler 是 APScheduler 提供的「异步调度器」,基于 asyncio 事件循环,专门用于异步环境(如 FastAPI),核心特点:

  • 不阻塞 asyncio 事件循环,可与 FastAPI 请求处理并行执行;
  • 支持执行异步任务函数(async def 定义的函数),可直接 await 异步操作(如异步数据库、异步 API 调用);
  • 使用方式与 BlockingScheduler 类似,只需替换调度器类,少量修改配置。

前置准备

安装依赖:pip install apscheduler fastapi uvicorn(uvicorn 用于运行 FastAPI 服务)

可运行代码(AsyncIOScheduler 基础演示)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 进阶篇:AsyncIOScheduler 基础使用(适配 FastAPI 异步环境)
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
import asyncio
from datetime import datetime

# 1. 定义异步任务函数(async def,可直接 await 异步操作)
async def async_task():
    """异步任务:模拟异步 API 调用或异步数据库操作"""
    # 模拟异步操作(如 await db.query()、await httpx.AsyncClient().get())
    await asyncio.sleep(1)  # 模拟 IO 等待,不阻塞事件循环
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 异步任务执行成功")

# 2. 初始化 AsyncIOScheduler(异步调度器,不阻塞事件循环)
scheduler = AsyncIOScheduler()

# 3. 添加异步任务(与 BlockingScheduler 用法类似,支持所有触发器)
scheduler.add_job(
    func=async_task,  # 传入异步任务函数
    trigger=IntervalTrigger(seconds=5),  # 每5秒执行一次
    id="async_task_job",
    replace_existing=True,
    max_instances=1,  # 最大并发实例数(避免异步任务并发执行导致资源耗尽)
    misfire_grace_time=10  # 错过执行时间后,允许延迟10秒执行
)

# 4. 启动调度器(异步调度器,不会阻塞当前线程)
async def start_scheduler():
    scheduler.start()
    print("AsyncIOScheduler 已启动,按 Ctrl+C 停止...")
    # 保持事件循环运行(避免调度器启动后立即退出)
    while True:
        await asyncio.sleep(3600)  # 每小时检查一次(可根据需求调整)

# 5. 运行事件循环(模拟 FastAPI 运行环境)
if __name__ == "__main__":
    try:
        asyncio.run(start_scheduler())
    except (KeyboardInterrupt, SystemExit):
        # 优雅停止调度器
        scheduler.shutdown()
        print("AsyncIOScheduler 已停止")
    

常见陷阱

  • 陷阱1:在 AsyncIOScheduler 中执行同步任务函数 → 同步函数会阻塞 asyncio 事件循环,导致 FastAPI 无法处理请求,解决方案:要么将同步函数改为异步函数,要么用 loop.run_in_executor 包装同步函数(放到线程池/进程池执行)。
  • 陷阱2:调度器启动后,事件循环立即退出 → 因为 AsyncIOScheduler 是非阻塞的,启动后如果没有其他任务,事件循环会立即结束,解决方案:添加 while True: await asyncio.sleep(xxx) 保持事件循环运行(FastAPI 会自动维护事件循环,无需手动处理)。
  • 陷阱3:异步任务函数中未使用 await,导致任务执行异常 → 异步函数中如果没有 await 操作,会被当作同步函数执行,且可能报错「RuntimeWarning: coroutine 'xxx' was never awaited」,解决方案:在异步任务函数中,对 IO 操作使用 await。
  • 陷阱4:重复启动 AsyncIOScheduler → 报错 SchedulerAlreadyRunningError,解决方案:在启动前检查调度器状态(if not scheduler.running: scheduler.start())。

3.4 与数据库集成(SQLAlchemy / SQLModel)

原理说明

FastAPI 实战中,通常会使用 SQLAlchemy 或 SQLModel 作为 ORM 工具,与数据库交互。AsyncIOScheduler 的任务持久化,可直接复用 FastAPI 的数据库配置,将任务存储到与业务数据相同的数据库中(如 PostgreSQL),实现任务与业务数据的统一管理。

核心要点:

  • 使用 sqlalchemy 类型的 JobStore,复用 FastAPI 的数据库 URL;
  • 异步任务函数中,可直接使用 FastAPI 的异步数据库会话(如 AsyncSession),执行异步数据库操作;
  • 任务的参数可传递数据库模型相关的信息(如用户ID),但避免传递数据库会话对象(不可序列化)。

可运行代码(FastAPI + SQLModel + AsyncIOScheduler)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 进阶篇:FastAPI + SQLModel + AsyncIOScheduler 集成(异步数据库)
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlmodel import Field, SQLModel, create_async_engine, AsyncSession
from sqlmodel.ext.asyncio.session import AsyncSession
import asyncio
from datetime import datetime

# ------------------------------
# 1. 数据库配置(SQLModel + PostgreSQL 异步)
# ------------------------------
# 数据库连接 URL(与 FastAPI 业务数据库一致)
DATABASE_URL = "postgresql+asyncpg://postgres:123456@localhost:5432/fastapi_apscheduler"

# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True, future=True)

# 定义业务模型(示例:用户提醒模型)
class Reminder(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    user_id: int = Field(index=True)
    content: str
    created_at: datetime = Field(default_factory=datetime.utcnow)
    is_sent: bool = Field(default=False)

# 数据库初始化(创建表)
async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

# 获取异步数据库会话
async def get_session() -> AsyncSession:
    async with AsyncSession(engine) as session:
        yield session

# ------------------------------
# 2. 异步任务函数(操作数据库)
# ------------------------------
async def send_reminder_task(reminder_id: int):
    """异步任务:发送用户提醒,并更新数据库状态"""
    # 获取数据库会话
    async with AsyncSession(engine) as session:
        try:
            # 1. 查询提醒信息
            reminder = await session.get(Reminder, reminder_id)
            if not reminder:
                print(f"提醒 {reminder_id} 不存在,任务终止")
                return
            
            # 2. 模拟发送提醒(如调用短信API、邮件API)
            await asyncio.sleep(1)  # 模拟异步IO操作
            print(f"给用户 {reminder.user_id} 发送提醒:{reminder.content}")
            
            # 3. 更新数据库状态(标记为已发送)
            reminder.is_sent = True
            session.add(reminder)
            await session.commit()
            await session.refresh(reminder)
            print(f"提醒 {reminder_id} 状态已更新为已发送")
        
        except Exception as e:
            await session.rollback()  # 错误回滚
            print(f"发送提醒 {reminder_id} 失败:{str(e)}")

# ------------------------------
# 3. 初始化 AsyncIOScheduler(与数据库集成)
# ------------------------------
def init_scheduler():
    """初始化调度器,配置 SQLModel 数据库作为任务存储"""
    # 配置 JobStore(复用 FastAPI 数据库 URL)
    jobstores = {
        "default": SQLAlchemyJobStore(
            url=DATABASE_URL.replace("+asyncpg", ""),  # 注意:APScheduler 的 SQLAlchemyJobStore 不支持 asyncpg 驱动,需去掉
            engine=engine.sync_engine  # 传递同步引擎(APScheduler 内部使用同步操作)
        )
    }
    
    # 初始化异步调度器
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    
    # 启动调度器(非阻塞)
    if not scheduler.running:
        scheduler.start()
        print("AsyncIOScheduler 已启动(与 SQLModel 集成)")
    
    return scheduler

# ------------------------------
# 4. FastAPI 应用初始化(集成调度器)
# ------------------------------
app = FastAPI(lifespan=lifespan)  # lifespan 用于启动/关闭时执行操作

# 全局调度器对象(供后续 API 调用)
scheduler = None

# FastAPI 生命周期管理(启动/关闭调度器、初始化数据库)
async def lifespan(app: FastAPI):
    # 启动阶段:初始化数据库、初始化调度器
    await init_db()
    global scheduler
    scheduler = init_scheduler()
    yield  # 应用运行中
    # 关闭阶段:优雅停止调度器
    if scheduler and scheduler.running:
        scheduler.shutdown()
        print("AsyncIOScheduler 已优雅停止")

# ------------------------------
# 5. 测试接口(添加提醒任务)
# ------------------------------
@app.post("/add-reminder")
async def add_reminder(user_id: int, content: str, delay_seconds: int = 10):
    """添加用户提醒,延迟 delay_seconds 秒执行(动态添加任务)"""
    # 1. 先将提醒信息存入数据库
    async with AsyncSession(engine) as session:
        reminder = Reminder(user_id=user_id, content=content)
        session.add(reminder)
        await session.commit()
        await session.refresh(reminder)
    
    # 2. 动态添加定时任务(date 触发器,延迟指定秒数执行)
    run_date = datetime.now() + asyncio.timedelta(seconds=delay_seconds)
    scheduler.add_job(
        func=send_reminder_task,
        trigger="date",
        run_date=run_date,
        args=[reminder.id],  # 传递提醒ID(可序列化)
        id=f"reminder_{reminder.id}",
        replace_existing=True
    )
    
    return {
        "code": 200,
        "message": "提醒任务已添加",
        "data": {
            "reminder_id": reminder.id,
            "run_time": run_date.strftime("%Y-%m-%d %H:%M:%S")
        }
    }

# 运行 FastAPI 服务(uvicorn main:app --reload)
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
    

常见陷阱

  • 陷阱1:数据库 URL 驱动错误 → APScheduler 的 SQLAlchemyJobStore 不支持 asyncpg(异步 PostgreSQL 驱动),直接使用会报错,解决方案:URL 中去掉 +asyncpg,传递同步引擎 engine.sync_engine
  • 陷阱2:在异步任务中使用同步数据库会话 → 阻塞事件循环,导致 FastAPI 无法处理请求,解决方案:使用 SQLModel 的异步会话(AsyncSession),执行异步数据库操作。
  • 陷阱3:任务参数传递数据库会话对象 → 会话对象不可序列化,无法持久化,报错「PicklingError」,解决方案:传递数据库记录的 ID(如 reminder_id),在任务内部获取会话并查询数据。
  • 陷阱4:数据库事务未提交或回滚 → 任务执行成功但数据库未更新,或错误时数据不一致,解决方案:使用 try-except 捕获错误,成功时 commit,失败时 rollback。

3.5 多实例部署下的任务去重(分布式锁)

原理说明

当 FastAPI 服务部署多个实例(如用 uvicorn 启动多个 worker,或部署在多台服务器)时,每个实例都会初始化调度器,导致「同一个任务被多个实例重复执行」(如每天凌晨清理日志,多个实例同时清理,导致日志丢失或重复操作)。

解决方案:使用「分布式锁」,确保同一个任务在同一时间只能被一个实例执行,核心思路:

  1. 选择一个分布式锁存储后端(如 Redis、PostgreSQL);
  2. 任务执行前,尝试

(注:文档部分内容可能由 AI 生成)

添加新评论