Code - LangGraph 基础

LangGraph

# 什么是 LangGraph?
# LangGraph 是一个用于构建状态化、多步骤 AI 应用的框架。它使用图(Graph) 的概念来组织工作流:
# 节点(Nodes):图中的处理单元,可以是 LLM 调用、工具执行或自定义函数
# 边(Edges):连接节点的路径,定义执行顺序
# 状态(State):在节点之间传递的数据结构

# LangGraph vs LangChain
# 特性    LangChain    LangGraph
# 抽象级别    高级    低级

# 适用场景    快速构建标准 Agent    复杂自定义工作流
# 控制粒度    通过中间件    完全控制每个节点
# 状态管理    自动    手动但灵活

# Hello World Graph

# 1. 定义状态 - 使用 TypedDict
from typing import TypedDict, Annotated,Dict
from langgraph.graph import StateGraph


class AgentState(TypedDict):
    message: str

def greeting_node(state: AgentState)-> AgentState:
    """ 一个向状态添加消息的简单节点 """
    state['message'] = "Hello" + state["message"]
    return state

graph = StateGraph(AgentState)

graph.add_node("greeter",greeting_node)
graph.set_entry_point("greeter")
graph.set_finish_point("greeter")
app = graph.compile()

打印图流程

from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))

多个节点并添加边

#多个节点并添加边
from typing import TypedDict, Annotated,Dict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    name :str
    age : str
    final:str

def first_node(state: AgentState)-> AgentState:
    """这是第一个节点"""
    state["final"] = f"你好 {state['name']}"

    return state

def second_node(state: AgentState)-> AgentState:
    """这是第二个节点"""
    state["final"] =  state["final"] + f"你已经 {state['age']}了"
    return state

graph = StateGraph(AgentState)
graph.add_node("first_node",first_node)
graph.add_node("second_node",second_node)

graph.set_entry_point("first_node")
graph.add_edge("first_node","second_node")
graph.set_finish_point("second_node")
app = graph.compile()

from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))

result = app.invoke({"name":"zhangsan","age":"18",})
print(result)

加模型顺序执行

#加模型顺序执行
from langgraph.graph import StateGraph, START, END
from langchain_ollama import ChatOllama
from typing import TypedDict, Annotated,Dict
# ==========================================
# 1. 初始化模型
# ==========================================
model = ChatOllama(
    model="qwen3.5:2b",
    base_url="http://localhost:11434",
    temperature=0.7,
)

class SimpleState(TypedDict):
    input_text:str
    processed_text :str
    llm_response :str
    final_output:str

# 定义节点函数
def preprocess(state: SimpleState) -> dict:
    """预处理节点:清理和格式化输入"""
    text = state["input_text"].strip().lower()
    print(f"  [预处理] 输入: '{state['input_text']}' -> '{text}'")
    return {"processed_text": text}

def call_llm(state: SimpleState) -> dict:
    """LLM 节点:调用语言模型"""
    messages = [
        {"role": "system", "content": "你是一个友好的助手,请简洁回答问题。"},
        {"role": "user", "content": state["processed_text"]}
    ]

    response = model.invoke(messages)
    print(f"  [LLM] 响应: {response.content[:50]}...")
    return {"llm_response": response.content}

def postprocess(state: SimpleState) -> dict:
    """后处理节点:格式化输出"""
    final = f"✨ AI 回复:{state['llm_response']}"
    print("  [后处理] 完成格式化")
    return {"final_output": final}


# 构建图
graph = StateGraph(AgentState)

# 添加节点
graph.add_node("preprocess", preprocess)
graph.add_node("call_llm", call_llm)
graph.add_node("postprocess", postprocess)
    # 添加边(定义执行顺序)
graph.add_edge(START, "preprocess")
graph.add_edge("preprocess", "call_llm")
graph.add_edge("call_llm", "postprocess")
graph.add_edge("postprocess", END)
# 编译图
app = graph.compile()


from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))
result = app.invoke({"input_text": "  介绍一下你自己  "})
print(f"\n最终输出:\n{result['final_output']}")

条件分支工作流

from typing import Literal
#条件分支工作流
#加模型顺序执行
from langgraph.graph import StateGraph, START, END
from langchain_ollama import ChatOllama
from typing import TypedDict, Annotated,Dict

class AgentState(TypedDict):
    query: str
    query_type: str
    response: str


def classify_query(state: AgentState) ->dict:
    """分类节点:判断查询类型"""
    query = state["query"].lower()
    if any(word in query for word in ["天气", "温度", "下雨"]):
        query_type = "weather"
    elif any(word in query for word in ["计算", "加", "减", "乘", "除", "等于"]):
        query_type = "math"
    else:
        query_type = "general"

    print(f"  [分类] 查询类型: {query_type}")
    return {"query_type": query_type}


def handle_weather(state: AgentState) -> dict:
    """处理天气查询"""
    print("  [天气处理] 执行天气查询逻辑...")
    # 实际应用中这里会调用天气 API
    response = "🌤️ 今天天气晴朗,温度 25°C,适合外出!"
    return {"response": response}

def handle_math(state: AgentState) -> dict:
    """处理数学计算"""
    print("  [数学处理] 执行计算逻辑...")

    content = "计算器损坏,计算结果统一为 0。"

    return {"response": f"🔢 {content}"}

def handle_general(state: AgentState) -> dict:
    """处理一般查询"""
    print("  [通用处理] 执行通用 LLM 调用...")
    messages = [
        {"role": "system", "content": "你是一个友好的助手,请简洁回答问题。"},
        {"role": "user", "content": state["query"]}
    ]

    result = model.invoke(messages)
    return {"response": f"💡 {result.content}"}

def route_query(state: AgentState) -> Literal["weather", "math", "general"]:
    """路由函数:返回下一个节点名称"""
    return state["query_type"]


# 构建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("classify", classify_query)
graph.add_node("weather", handle_weather)
graph.add_node("math", handle_math)
graph.add_node("general", handle_general)

# 添加边
graph.add_edge(START, "classify")

# 添加条件边
graph.add_conditional_edges(
    "classify",  # 从哪个节点出发
    route_query,  # 路由函数
    {  # 路由映射
        "weather": "weather",
        "math": "math",
        "general": "general"
    }
)
# 所有处理节点都到 END
graph.add_edge("weather", END)
graph.add_edge("math", END)
graph.add_edge("general", END)

# 编译
app = graph.compile()
from IPython.display import Image ,display
display(Image(app.get_graph().draw_mermaid_png()))
# 测试不同类型的查询
test_queries = [
    "今天北京的天气怎么样?",
    "计算 123 加 456 等于多少?",
    "Python 是什么编程语言?"
]

for query in test_queries:
    print(f"\n查询: {query}")
    result = app.invoke({"query": query})
    print(f"响应: {result['response'][:100]}...")

带内存的对话工作流

from typing import Literal
from langgraph.graph.message import add_messages  # 必须导入!
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, START, END
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated,Dict
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.checkpoint.memory import MemorySaver

# 带内存的对话工作流
# 使用 add_messages 注解,消息会自动追加
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    turn_count: int


# model = ChatOllama(
#     model="qwen3.5:9b",
#     base_url="http://localhost:11434",
#     temperature=0.7,
#     # M5 专属优化
#     num_ctx=1024,     # 越小越快
#     num_predict=256,   # 限制回答长度
#     use_metal=True,    # ✅ M5 必开:调用GPU/NPU
#     # keep_alive="24h",  # 模型常驻,不卸载
# )

model = ChatOpenAI(
    base_url="http://localhost:1234/v1",
    api_key="dummy-key",  # 随便填
    model="qwen/qwen3.5-9b",
    temperature=0.2,
    max_tokens=1024,
        # max_tokens=64,

    # ✅ 核心优化 2:关闭思考链(关键!)
    # 关键 2:强制关闭思考链(这个格式 100% 生效)
    # use_metal=True,
    # streaming=True,  # 开启流式输出
    # callbacks=[StreamingStdOutCallbackHandler()],  # 实时打印
)


def chat_node(state: AgentState) -> dict:
    """对话节点:处理用户消息并生成回复"""
    # 添加系统提示(如果是第一轮)
    messages = state["messages"]
    if not any(isinstance(m, SystemMessage) for m in messages):
        messages = [
            SystemMessage(content="你是友好助手,请直接回答问题,**绝对不要输出思考过程,不要写推理步骤**")
        ] + messages

    # 调用模型
    response = model.invoke(messages)

    # 更新轮数
    turn_count = state.get("turn_count", 0) + 1
    print(f"  [对话轮次 {turn_count}] AI: {response.content[:50]}...")

    # 返回新消息(会自动追加到 messages 列表)
    return {
        "messages": [response],  # add_messages 会自动追加
        "turn_count": turn_count
    }
def should_continue(state: AgentState) -> Literal["continue", "end"]:
    """决定是否继续对话"""
    # 这里简化为总是返回 end,实际应用中可以检查用户意图
    return "end"

# 构建图
graph = StateGraph(AgentState)
graph.add_node("chat", chat_node)

graph.add_edge(START, "chat")
graph.add_edge("chat", END)  # 简化:每次调用处理一轮

# 使用内存保存器
memory = MemorySaver()
app = graph.compile(checkpointer=memory)

# 模拟多轮对话(使用相同的 thread_id)
config = {"configurable": {"thread_id": "user_001"}}

conversations = [
    "你好!我叫小明。",
    "我最喜欢的编程语言是 Python。",
    "你还记得我的名字吗?",
    "我喜欢什么编程语言?"
]

for user_input in conversations:
    print(f"\n用户: {user_input}")
    result = app.invoke(
        {"messages": [HumanMessage(content=user_input)]},
        config=config
    )
    print(f"AI: {result['messages'][-1].content}")