多 Agent 状态同步、Checkpoint 记忆与重试机制
一、多 Agent 状态同步
1.1 多个 Agent 怎么共享数据?
TravelAgent-5 有 5 个节点(supervisor、route、hotel、food、info), 其中 4 个 Agent 可能并行执行。它们需要:
- 读取用户输入(目的地、天数、预算)
- 写入各自的结果(路线、酒店、美食、文化)
- 互不干扰
1.2 答案:AgentState + add_messages reducer
所有 Agent 共享同一个 AgentState 结构:
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 消息列表(特殊处理)
user_input: str # 所有 Agent 可读
destination: str # 所有 Agent 可读
days: int # 所有 Agent 可读
route_result: str # 只有 route_agent 写入
hotel_result: str # 只有 hotel_agent 写入
food_result: str # 只有 food_agent 写入
info_result: str # 只有 info_agent 写入
# ...
关键设计:
普通字段(如 route_result):每个 Agent 只写自己的字段,互不干扰。
并行执行时,LangGraph 给每个分支一份 state 的副本,
各分支独立修改,最后在 fan-in(aggregator)时合并。
messages 字段:使用 Annotated[list, add_messages] reducer。
add_messages 是 LangGraph 内置的消息归约器,规则是:
- 新消息追加到列表末尾,而不是覆盖
- 通过消息 ID 去重(避免重复追加)
- 多个并行分支都往 messages 写内容时,最终结果是所有内容拼接
1.3 并行执行的数据流
supervisor 返回 {"destination": "成都", "agents_to_call": ["route","hotel","food"]}
│
┌─────────────────┼─────────────────┐
│ Send("route") │ Send("hotel") │ Send("food")
▼ ▼ ▼
state 副本 1 state 副本 2 state 副本 3
route_agent 读: hotel_agent 读: food_agent 读:
destination destination destination
days budget days
route_agent 写: hotel_agent 写: food_agent 写:
route_result hotel_result food_result
│ │ │
└─────────────────┴──────────────────┘
│
aggregator
合并所有分支的 state:
- route_result ← 来自分支1
- hotel_result ← 来自分支2
- food_result ← 来自分支3
- messages ← 三个分支的 messages 拼接(add_messages)
1.4 源码对应
写入方(各 Agent 节点):
# route_agent_node 返回
return {"route_result": json.dumps(result, ensure_ascii=False)}
# hotel_agent_node 返回
return {"hotel_result": json.dumps(...)}
# food_agent_node 返回
return {"food_result": json.dumps(...)}
# info_agent_node 返回
return {"info_result": response.content}
# supervisor_node 和 aggregator_node 还会写入 messages
return {"messages": [AIMessage(content=...)]}
读取方(aggregator_node):
route = json.loads(state["route_result"]) # 读路线结果
hotels = json.loads(state["hotel_result"]) # 读酒店结果
foods = json.loads(state["food_result"]) # 读美食结果
info_text = state.get("info_result", "") # 读文化结果
每个字段只有一个 Agent 写入,所以不存在写冲突。
1.5 add_messages 的消息合并原理
当多个并行分支都返回 messages 时,LangGraph 的合并流程:
分支1 (route) 返回: {"messages": []} ← route 不写 messages
分支2 (hotel) 返回: {"messages": []} ← hotel 不写 messages
分支3 (food) 返回: {"messages": []} ← food 不写 messages
分支4 (info) 返回: {"messages": []} ← info 不写 messages
合并结果: messages = 原有messages + [] + [] + [] + [] = 原有messages
只有 supervisor 和 aggregator 写入 messages,所以并行分支间没有消息冲突。
二、Checkpoint 工作机制
2.1 Checkpoint 是什么?
Checkpoint 是 LangGraph 的状态持久化机制。每次图的一个节点执行完, LangGraph 会把当前的完整 state 序列化后存入 checkpointer(这里是 SQLite)。
下次同一个 thread_id 的请求进来时,LangGraph 从 checkpointer 恢复 state,
就像时间倒流到上次执行完的位置。
2.2 存储结构
checkpoints.db (SQLite)
├── checkpoints 表
│ ├── thread_id: "sess_1715xxx" ← 会话标识
│ ├── checkpoint_id: "uuid" ← 快照唯一ID
│ ├── parent_id: "uuid" ← 上一个快照的ID(形成链表)
│ ├── checkpoint: BLOB ← 序列化的完整 state
│ └── metadata: JSON ← 节点名、时间戳等
每个 thread_id 形成一条 checkpoint 链:
thread_id="sess_abc"
checkpoint_1 (supervisor 执行完)
└── checkpoint_2 (route 执行完)
└── checkpoint_3 (aggregator 执行完) ← 最新状态
2.3 源码中的初始化
# 1. 创建 SQLite 连接
_checkpoint_conn = sqlite3.connect(CHECKPOINT_DB, check_same_thread=False)
# 2. 创建 SqliteSaver(LangGraph 的 Checkpoint 存储后端)
checkpointer = SqliteSaver(conn=_checkpoint_conn)
# 3. 创建数据库表(如果不存在)
checkpointer.setup()
# 4. 编译图时绑定 checkpointer
graph = build_graph(checkpointer=checkpointer)
2.4 请求时的 Checkpoint 流程
# /chat 端点的 run() 函数
config = {"configurable": {"thread_id": sid}}
final = graph.invoke(
{"user_input": user_message, "messages": [], ...},
config=config,
)
graph.invoke() 内部的执行流程:
1. 用 thread_id="sess_abc" 查找 checkpoint
└── 找到 checkpoint_3(上次 aggregator 执行完的状态)
2. 恢复 state:
{
"messages": [HumanMessage("成都"), AIMessage("规划了成都3天"), ...],
"destination": "成都",
"route_result": "{...}",
...
}
3. 合并用户传入的 dict(只覆盖非空字段,messages 用 add_messages 追加)
4. 执行 supervisor 节点
└── 保存 checkpoint_4
5. 根据 supervisor 的返回,执行 route/hotel/food/info(并行)
└── 每个执行完保存一个 checkpoint
6. 执行 aggregator
└── 保存最终 checkpoint
2.5 thread_id 与会话一致性
config = {"configurable": {"thread_id": sid}}
# ↑
# 前端传来的 session_id
# 同一会话的所有请求共享此 ID
- 用户 A 的 session_id = "sess_111" → thread_id = "sess_111"
- 用户 B 的 session_id = "sess_222" → thread_id = "sess_222"
- 同一用户不同会话 → 不同 thread_id → 不同 checkpoint 链
- 同一用户同一会话 → 相同 thread_id → 共享 checkpoint 历史
三、上下文窗口管理
3.1 问题:LLM 的 token 限制
DeepSeek-chat 的上下文窗口约 8192 token。 一次旅行规划请求中,发给 LLM 的内容包括:
系统提示词 ≈ 200 token
对话历史 ≈ ? token(可能很长)
用户当前消息 ≈ 50 token
LLM 的回复 ≈ 1000-2000 token
─────────────────────────────────
总计不能超过 8192
如果对话历史太长(比如用户聊了 20 轮),总 token 会超出限制。
3.2 当前:trim_messages 截断
trimmer = trim_messages(
max_tokens=4000, # 历史消息最多占 4000 token
strategy="last", # 保留最新的,丢弃最旧的
token_counter=supervisor_agent, # 用 LLM 的 tokenizer 计数
include_system=True, # 保留 SystemMessage
start_on="human", # 裁剪后第一条必须是 HumanMessage
)
trimmed_history = trimmer.invoke(messages)
截断策略:
原始 messages:
[System("你是..."), Human("成都"), AIMessage("好的"), Human("酒店便宜点"),
AIMessage("推荐了3家"), Human("有什么好吃的"), AIMessage("推荐了火锅")]
trim_messages(max_tokens=4000) 后:
[System("你是..."), Human("有什么好吃的"), AIMessage("推荐了火锅")]
← 只保留最近的几条,总 token ≤ 4000
。
四、LLM重试
4.1 为什么需要重试?
LLM 调用可能因为以下原因失败:
- 网络波动(连接超时)
- API 限流(429 Too Many Requests)
- 服务端临时故障(500 Internal Server Error)
- 响应格式异常(JSON 解析失败)
如果不重试,一次偶发失败就会导致整个请求失败。
4.2 实现:指数退避重试
MAX_RETRIES = 3 # 最大重试 3 次
RETRY_BASE_DELAY = 1 # 基础等待 1 秒
def invoke_with_retry(agent, messages, max_retries=MAX_RETRIES, base_delay=RETRY_BASE_DELAY):
last_error = None
for attempt in range(max_retries + 1): # 首次 + 3次重试 = 最多4次尝试
try:
return agent.invoke(messages)
except Exception as e:
last_error = e
if attempt < max_retries:
wait_time = base_delay * (2 ** attempt) # 指数退避
time.sleep(wait_time)
raise last_error
执行时序:
时间轴:
0s 尝试1 → 失败(网络超时)
1s 重试1 → 失败(API限流) wait = 1 * 2^0 = 1秒
3s 重试2 → 失败(服务端错误) wait = 1 * 2^1 = 2秒
7s 重试3 → 成功! wait = 1 * 2^2 = 4秒(如果需要)
4.3 各 Agent 的重试调用
# supervisor_node
resp = invoke_with_retry(supervisor_agent, [SystemMessage(...)] + history + [HumanMessage(...)])
# route_agent_node
resp = invoke_with_retry(route_agent, [SystemMessage(ROUTE_SYSTEM_PROMPT), HumanMessage(...)])
# hotel_agent_node
compare_resp = invoke_with_retry(hotel_agent, [SystemMessage(HOTEL_SYSTEM_PROMPT), HumanMessage(...)])
# food_agent_node
resp = invoke_with_retry(food_agent, [SystemMessage(FOOD_SYSTEM_PROMPT), HumanMessage(...)])
# info_agent_node
response = invoke_with_retry(info_agent, [SystemMessage(INFO_SYSTEM_PROMPT), HumanMessage(...)])
所有 LLM 调用都经过重试保护,最多尝试 4 次(1次原始 + 3次重试)。
4.4 为什么不用无限重试?
无限重试的风险:
1. API 限流时,疯狂重试会加重限流(恶性循环)
2. 用户等待时间不可控(可能等几分钟)
3. 后台线程被长期占用,影响其他请求
有限重试的保障:
1. 最多 3 次重试,总等待时间最多 1+2+4=7 秒
2. 指数退避给 API 恢复的时间
3. 超过重试次数后,走兜底逻辑(如返回默认值)
4.5 各 Agent 的兜底策略
重试全部失败后,各 Agent 的兜底行为:
| Agent | 兜底行为 | 源码位置 |
|---|---|---|
| supervisor | 默认走追问流程 "ASK: 你想去哪玩呢?" |
supervisor_node 的 except |
| route | 返回空行程 {"days": []} |
route_agent_node 的 except |
| hotel | 返回空对比 comparison = "" |
hotel_agent_node 的 except |
| food | 返回空美食 {"days": []} |
food_agent_node 的 except |
| info | 无兜底,异常会向上抛出 | info_agent_node 无 try/except |
info_agent_node 没有 try/except 保护。如果重试全部失败,
异常会传播到 graph.invoke() 的外层 try/except,
最终通过 SSE 告知用户"处理出错"。
4.6 当前架构的循环防护
本项目的图是有向无环图(DAG),不存在循环:
supervisor → route/hotel/food/info → aggregator → END
supervisor → end → END
- 没有节点指向 supervisor(不会循环回起点)
- 没有节点指向自己(不会自循环)
- 所有路径最终到达 END
所以不需要"最大循环次数"限制。图的结构本身保证了有限执行。
五、完整数据流
一次旅行规划请求的完整生命周期:
用户: "帮我规划成都3天旅行"
│
├─ 1. Go 网关转发到 Python 后端
│ {"message": "...", "user_id": "u1", "session_id": "sess_abc"}
│
├─ 2. /chat 端点创建 config = {"configurable": {"thread_id": "sess_abc"}}
│
├─ 3. graph.invoke() 从 checkpoint 加载 sess_abc 的历史 state
│ └── 首次访问,无历史,state 为空
│
├─ 4. supervisor_node 执行
│ ├─ state["messages"] = [](无历史)
│ ├─ trim_messages([]) → [](无需截断)
│ ├─ invoke_with_retry(supervisor_agent, ...) → "PLAN: 成都|3|深圳||5000|route,hotel,food"
│ ├─ 解析出 destination="成都", days=3, agents_to_call=[route,hotel,food]
│ ├─ 返回 {"destination":"成都", "messages":[AIMessage("好的,成都3天...")]}
│ └─ checkpoint 自动保存
│
├─ 5. route_decision → Send("route"), Send("hotel"), Send("food") → 三个并行分支
│
├─ 6. route_agent_node(并行)
│ ├─ 读 state: destination="成都", days=3
│ ├─ 调飞猪 API 搜索景点
│ ├─ invoke_with_retry(route_agent, ...) → JSON 行程
│ ├─ 返回 {"route_result": "{...}"}
│ └─ checkpoint 自动保存
│
├─ 7. hotel_agent_node(并行)
│ ├─ 读 state: destination="成都", budget="5000"
│ ├─ 调飞猪 API 搜索酒店
│ ├─ invoke_with_retry(hotel_agent, ...) → 对比分析
│ ├─ 返回 {"hotel_result": "{...}"}
│ └─ checkpoint 自动保存
│
├─ 8. food_agent_node(并行)
│ ├─ 读 state: destination="成都", days=3
│ ├─ invoke_with_retry(food_agent, ...) → JSON 餐厅推荐
│ ├─ 返回 {"food_result": "{...}"}
│ └─ checkpoint 自动保存
│
├─ 9. aggregator_node 执行(fan-in,合并所有分支结果)
│ ├─ 读 state: route_result, hotel_result, food_result
│ ├─ 合并美食到每日行程
│ ├─ 生成行程卡片 + 文字摘要
│ ├─ 返回 {"final_result":"done", "messages":[AIMessage("为用户规划了成都3天行程")]}
│ └─ checkpoint 自动保存最终状态
│
├─ 10. 图执行完毕,返回 final state
│
├─ 11. update_user_profile(uid, destination="成都", budget="5000")
│ └─ 写入 SQLite user_profiles 表(跨 session 持久记忆)
│
└─ 12. SSE 推送完成,前端渲染行程卡片
下次同一 session 的请求:
用户: "酒店换便宜点的"
│
├─ graph.invoke() 从 checkpoint 加载 sess_abc 的历史
│ └── 恢复 state: messages=[...之前的消息...], destination="成都", ...
│
├─ supervisor_node
│ ├─ state["messages"] 包含之前的对话
│ ├─ trim_messages 裁剪到 4000 token
│ ├─ LLM 看到上下文:"用户要成都旅行,之前推荐了酒店..."
│ └─ 输出 "PLAN: 成都|0|||便宜|hotel" ← 只调 hotel agent
│
├─ 只有 hotel_agent 执行(其他不调用)
│
└─ aggregator 汇总
这就是 checkpoint 的价值:跨请求的上下文连续性。
导航
← 上一篇:LangGraph Checkpoint:一次架构简化实践 → 下一篇:Reducer学习:从默认行为到自定义合并