多 Agent 状态同步、Checkpoint 记忆与重试机制

一、多 Agent 状态同步

1.1 多个 Agent 怎么共享数据?

TravelAgent-5 有 5 个节点(supervisor、route、hotel、food、info), 其中 4 个 Agent 可能并行执行。它们需要:

  • 读取用户输入(目的地、天数、预算)
  • 写入各自的结果(路线、酒店、美食、文化)
  • 互不干扰

1.2 答案:AgentState + add_messages reducer

所有 Agent 共享同一个 AgentState 结构:

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 消息列表(特殊处理)
    user_input: str        # 所有 Agent 可读
    destination: str       # 所有 Agent 可读
    days: int              # 所有 Agent 可读
    route_result: str      # 只有 route_agent 写入
    hotel_result: str      # 只有 hotel_agent 写入
    food_result: str       # 只有 food_agent 写入
    info_result: str       # 只有 info_agent 写入
    # ...

关键设计:

普通字段(如 route_result):每个 Agent 只写自己的字段,互不干扰。 并行执行时,LangGraph 给每个分支一份 state 的副本, 各分支独立修改,最后在 fan-in(aggregator)时合并。

messages 字段:使用 Annotated[list, add_messages] reducer。 add_messages 是 LangGraph 内置的消息归约器,规则是:

  • 新消息追加到列表末尾,而不是覆盖
  • 通过消息 ID 去重(避免重复追加)
  • 多个并行分支都往 messages 写内容时,最终结果是所有内容拼接

1.3 并行执行的数据流

supervisor 返回 {"destination": "成都", "agents_to_call": ["route","hotel","food"]}
                              │
            ┌─────────────────┼─────────────────┐
            │ Send("route")   │ Send("hotel")    │ Send("food")
            ▼                 ▼                  ▼
     state 副本 1        state 副本 2        state 副本 3
     route_agent 读:      hotel_agent 读:     food_agent 读:
       destination          destination         destination
       days                 budget              days
     route_agent 写:      hotel_agent 写:     food_agent 写:
       route_result         hotel_result        food_result
            │                 │                  │
            └─────────────────┴──────────────────┘
                              │
                         aggregator
                    合并所有分支的 state:
                    - route_result ← 来自分支1
                    - hotel_result ← 来自分支2
                    - food_result  ← 来自分支3
                    - messages     ← 三个分支的 messages 拼接(add_messages)

1.4 源码对应

写入方(各 Agent 节点):

# route_agent_node 返回
return {"route_result": json.dumps(result, ensure_ascii=False)}

# hotel_agent_node 返回
return {"hotel_result": json.dumps(...)}

# food_agent_node 返回
return {"food_result": json.dumps(...)}

# info_agent_node 返回
return {"info_result": response.content}

# supervisor_node 和 aggregator_node 还会写入 messages
return {"messages": [AIMessage(content=...)]}

读取方(aggregator_node):

route = json.loads(state["route_result"])    # 读路线结果
hotels = json.loads(state["hotel_result"])   # 读酒店结果
foods = json.loads(state["food_result"])     # 读美食结果
info_text = state.get("info_result", "")     # 读文化结果

每个字段只有一个 Agent 写入,所以不存在写冲突。

1.5 add_messages 的消息合并原理

当多个并行分支都返回 messages 时,LangGraph 的合并流程:

分支1 (route) 返回: {"messages": []}           ← route 不写 messages
分支2 (hotel) 返回: {"messages": []}           ← hotel 不写 messages
分支3 (food)  返回: {"messages": []}           ← food 不写 messages
分支4 (info)  返回: {"messages": []}           ← info 不写 messages

合并结果: messages = 原有messages + [] + [] + [] + [] = 原有messages

只有 supervisor 和 aggregator 写入 messages,所以并行分支间没有消息冲突。


二、Checkpoint 工作机制

2.1 Checkpoint 是什么?

Checkpoint 是 LangGraph 的状态持久化机制。每次图的一个节点执行完, LangGraph 会把当前的完整 state 序列化后存入 checkpointer(这里是 SQLite)。

下次同一个 thread_id 的请求进来时,LangGraph 从 checkpointer 恢复 state, 就像时间倒流到上次执行完的位置。

2.2 存储结构

checkpoints.db (SQLite)
├── checkpoints 表
│   ├── thread_id: "sess_1715xxx"     ← 会话标识
│   ├── checkpoint_id: "uuid"         ← 快照唯一ID
│   ├── parent_id: "uuid"             ← 上一个快照的ID(形成链表)
│   ├── checkpoint: BLOB              ← 序列化的完整 state
│   └── metadata: JSON                ← 节点名、时间戳等

每个 thread_id 形成一条 checkpoint 链:

thread_id="sess_abc"
  checkpoint_1 (supervisor 执行完)
    └── checkpoint_2 (route 执行完)
        └── checkpoint_3 (aggregator 执行完)  ← 最新状态

2.3 源码中的初始化

# 1. 创建 SQLite 连接
_checkpoint_conn = sqlite3.connect(CHECKPOINT_DB, check_same_thread=False)

# 2. 创建 SqliteSaver(LangGraph 的 Checkpoint 存储后端)
checkpointer = SqliteSaver(conn=_checkpoint_conn)

# 3. 创建数据库表(如果不存在)
checkpointer.setup()

# 4. 编译图时绑定 checkpointer
graph = build_graph(checkpointer=checkpointer)

2.4 请求时的 Checkpoint 流程

# /chat 端点的 run() 函数
config = {"configurable": {"thread_id": sid}}
final = graph.invoke(
    {"user_input": user_message, "messages": [], ...},
    config=config,
)

graph.invoke() 内部的执行流程:

1. 用 thread_id="sess_abc" 查找 checkpoint
   └── 找到 checkpoint_3(上次 aggregator 执行完的状态)

2. 恢复 state:
   {
     "messages": [HumanMessage("成都"), AIMessage("规划了成都3天"), ...],
     "destination": "成都",
     "route_result": "{...}",
     ...
   }

3. 合并用户传入的 dict(只覆盖非空字段,messages 用 add_messages 追加)

4. 执行 supervisor 节点
   └── 保存 checkpoint_4

5. 根据 supervisor 的返回,执行 route/hotel/food/info(并行)
   └── 每个执行完保存一个 checkpoint

6. 执行 aggregator
   └── 保存最终 checkpoint

2.5 thread_id 与会话一致性

config = {"configurable": {"thread_id": sid}}
#                                ↑
#                          前端传来的 session_id
#                          同一会话的所有请求共享此 ID
  • 用户 A 的 session_id = "sess_111" → thread_id = "sess_111"
  • 用户 B 的 session_id = "sess_222" → thread_id = "sess_222"
  • 同一用户不同会话 → 不同 thread_id → 不同 checkpoint 链
  • 同一用户同一会话 → 相同 thread_id → 共享 checkpoint 历史

三、上下文窗口管理

3.1 问题:LLM 的 token 限制

DeepSeek-chat 的上下文窗口约 8192 token。 一次旅行规划请求中,发给 LLM 的内容包括:

系统提示词        ≈ 200 token
对话历史          ≈ ? token(可能很长)
用户当前消息      ≈ 50 token
LLM 的回复       ≈ 1000-2000 token
─────────────────────────────────
总计不能超过 8192

如果对话历史太长(比如用户聊了 20 轮),总 token 会超出限制。

3.2 当前:trim_messages 截断

trimmer = trim_messages(
    max_tokens=4000,          # 历史消息最多占 4000 token
    strategy="last",          # 保留最新的,丢弃最旧的
    token_counter=supervisor_agent,  # 用 LLM 的 tokenizer 计数
    include_system=True,      # 保留 SystemMessage
    start_on="human",         # 裁剪后第一条必须是 HumanMessage
)
trimmed_history = trimmer.invoke(messages)

截断策略:

原始 messages:
  [System("你是..."), Human("成都"), AIMessage("好的"), Human("酒店便宜点"),
   AIMessage("推荐了3家"), Human("有什么好吃的"), AIMessage("推荐了火锅")]

trim_messages(max_tokens=4000) 后:
  [System("你是..."), Human("有什么好吃的"), AIMessage("推荐了火锅")]
  ← 只保留最近的几条,总 token ≤ 4000


四、LLM重试

4.1 为什么需要重试?

LLM 调用可能因为以下原因失败:

  • 网络波动(连接超时)
  • API 限流(429 Too Many Requests)
  • 服务端临时故障(500 Internal Server Error)
  • 响应格式异常(JSON 解析失败)

如果不重试,一次偶发失败就会导致整个请求失败。

4.2 实现:指数退避重试

MAX_RETRIES = 3       # 最大重试 3 次
RETRY_BASE_DELAY = 1  # 基础等待 1 秒

def invoke_with_retry(agent, messages, max_retries=MAX_RETRIES, base_delay=RETRY_BASE_DELAY):
    last_error = None
    for attempt in range(max_retries + 1):  # 首次 + 3次重试 = 最多4次尝试
        try:
            return agent.invoke(messages)
        except Exception as e:
            last_error = e
            if attempt < max_retries:
                wait_time = base_delay * (2 ** attempt)  # 指数退避
                time.sleep(wait_time)
    raise last_error

执行时序:

时间轴:
0s    尝试1 → 失败(网络超时)
1s    重试1 → 失败(API限流)     wait = 1 * 2^0 = 1秒
3s    重试2 → 失败(服务端错误)   wait = 1 * 2^1 = 2秒
7s    重试3 → 成功!              wait = 1 * 2^2 = 4秒(如果需要)

4.3 各 Agent 的重试调用

# supervisor_node
resp = invoke_with_retry(supervisor_agent, [SystemMessage(...)] + history + [HumanMessage(...)])

# route_agent_node
resp = invoke_with_retry(route_agent, [SystemMessage(ROUTE_SYSTEM_PROMPT), HumanMessage(...)])

# hotel_agent_node
compare_resp = invoke_with_retry(hotel_agent, [SystemMessage(HOTEL_SYSTEM_PROMPT), HumanMessage(...)])

# food_agent_node
resp = invoke_with_retry(food_agent, [SystemMessage(FOOD_SYSTEM_PROMPT), HumanMessage(...)])

# info_agent_node
response = invoke_with_retry(info_agent, [SystemMessage(INFO_SYSTEM_PROMPT), HumanMessage(...)])

所有 LLM 调用都经过重试保护,最多尝试 4 次(1次原始 + 3次重试)。

4.4 为什么不用无限重试?

无限重试的风险:
1. API 限流时,疯狂重试会加重限流(恶性循环)
2. 用户等待时间不可控(可能等几分钟)
3. 后台线程被长期占用,影响其他请求

有限重试的保障:
1. 最多 3 次重试,总等待时间最多 1+2+4=7 秒
2. 指数退避给 API 恢复的时间
3. 超过重试次数后,走兜底逻辑(如返回默认值)

4.5 各 Agent 的兜底策略

重试全部失败后,各 Agent 的兜底行为:

Agent 兜底行为 源码位置
supervisor 默认走追问流程 "ASK: 你想去哪玩呢?" supervisor_node 的 except
route 返回空行程 {"days": []} route_agent_node 的 except
hotel 返回空对比 comparison = "" hotel_agent_node 的 except
food 返回空美食 {"days": []} food_agent_node 的 except
info 无兜底,异常会向上抛出 info_agent_node 无 try/except

info_agent_node 没有 try/except 保护。如果重试全部失败, 异常会传播到 graph.invoke() 的外层 try/except, 最终通过 SSE 告知用户"处理出错"。

4.6 当前架构的循环防护

本项目的图是有向无环图(DAG),不存在循环:

supervisor → route/hotel/food/info → aggregator → END
supervisor → end → END
  • 没有节点指向 supervisor(不会循环回起点)
  • 没有节点指向自己(不会自循环)
  • 所有路径最终到达 END

所以不需要"最大循环次数"限制。图的结构本身保证了有限执行。


五、完整数据流

一次旅行规划请求的完整生命周期:

用户: "帮我规划成都3天旅行"
│
├─ 1. Go 网关转发到 Python 后端
│     {"message": "...", "user_id": "u1", "session_id": "sess_abc"}
│
├─ 2. /chat 端点创建 config = {"configurable": {"thread_id": "sess_abc"}}
│
├─ 3. graph.invoke() 从 checkpoint 加载 sess_abc 的历史 state
│     └── 首次访问,无历史,state 为空
│
├─ 4. supervisor_node 执行
│     ├─ state["messages"] = [](无历史)
│     ├─ trim_messages([]) → [](无需截断)
│     ├─ invoke_with_retry(supervisor_agent, ...) → "PLAN: 成都|3|深圳||5000|route,hotel,food"
│     ├─ 解析出 destination="成都", days=3, agents_to_call=[route,hotel,food]
│     ├─ 返回 {"destination":"成都", "messages":[AIMessage("好的,成都3天...")]}
│     └─ checkpoint 自动保存
│
├─ 5. route_decision → Send("route"), Send("hotel"), Send("food") → 三个并行分支
│
├─ 6. route_agent_node(并行)
│     ├─ 读 state: destination="成都", days=3
│     ├─ 调飞猪 API 搜索景点
│     ├─ invoke_with_retry(route_agent, ...) → JSON 行程
│     ├─ 返回 {"route_result": "{...}"}
│     └─ checkpoint 自动保存
│
├─ 7. hotel_agent_node(并行)
│     ├─ 读 state: destination="成都", budget="5000"
│     ├─ 调飞猪 API 搜索酒店
│     ├─ invoke_with_retry(hotel_agent, ...) → 对比分析
│     ├─ 返回 {"hotel_result": "{...}"}
│     └─ checkpoint 自动保存
│
├─ 8. food_agent_node(并行)
│     ├─ 读 state: destination="成都", days=3
│     ├─ invoke_with_retry(food_agent, ...) → JSON 餐厅推荐
│     ├─ 返回 {"food_result": "{...}"}
│     └─ checkpoint 自动保存
│
├─ 9. aggregator_node 执行(fan-in,合并所有分支结果)
│     ├─ 读 state: route_result, hotel_result, food_result
│     ├─ 合并美食到每日行程
│     ├─ 生成行程卡片 + 文字摘要
│     ├─ 返回 {"final_result":"done", "messages":[AIMessage("为用户规划了成都3天行程")]}
│     └─ checkpoint 自动保存最终状态
│
├─ 10. 图执行完毕,返回 final state
│
├─ 11. update_user_profile(uid, destination="成都", budget="5000")
│       └─ 写入 SQLite user_profiles 表(跨 session 持久记忆)
│
└─ 12. SSE 推送完成,前端渲染行程卡片

下次同一 session 的请求:

用户: "酒店换便宜点的"
│
├─ graph.invoke() 从 checkpoint 加载 sess_abc 的历史
│   └── 恢复 state: messages=[...之前的消息...], destination="成都", ...
│
├─ supervisor_node
│   ├─ state["messages"] 包含之前的对话
│   ├─ trim_messages 裁剪到 4000 token
│   ├─ LLM 看到上下文:"用户要成都旅行,之前推荐了酒店..."
│   └─ 输出 "PLAN: 成都|0|||便宜|hotel"  ← 只调 hotel agent
│
├─ 只有 hotel_agent 执行(其他不调用)
│
└─ aggregator 汇总

这就是 checkpoint 的价值:跨请求的上下文连续性。


导航

← 上一篇:LangGraph Checkpoint:一次架构简化实践 → 下一篇:Reducer学习:从默认行为到自定义合并