LangGraph
# 什么是 LangGraph?
# LangGraph 是一个用于构建状态化、多步骤 AI 应用的框架。它使用图(Graph) 的概念来组织工作流:
# 节点(Nodes):图中的处理单元,可以是 LLM 调用、工具执行或自定义函数
# 边(Edges):连接节点的路径,定义执行顺序
# 状态(State):在节点之间传递的数据结构
# LangGraph vs LangChain
# 特性 LangChain LangGraph
# 抽象级别 高级 低级
# 适用场景 快速构建标准 Agent 复杂自定义工作流
# 控制粒度 通过中间件 完全控制每个节点
# 状态管理 自动 手动但灵活
# Hello World Graph
# 1. 定义状态 - 使用 TypedDict
from typing import TypedDict, Annotated,Dict
from langgraph.graph import StateGraph
class AgentState(TypedDict):
message: str
def greeting_node(state: AgentState)-> AgentState:
""" 一个向状态添加消息的简单节点 """
state['message'] = "Hello" + state["message"]
return state
graph = StateGraph(AgentState)
graph.add_node("greeter",greeting_node)
graph.set_entry_point("greeter")
graph.set_finish_point("greeter")
app = graph.compile()
打印图流程
from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))
多个节点并添加边
#多个节点并添加边
from typing import TypedDict, Annotated,Dict
from langgraph.graph import StateGraph
class AgentState(TypedDict):
name :str
age : str
final:str
def first_node(state: AgentState)-> AgentState:
"""这是第一个节点"""
state["final"] = f"你好 {state['name']}"
return state
def second_node(state: AgentState)-> AgentState:
"""这是第二个节点"""
state["final"] = state["final"] + f"你已经 {state['age']}了"
return state
graph = StateGraph(AgentState)
graph.add_node("first_node",first_node)
graph.add_node("second_node",second_node)
graph.set_entry_point("first_node")
graph.add_edge("first_node","second_node")
graph.set_finish_point("second_node")
app = graph.compile()
from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))
result = app.invoke({"name":"zhangsan","age":"18",})
print(result)
加模型顺序执行
#加模型顺序执行
from langgraph.graph import StateGraph, START, END
from langchain_ollama import ChatOllama
from typing import TypedDict, Annotated,Dict
# ==========================================
# 1. 初始化模型
# ==========================================
model = ChatOllama(
model="qwen3.5:2b",
base_url="http://localhost:11434",
temperature=0.7,
)
class SimpleState(TypedDict):
input_text:str
processed_text :str
llm_response :str
final_output:str
# 定义节点函数
def preprocess(state: SimpleState) -> dict:
"""预处理节点:清理和格式化输入"""
text = state["input_text"].strip().lower()
print(f" [预处理] 输入: '{state['input_text']}' -> '{text}'")
return {"processed_text": text}
def call_llm(state: SimpleState) -> dict:
"""LLM 节点:调用语言模型"""
messages = [
{"role": "system", "content": "你是一个友好的助手,请简洁回答问题。"},
{"role": "user", "content": state["processed_text"]}
]
response = model.invoke(messages)
print(f" [LLM] 响应: {response.content[:50]}...")
return {"llm_response": response.content}
def postprocess(state: SimpleState) -> dict:
"""后处理节点:格式化输出"""
final = f"✨ AI 回复:{state['llm_response']}"
print(" [后处理] 完成格式化")
return {"final_output": final}
# 构建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("preprocess", preprocess)
graph.add_node("call_llm", call_llm)
graph.add_node("postprocess", postprocess)
# 添加边(定义执行顺序)
graph.add_edge(START, "preprocess")
graph.add_edge("preprocess", "call_llm")
graph.add_edge("call_llm", "postprocess")
graph.add_edge("postprocess", END)
# 编译图
app = graph.compile()
from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))
result = app.invoke({"input_text": " 介绍一下你自己 "})
print(f"\n最终输出:\n{result['final_output']}")
条件分支工作流
from typing import Literal
#条件分支工作流
#加模型顺序执行
from langgraph.graph import StateGraph, START, END
from langchain_ollama import ChatOllama
from typing import TypedDict, Annotated,Dict
class AgentState(TypedDict):
query: str
query_type: str
response: str
def classify_query(state: AgentState) ->dict:
"""分类节点:判断查询类型"""
query = state["query"].lower()
if any(word in query for word in ["天气", "温度", "下雨"]):
query_type = "weather"
elif any(word in query for word in ["计算", "加", "减", "乘", "除", "等于"]):
query_type = "math"
else:
query_type = "general"
print(f" [分类] 查询类型: {query_type}")
return {"query_type": query_type}
def handle_weather(state: AgentState) -> dict:
"""处理天气查询"""
print(" [天气处理] 执行天气查询逻辑...")
# 实际应用中这里会调用天气 API
response = "🌤️ 今天天气晴朗,温度 25°C,适合外出!"
return {"response": response}
def handle_math(state: AgentState) -> dict:
"""处理数学计算"""
print(" [数学处理] 执行计算逻辑...")
content = "计算器损坏,计算结果统一为 0。"
return {"response": f"🔢 {content}"}
def handle_general(state: AgentState) -> dict:
"""处理一般查询"""
print(" [通用处理] 执行通用 LLM 调用...")
messages = [
{"role": "system", "content": "你是一个友好的助手,请简洁回答问题。"},
{"role": "user", "content": state["query"]}
]
result = model.invoke(messages)
return {"response": f"💡 {result.content}"}
def route_query(state: AgentState) -> Literal["weather", "math", "general"]:
"""路由函数:返回下一个节点名称"""
return state["query_type"]
# 构建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("classify", classify_query)
graph.add_node("weather", handle_weather)
graph.add_node("math", handle_math)
graph.add_node("general", handle_general)
# 添加边
graph.add_edge(START, "classify")
# 添加条件边
graph.add_conditional_edges(
"classify", # 从哪个节点出发
route_query, # 路由函数
{ # 路由映射
"weather": "weather",
"math": "math",
"general": "general"
}
)
# 所有处理节点都到 END
graph.add_edge("weather", END)
graph.add_edge("math", END)
graph.add_edge("general", END)
# 编译
app = graph.compile()
from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))
# 测试不同类型的查询
test_queries = [
"今天北京的天气怎么样?",
"计算 123 加 456 等于多少?",
"Python 是什么编程语言?"
]
for query in test_queries:
print(f"\n查询: {query}")
result = app.invoke({"query": query})
print(f"响应: {result['response'][:100]}...")
带内存的对话工作流
from typing import Literal
from langgraph.graph.message import add_messages # 必须导入!
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, START, END
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated,Dict
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.checkpoint.memory import MemorySaver
# 带内存的对话工作流
# 使用 add_messages 注解,消息会自动追加
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
turn_count: int
# model = ChatOllama(
# model="qwen3.5:9b",
# base_url="http://localhost:11434",
# temperature=0.7,
# # M5 专属优化
# num_ctx=1024, # 越小越快
# num_predict=256, # 限制回答长度
# use_metal=True, # ✅ M5 必开:调用GPU/NPU
# # keep_alive="24h", # 模型常驻,不卸载
# )
model = ChatOpenAI(
base_url="http://localhost:1234/v1",
api_key="dummy-key", # 随便填
model="qwen/qwen3.5-9b",
temperature=0.2,
max_tokens=1024,
# max_tokens=64,
# ✅ 核心优化 2:关闭思考链(关键!)
# 关键 2:强制关闭思考链(这个格式 100% 生效)
# use_metal=True,
# streaming=True, # 开启流式输出
# callbacks=[StreamingStdOutCallbackHandler()], # 实时打印
)
def chat_node(state: AgentState) -> dict:
"""对话节点:处理用户消息并生成回复"""
# 添加系统提示(如果是第一轮)
messages = state["messages"]
if not any(isinstance(m, SystemMessage) for m in messages):
messages = [
SystemMessage(content="你是友好助手,请直接回答问题,**绝对不要输出思考过程,不要写推理步骤**")
] + messages
# 调用模型
response = model.invoke(messages)
# 更新轮数
turn_count = state.get("turn_count", 0) + 1
print(f" [对话轮次 {turn_count}] AI: {response.content[:50]}...")
# 返回新消息(会自动追加到 messages 列表)
return {
"messages": [response], # add_messages 会自动追加
"turn_count": turn_count
}
def should_continue(state: AgentState) -> Literal["continue", "end"]:
"""决定是否继续对话"""
# 这里简化为总是返回 end,实际应用中可以检查用户意图
return "end"
# 构建图
graph = StateGraph(AgentState)
graph.add_node("chat", chat_node)
graph.add_edge(START, "chat")
graph.add_edge("chat", END) # 简化:每次调用处理一轮
# 使用内存保存器
memory = MemorySaver()
app = graph.compile(checkpointer=memory)
# 模拟多轮对话(使用相同的 thread_id)
config = {"configurable": {"thread_id": "user_001"}}
conversations = [
"你好!我叫小明。",
"我最喜欢的编程语言是 Python。",
"你还记得我的名字吗?",
"我喜欢什么编程语言?"
]
for user_input in conversations:
print(f"\n用户: {user_input}")
result = app.invoke(
{"messages": [HumanMessage(content=user_input)]},
config=config
)
print(f"AI: {result['messages'][-1].content}")