用户隔离设计
一、需求
当前系统所有用户——无论登录与否——共享同一个 SqliteSaver checkpoint 存储。这带来两个问题:
- 游客数据残留:游客临时对话的 checkpoint 写入 SQLite 后永远留在磁盘上,浪费存储
- 无用户认证:没有登录机制,无法区分"回头客"和"路过游客"
目标:
- 内置
admin/admin账号 - 登录用户 → checkpoint 持久化(SqliteSaver),跨进程重启保留
- 游客 → checkpoint 仅存内存(MemorySaver),进程重启即清空
- 多个游客同时访问互不干扰
二、为什么编译两份图而不是动态切换 Checkpointer
LangGraph 的 checkpoint 后端在 graph.compile(checkpointer=...) 时绑定,无法在 graph.invoke() 时更换。因此需要编译两份图实例:
# 登录用户:SQLite 持久化
graph_db = build_graph(checkpointer=SqliteSaver(conn=...))
# 游客:内存存储
graph_mem = build_graph(checkpointer=MemorySaver())
两份图共享相同的节点函数(supervisor_node、route_agent_node 等),只是状态存储后端不同。这意味着:
- 代码只维护一份节点逻辑
- 内存开销增加一份编译后的图结构(约 1MB)
- 请求时根据 token 选择对应的图实例
三、认证流程
3.1 后端
内置用户表(硬编码,简单可靠):
_AUTH_USERS = {"admin": "admin"}
_auth_tokens = {} # token → username(内存存储)
/login 端点验证用户名密码,返回 UUID 令牌:
@app.post("/login")
async def login(request: Request):
body = await request.json()
if _AUTH_USERS.get(body["username"]) == body["password"]:
token = str(uuid.uuid4())
_auth_tokens[token] = username
return {"ok": True, "token": token, "username": username}
return {"ok": False, "error": "用户名或密码错误"}
/chat 端点根据 token 选择图实例:
def _resolve_graph(token: str):
username = _auth_tokens.get(token, "")
if username:
return graph_db, username, True # 登录用户
return graph_mem, "guest", False # 游客
3.2 前端
登录弹窗 → POST /login → 存储 token 到 localStorage → 后续 /api/chat 请求携带 token:
// 登录
const r = await fetch('/login', {
body: JSON.stringify({ username, password })
});
const d = await r.json();
if (d.ok) localStorage.setItem('ta5_user', JSON.stringify({ name: d.username, token: d.token }));
// 游客:不携带 token
localStorage.setItem('ta5_user', JSON.stringify({ name: '游客', token: '' }));
// 聊天请求
fetch('/api/chat', {
body: JSON.stringify({ message: text, token: user?.token || '', session_id: sessionId })
});
3.3 Go 网关
Go 网关新增 /login 路由,直接代理到 Python 后端:
mux.HandleFunc("/login", handleLogin) // POST → http://localhost:8001/login
Chat SSE 代理中,user_id 字段替换为 token,由 Python 端自行解析。
四、多游客并发的安全性
4.1 隔离机制
当前架构处理并发请求的方式:
请求1(游客A,session_A)→ daemon thread 1 → graph_mem.invoke(config_A)
请求2(游客B,session_B)→ daemon thread 2 → graph_mem.invoke(config_B)
请求3(admin,session_X) → daemon thread 3 → graph_db.invoke(config_X)
每个请求:
- FastAPI 在独立线程中调用
run()函数(threading.Thread(target=run, daemon=True)) - 每个线程有独立的
invoke_input字典(不同thread_id/session_id) - LangGraph 的
MemorySaver内部用thread_id做 key 隔离状态
4.2 MemorySaver 的线程安全性
# LangGraph MemorySaver 内部实现(简化)
class MemorySaver:
def __init__(self):
self._saves = {} # thread_id → {checkpoint_id → state}
def get_tuple(self, config):
thread_id = config["configurable"]["thread_id"]
return self._saves.get(thread_id, {}).get(latest)
因为不同游客使用不同的 session_id(即 thread_id),MemorySaver 内部 dict 的 key 不同,状态天然隔离。Python 的 dict 操作是线程安全的(GIL 保护),所以多个线程同时访问 MemorySaver 不会出现数据竞争。
4.3 为什么不用协程
当前用的是 threading.Thread 而非 asyncio,原因是:
graph.invoke()是同步阻塞的(内部包含 subprocess 调用和 HTTP 请求)- 同步代码在线程中运行比在 asyncio 事件循环中更简单可靠
- FastAPI 的
StreamingResponse在线程化的 SSE 推送中工作良好
五、两种 Checkpoint 后端的对比
| 维度 | MemorySaver | SqliteSaver |
|---|---|---|
| 持久化 | 进程内存,重启即丢失 | SQLite 文件,永久保存 |
| 适用用户 | 游客(无需保存历史) | 登录用户(需要"回头") |
| 并发安全 | GIL 保护 dict 操作 | SQLite 文件锁 + WAL 模式 |
| 性能 | 极快(纯内存读写) | 略慢(磁盘 I/O) |
| 存储成本 | 随进程停止自动释放 | 持续增长,需定期清理 |
| 多进程 | 不共享(进程隔离) | 共享(同一数据库文件) |