check_point.py

# 核心概念
# Checkpointing = 将对话状态持久化到数据库
#     InMemorySaver → 内存中(程序退出即丢失)
#     SqliteSaver/ → SQLite 数据库(持久化存储)

# 基本用法
# InMemorySaver 的限制

import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from langchain.tools import tool

from init_model import get_chat_model

from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
from langgraph.checkpoint.sqlite import SqliteSaver


chat_model = get_chat_model()


def demo1():

    agent = create_agent(model=chat_model, tools=[], checkpointer=InMemorySaver())

    # 限制:
    # ❌ 程序重启后丢失
    # ❌ 无法跨进程共享
    # ❌ 不适合生产环境


# SqliteSaver(推荐生产使用)
def SqliteSaver_demo():

    db_path = "checkpoints.sqlite"

    with SqliteSaver.from_conn_string(db_path) as checkpointer:
        agent = create_agent(
            model=chat_model, tools=[], checkpointer=checkpointer  # 使用 SQLite
        )
        config = {"configurable": {"thread_id": "user_123"}}
        # 第一次运行
        agent.invoke(
            {"messages": [{"role": "user", "content": "你好,我叫张三"}]}, config
        )

    with SqliteSaver.from_conn_string(db_path) as checkpointer:
        agent = create_agent(model=chat_model, checkpointer=checkpointer)
        result = agent.invoke(
            {"messages": [{"role": "user", "content": "我是谁"}]}, config
        )
        print(result["messages"][-1].content)


# 生成的checkpoints.sqlit文件

# ├──  09 - Checkpointing (检查点持久化)/
# │   └──  check_point.py
# ├── 📁 其他练习/
# ├── 📁 tools/
# ├── ⚙️ .env
# ├── ️ checkpoints.sqlite
# ├── 🗄️ checkpoints.sqlite-shm
# ├── 🗄️ checkpoints.sqlite-wal
# └── 📄 init_model.py

# 运行结果
# 你是**张三**呀!👋
# 你之前已经跟我介绍过很多次啦(比如最开始你就说过“你好,我叫张三”)。 虽然你可能是在考我的记忆力,但我确实把你的名字记在心里了哦!😄
# 除了名字之外,我还挺好奇你的其他故事的。今天有什么特别想聊的话题,或 者需要我帮忙的地方吗?


# 跨进程访问
# 进程 A(Web 服务器)
def demo2():
    with SqliteSaver.from_conn_string("shared.sqlite") as checkpointer:
        agent_a = create_agent(model=model, checkpointer=checkpointer)
        agent_a.invoke({...}, config={"configurable": {"thread_id": "user_1"}})

    # 进程 B(后台任务)
    with SqliteSaver.from_conn_string("shared.sqlite") as checkpointer:
        agent_b = create_agent(model=model, checkpointer=checkpointer)
        # 可以访问进程 A 创建的对话
        agent_b.invoke({...}, config={"configurable": {"thread_id": "user_1"}})


# 参数说明
# SqliteSaver.from_conn_string()
# 参数    说明    示例
# conn_string    数据库文件路径(不要加 sqlite:/// 前缀)    "checkpoints.sqlite"


# 路径格式
# 相对路径(当前目录) - 推荐
def demo3():
    with SqliteSaver.from_conn_string("checkpoints.sqlite") as checkpointer:
        agent = create_agent(model=model, checkpointer=checkpointer)

    # 绝对路径(生产环境)
    with SqliteSaver.from_conn_string("C:/data/checkpoints.sqlite") as checkpointer:
        agent = create_agent(model=model, checkpointer=checkpointer)


# 重要:
#     ✅ 直接传文件路径,不要加 sqlite:/// 前缀
#     ✅ 相对路径会在当前目录创建数据库
#     ✅ Windows 路径使用正斜杠 / 或双反斜杠 \\

# 对比 InMemorySaver
# 特性    InMemorySaver    SqliteSaver
# 持久化    ❌ 程序退出即丢失    ✅ 持久化到文件
# 跨进程    ❌ 无法共享    ✅ 可以共享
# 性能    ⚡ 快(内存)    🐢 慢一点(磁盘 I/O)
# 适用    开发、测试    生产环境

# 实际应用
# 1.客服系统


def demo4():
    # 客户今天上午咨询
    with SqliteSaver.from_conn_string("customer_service.sqlite") as checkpointer:
        agent = create_agent(model=model, tools=[查询订单], checkpointer=checkpointer)

        config = {"configurable": {"thread_id": "customer_zhang"}}
        agent.invoke(
            {"messages": [{"role": "user", "content": "订单 12345 在哪?"}]}, config
        )

    # 下午客户再次咨询(即使服务重启)
    with SqliteSaver.from_conn_string("customer_service.sqlite") as checkpointer:
        agent = create_agent(model=model, tools=[查询订单], checkpointer=checkpointer)
        agent.invoke({"messages": [{"role": "user", "content": "到了吗?"}]}, config)
        # Agent 记得上午查询的订单号!


# 多用户聊天
def demo5():
    with SqliteSaver.from_conn_string("chat.sqlite") as checkpointer:
        agent = create_agent(model=model, checkpointer=checkpointer)
    # 用户A
    agent.invoke({...}, config={"configurable": {"thread_id": "user_alice"}})
    # 用户B
    agent.invoke({...}, config={"configurable": {"thread_id": "user_bob"}})
    # 所有用户的对话都持久化在 chat.sqlite 中


# 常见问题
# 1. 数据库文件在哪?
# # 相对路径 → 当前工作目录
#     SqliteSaver.from_conn_string("sqlite:///checkpoints.sqlite")
# # 文件位置:当前目录/checkpoints.sqlite

# # 绝对路径 → 指定位置
#     SqliteSaver.from_conn_string("sqlite:///C:/data/checkpoints.sqlite")
# # 文件位置:C:/data/checkpoints.sqlite

# 2. 如何清空某个用户的历史?
# 目前需要手动操作数据库:
# import sqlite3

# conn = sqlite3.connect("checkpoints.sqlite")
# cursor = conn.cursor()

# # 删除特定 thread_id 的记录
# cursor.execute("DELETE FROM checkpoints WHERE thread_id = ?", ("user_123",))
# conn.commit()
# conn.close()

# 3. 数据库会无限增长吗?
# 会!需要定期清理:

# 策略:
#     定期删除旧对话(如 30 天前)
#     限制每个 thread 的 checkpoint 数量
#     定期备份和归档
# 4. 性能影响?
#     SQLite 比内存慢,但影响不大
#     适合中小型应用(< 10000 并发用户)
#     大规模应用考虑 PostgreSQL(LangGraph 也支持)

# 最佳实践

# 1. 生产环境使用绝对路径 + with 语句
# with SqliteSaver.from_conn_string("C:/production/data/checkpoints.sqlite") as checkpointer:
#     agent = create_agent(model=model, checkpointer=checkpointer)

# # 2. 开发环境使用相对路径
# with SqliteSaver.from_conn_string("dev_checkpoints.sqlite") as checkpointer:
#     agent = create_agent(model=model, checkpointer=checkpointer)

# # 3. 测试环境使用内存数据库
# with SqliteSaver.from_conn_string(":memory:") as checkpointer:
#     agent = create_agent(model=model, checkpointer=checkpointer)

# # 4. 定期备份数据库文件
# # 使用系统任务定期复制 checkpoints.sqlite

# # 5. 监控数据库大小
# import os
# db_size = os.path.getsize("checkpoints.sqlite")
# print(f"数据库大小: {db_size / 1024 / 1024:.2f} MB")


# 核心要点
#     InMemorySaver:内存存储,程序退出即丢失
#     SqliteSaver:持久化到 SQLite 文件
#     创建方式:with SqliteSaver.from_conn_string("checkpoints.sqlite") as checkpointer:
#     路径格式:直接传文件路径,不要加 sqlite:/// 前缀
#     跨进程:多个进程可访问同一数据库
#     生产推荐:使用 SqliteSaver + with 语句


if __name__ == "__main__":
    SqliteSaver_demo()
添加新评论