# 核心概念
# Agent 执行循环 = 自动化的"思考-行动-观察"过程
# Agent 不是一次性调用,而是一个循环:
# 用户问题 → AI 思考 → 调用工具 → 观察结果 → 继续思考 → 最终答案
# 执行循环详解
# 执行循环详解
# ┌─────────────┐
# │ 用户提问 │
# │ HumanMessage│
# └──────┬──────┘
# ↓
# ┌─────────────┐
# │ AI 分析问题 │
# │ 需要工具? │
# └──────┬──────┘
# ↓ 是
# ┌─────────────┐
# │ AI 决定调用 │
# │ AIMessage │
# │ (tool_calls)│
# └──────┬──────┘
# ↓
# ┌─────────────┐
# │ 执行工具 │
# │ ToolMessage │
# └──────┬──────┘
# ↓
# ┌─────────────┐
# │ AI 看结果 │
# │ 生成答案 │
# │ AIMessage │
# └─────────────┘
import sys
import os
from urllib import response
from openai import chat
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from init_model import get_chat_model
from langchain_core.tools import tool
from init_model import get_chat_model
from langchain.agents import create_agent
# 导入自定义工具
from tools import weather
from tools import web_search
from tools import calculator
# 初始化数据
weather_tool = weather.get_weather
web_search_tool = web_search.web_search
calculator_tool = calculator.calculator
chat_model = get_chat_model()
# 使用工具完整过程
def demo1():
agent = create_agent(model=chat_model, tools=[calculator_tool]) # 计算工具
response = agent.invoke({"messages": [{"role": "user", "content": "25 乘以 8"}]})
# 1. 查看完整历史
for msg in response["messages"]:
print(f"{msg.__class__.__name__}: {msg.content}")
# 2. 获取最终答案
final_answer = response["messages"][-1].content
print(f"最终答案: {final_answer}")
# 3. 查看使用的工具
used_tools = []
for msg in response["messages"]:
if hasattr(msg, "tool_calls") and msg.tool_calls:
for tc in msg.tool_calls:
used_tools.append(tc["name"])
print(f"使用的工具: {used_tools}")
# 流式输出(Streaming)
# 用于实时显示 Agent 的进度
def demo2_fixed():
agent = create_agent(model=chat_model, tools=[calculator_tool])
print("\n--- 开始流式输出 ---")
for chunk in agent.stream({"messages": [{"role": "user", "content": "25 乘以 8"}]}):
# chunk 结构可能是 {'model': {'messages': [...]}} 或 {'tools': {'messages': [...]}}
# 我们需要遍历 chunk 的值来找到 'messages' 列表
for key, value in chunk.items():
if isinstance(value, dict) and "messages" in value:
messages_list = value["messages"]
for msg in messages_list:
# 获取内容
content = getattr(msg, "content", "")
# 1. 处理 AI 的回答 (流式或最终)
if msg.type == "ai" and content:
# 使用 end="" 实现打字机效果
print(content, end="", flush=True)
# 2. 处理工具的执行结果 (可选,如果你想看中间计算过程)
elif msg.type == "tool" and content:
# 换行显示工具结果,避免和 AI 回答混在一起
print(f"\n[计算器结果]: {content}", flush=True)
# 3. 处理工具调用意图 (可选,显示模型决定调用什么)
elif (
msg.type == "ai"
and hasattr(msg, "tool_calls")
and msg.tool_calls
):
tool_name = msg.tool_calls[0].get("name", "unknown")
print(f"\n[模型决定调用工具]: {tool_name}...", flush=True)
print("\n--- 结束 ---")
# --- 开始流式输出 ---
# [模型决定调用工具]: calculator...
# [计算器结果]: 25.0 multiply 8.0 = 200.0
# 25 乘以 8 等于 200。
# --- 结束 ---
# stream vs invoke
# 方法 返回 用途
# invoke() 完整结果 等待完成后一次性获取
# stream() 生成器 实时获取中间步骤
# 多步骤执行
# Agent 可以多次调用工具:
def demo3():
agent = create_agent(model=chat_model, tools=[calculator_tool])
# 问题:先算 10 + 20,然后乘以 3
response = agent.invoke(
{"messages": [{"role": "user", "content": "先算 10 + 20,然后乘以 3"}]}
)
# Agent 可能会:
# 1. 调用 calculator(add, 10, 20) → 30
# 2. 调用 calculator(multiply, 30, 3) → 90
# 3. 返回最终答案
# 查看完整历史
for i, msg in enumerate(response["messages"], 1):
print(f"\n--- 消息 {i}: {msg.__class__.__name__} ---")
if hasattr(msg, "content"):
print(f"内容: {msg.content}")
if hasattr(msg, "tool_calls") and msg.tool_calls:
for tc in msg.tool_calls:
print(f"工具: {tc['name']}, 参数: {tc['args']}")
# 使用 stream 查看步骤
def demo4(chunk):
agent = create_agent(model=chat_model, tools=[calculator_tool])
step = 0
input_data = {"messages": [{"role": "user", "content": "先算 10 + 20,然后乘以 3"}]}
print("--- 开始执行多步计算 ---")
for chunk in agent.stream(input_data):
step += 1
print(f"步骤 {step}:")
# 【关键修改】:messages 不在 chunk 顶层,而在 chunk['model'] 或 chunk['tools'] 里
messages_list = None
for value in chunk.values():
if isinstance(value, dict) and "messages" in value:
messages_list = value["messages"]
break
if messages_list:
latest = messages_list[-1]
print(f" 类型: {latest.__class__.__name__}")
# 打印内容
if hasattr(latest, "content") and latest.content:
print(f" 内容: {latest.content}")
elif hasattr(latest, "content"):
print(f" 内容: (空,可能是纯工具调用)")
# 打印工具调用详情
if hasattr(latest, "tool_calls") and latest.tool_calls:
for tc in latest.tool_calls:
print(f" 🛠️ 工具: {tc['name']}, 参数: {tc['args']}")
else:
print(" (未解析到消息)")
print("--- 结束 ---")
# 查看是否调用了工具
def check_agent_tool_usage():
# 1. 创建 Agent (确保 chat_model 和 calculator_tool 已定义)
agent = create_agent(model=chat_model, tools=[calculator_tool])
# 2. 执行任务 (invoke 会阻塞直到完成)
response = agent.invoke(
{"messages": [{"role": "user", "content": "先算 10 + 20,然后乘以 3"}]}
)
# 3. 检查消息列表中是否有 tool_calls
# response["messages"] 包含了整个对话历史 (用户问 -> AI思考/调工具 -> 工具返回 -> AI回答)
has_tool_calls = any(
hasattr(msg, "tool_calls") and msg.tool_calls for msg in response["messages"]
)
if has_tool_calls:
print("✅ Agent 使用了工具")
# 进阶:打印具体用了什么工具
for msg in response["messages"]:
if hasattr(msg, "tool_calls") and msg.tool_calls:
for tc in msg.tool_calls:
print(f" - 调用了: {tc['name']}, 参数: {tc['args']}")
else:
print("❌ Agent 直接回答 (未使用工具)")
# . Agent 可以调用多少次工具?
# 答:默认没有限制,直到得到最终答案
# 但可能会:
# 超时
# 达到 token 限制
# 模型决定停止
# 3. 如何限制工具调用次数?
# LangChain 1.0 的 create_agent 默认使用 LangGraph,可以通过配置限制:
def create_agent_with_tool_limit(model, tools, max_tool_calls=3):
"""
创建一个 Agent,限制工具调用次数
:param model: 语言模型
:param tools: 可用工具列表
:param max_tool_calls: 最大允许调用次数
:return: 配置好的 Agent
"""
return create_agent(
model=model,
tools=tools,
# 配置 LangGraph 限制
langgraph_config={
"max_iterations": max_tool_calls,
},
)
if __name__ == "__main__":
check_agent_tool_usage()