生命周期状态流
Agent 系统作者:8 个嵌入点不是 8 个独立的 hook——它们共享一套跨步、跨轮、跨 session 的状态。本章按状态流切一刀,追踪一次对话里每条 state 怎么流动。
为什么再切一刀
应用到自研 Agent 按 hook 切片 —— “你在 prepareStep 里做什么 / 在 tool.execute 里做什么”。这是静态地图。
但真实的生产代码不是 8 个孤立的 hook —— 它们共享一套状态:
onToolCallFinish写的 compactCache,下一步prepareStep读onStepFinish写的 lastStepTotalTokens,下一步prepareStep读来决定是否压缩tool.execute写的 reminders,后续prepareStep注入 promptonFinish持久化的 messages,下次 call-site 加载
这些producer-consumer 对不讲清楚,读者落到实现时就会踩。
本章是动态走读——追踪一次对话的状态怎么在 8 个点之间演化。和第 9 章的静态地图互补。
免责声明:以下代码示例是 CC 设计 + AI SDK 最佳实践的教学合成,字段名和 Zapvol 真实实现有细节差异——例如示例里的
context.compactCache: new Map()在 Zapvol 里实际是基于文件的 cache(.compact.json),session.compactBoundary实际是session.compactionCheckpoint的一部分。本章重在状态流的模式而非字面的字段匹配;具体 Zapvol 实现见packages/backend/src/agent/源码。
四种状态桶:按生命周期长度分类
所有 agent 状态可以装进4 个桶,按”活多久”区分:
| 桶 | 生命周期 | 典型字段 | 存储 |
|---|---|---|---|
| Per-step | 本步开始到结束 | stepNumber · stepMessages · 当前 tool_use 块 · 当前 LLM response stream | AI SDK 内部 |
| Per-call | streamText 开始到 onFinish 返回 | toolUseContext · abortController · compactCache · lastStepTotalTokens · reminders · todos | 你维护的 context 对象 |
| Cross-call (in session) | 一个 session 内跨多次 streamText | session.messages · compactBoundary · compactionCheckpoint · 累计 usage | Session DB / 内存 |
| Cross-session | 跨会话持久 | CLAUDE.md / AGENT.md · user prefs · auto memory · policy rules | 文件系统 / DB |
最容易错的是 per-call 和 cross-call 的边界——这两类都”跨多步”但生命周期不同:
- per-call 是一次 streamText 调用内部跨步共享;streamText 返回后这些 state 应该 GC
- cross-call 是一次 streamText 完成后持久化,下次 call-site 读回
混淆这两类 = 要么状态没持久化(下次读不到),要么内存泄漏(per-call state 永不释放)。
一次完整对话的状态走读
用一个真实场景串一遍——“用户说’帮我修复 auth bug’,agent 跑 3 步完成”——追踪每个嵌入点读写了哪些 state。
前置:session 恢复
用户按 Enter 之前,session 层已经有:
session.messages = [/* 之前若干轮的 messages */]
session.compactionCheckpoint = { lastStepTotalTokens: 45_000, compactedRounds: {...} }
session.usage = { cumulativeInput: 120_000, cumulativeOutput: 8_000, ... }
cross-session 层稳定:
CLAUDE.md (文件系统): 项目规则
~/.claude/CLAUDE.md: 用户偏好
autoMemory: [feedback_terse, project_migration_freeze, ...]
点 A · Call-site 组装
// 用户新消息 + session.messages 拼接
const allMessages = [...session.messages, { role: 'user', content: userInput }]
// 读 cross-session
const claudeMd = await loadLayeredClaudeMd() // Managed → User → Project → Local
const autoMem = await loadAutoMemoryIndex() // MEMORY.md 索引(≤200 行)
const systemPrompt = assembleSystemPrompt({ claudeMd, autoMem, ...staticParts })
// 写 per-call(初始化 context)
const context: ZapvolContext = {
taskId,
abortController: new AbortController(), // 生命贯穿整个 streamText
lastStepTotalTokens: session.compactionCheckpoint.lastStepTotalTokens, // ← 播种
compactCache: new Map(), // 空,等着 E 来写
reminders: [],
todos: [...],
// ...
}
// 启动 AI SDK
const result = streamText({
system: systemPrompt, // 字面量稳定
messages: allMessages,
tools: buildTools(),
prepareStep: /* 见点 B */,
experimental_onToolCallFinish: createToolPrecompactHook({ context, ... }),
onStepFinish: step => {
// 更新 per-call state
context.lastStepTotalTokens = deriveFromStep(step) // ← producer
},
onFinish: /* 见点 G */,
})
A 的 state 读写:
- 读:
session.messages,session.compactionCheckpoint,claudeMd,autoMem(都是 cross-session / cross-call) - 写:
context.lastStepTotalTokens(播种初始值),context.abortController(新建)
关键:context.lastStepTotalTokens 初始值来自 session.compactionCheckpoint。这是 cross-call → per-call 的状态传递——session 恢复时把上轮末的 token 估算继承下来。
点 B · prepareStep(step 0,首步)
prepareStep: async ({ messages: stepMessages, steps, model }) => {
const isFirstStep = steps.length === 0 // 真
// 读 per-call state
const currentTokens = context.lastStepTotalTokens // ← 此时还是 A 写的初始值
const reminders = context.reminders
// 首步:boundary filter + autocompact
const afterBoundary = getMessagesAfterCompactBoundary(
stepMessages,
session.compactBoundary,
)
let compacted = afterBoundary
if (currentTokens > AUTOCOMPACT_THRESHOLD) {
compacted = await autocompact(afterBoundary, {
signal: context.abortController.signal, // ← 传 abort
})
}
// 布 cache breakpoint
const prepared = applyCacheControl(compacted, model)
return { messages: prepared }
}
B 在 step 0 的 state 读写:
- 读:
stepMessages(AI SDK 传入),context.lastStepTotalTokens,session.compactBoundary,context.abortController.signal - 写:无(只覆盖本步输入,不写 context)
点 C · LLM 流式响应
模型流出 text 和 tool_use 块。不读 per-call state(AI SDK 内部跑),但产生的 stream events 会进到点 D 的触发。
如果是 thinking 模式,thinking block 的签名是本步产出的 output 一部分——这个签名会进到下一步的 messages,下一步 prepareStep 看得到它。不要乱动签名字段。
点 D · tool.execute(grep “auth” 文件)
tool({
inputSchema: z.object({ pattern: z.string() }),
execute: async (input, { abortSignal, toolCallId }) => {
// 读 per-call state
if (abortSignal.aborted) throw new AbortError()
// 读 cross-session / per-call
const permission = await checkPermission('grep', input, context)
if (permission.behavior === 'deny') {
return { error: 'permission_denied' }
}
// 实际执行
const result = await execFile('rg', [input.pattern, '.'], { signal: abortSignal })
const str = result.stdout
// 可能写 per-call
if (str.length > MAX_RESULT_CHARS) {
const path = await offloadToSandbox(context.sandbox, str)
return { truncated: true, preview: str.slice(0, 1000), offloadPath: path }
}
return { output: str }
},
})
D 的 state 读写:
- 读:
abortSignal,context(permission 用) - 写:可能写入
context.sandbox(offload 文件),可能写context.todos(如果有”自动加 todo”类工具)
点 E · onToolCallFinish(关键联动点)
experimental_onToolCallFinish: async (event) => {
if (!event.success) return
const { toolCall, output, abortSignal } = event
// 读:估算
const tokens = estimateTokens({ input: toolCall.input, output })
if (tokens < PRECOMPACT_TRIGGER_TOKENS) return
// 读:compactCache(幂等)
if (context.compactCache.has(toolCall.toolCallId)) return
// 执行(跑一次便宜模型 LLM 调用)
const compactResult = await raceAbort(
compactorFor(toolCall.toolName)(toolCall.input, output, context),
abortSignal,
)
if (!compactResult) return
// ←←← **关键写入**
context.compactCache.set(toolCall.toolCallId, compactResult)
}
E 的 state 读写:
- 读:
toolCall,output,context.compactCache(检查幂等),abortSignal - 写:
context.compactCache.set(toolCall.toolCallId, ...)
这一写是 E 和 B 连接的关键——E 把压缩结果缓存起来,下一步 B 读取替换。下面的”点 B 再次登场(step 1)“段就是消费端。
点 B 再次登场(step 1):消费 E 写的 cache
现在进入 step 1(有了 step 0 的 tool 结果要处理):
prepareStep: async ({ messages: stepMessages, steps, model }) => {
const isFirstStep = steps.length === 0 // 假,现在是 step 1
// 读 per-call state(lastStepTotalTokens 被 onStepFinish 更新过了)
const currentTokens = context.lastStepTotalTokens
// ↓↓↓ **关键读取**:消费 E 写的 cache
const truncated = await truncateOldToolResults(stepMessages, {
compactCache: context.compactCache, // ← 读 E 的写
})
// truncateTools 内部:
// for (const part of message.parts) {
// if (isToolResultPart(part)) {
// const cached = compactCache.get(part.toolCallId)
// if (cached) {
// part.output = cached.shortOutput // ← 用 E 压好的版本替换
// }
// }
// }
// 可能再做一轮 autocompact(如果 microcompact 不够)
let compacted = truncated
if (currentTokens > AUTOCOMPACT_THRESHOLD) {
compacted = await autocompact(truncated, { signal: context.abortController.signal })
}
// reminders 注入
let final = applyCacheControl(compacted, model)
if (context.reminders.length > 0) {
const text = context.reminders.join('\n')
final = [...final, { role: 'user', content: text }]
context.reminders = [] // ← 消费后清空(避免每步重复注入)
}
return { messages: final }
}
这里就把 E 和 B 连起来了:
- E 写:
context.compactCache.set(toolCallId, compactResult)—— 在 tool 返回后立即跑便宜模型压缩,缓存起来 - B 读:下一步
prepareStep遍历 tool results,按 toolCallId 命中 cache 就用压缩版替换原 output - 净效果:主 agent 的 LLM 调用看到的是已压缩的 tool results,token 消耗大幅下降,且压缩那一次 LLM 调用跑在 Haiku 上不占主 agent 关键路径
这个producer-consumer 对是 Zapvol tool-precompact.ts + tiers/tool.ts 的核心设计。不理解这对,就看不懂为什么要”提前”压缩。
B 在 step 1 的 state 读写:
- 读:
stepMessages,context.lastStepTotalTokens,context.compactCache,context.reminders,context.abortController.signal - 写:
context.reminders = [](消费后清空)
点 F · stopWhen
stopWhen: [
stepCountIs(50),
hasToolCall('complete'),
({ steps }) => context.shouldStop,
]
读:steps, context.shouldStop(外部信号)
写:无
点 G · onFinish
onFinish: async ({ response, usage, finishReason }) => {
// 读 per-call
const newMessages = response.messages
const terminalReason = deriveTerminalReason(finishReason, context)
// 写 cross-call(session 层)
await db.transaction(async tx => {
await tx.session.appendMessages(session.id, newMessages)
await tx.session.updateCheckpoint(session.id, {
lastStepTotalTokens: context.lastStepTotalTokens, // ← 把 per-call 的值冻到 session 层
compactedRounds: context.stepCompactor.getCheckpoint(),
})
await tx.session.recordUsage(session.id, usage)
await tx.session.setLastFinishReason(session.id, terminalReason)
})
}
G 的 state 读写:
- 读:
response.messages,usage,finishReason,context.lastStepTotalTokens,context.stepCompactor(最终状态) - 写:
session.messages(append),session.compactionCheckpoint(整体刷新),session.usage(累加),session.terminalReason
关键:G 里是 per-call → cross-call 的状态固化点。没做这一步,下次 call-site 读不到新消息,等于对话丢了。
点 H · 后台任务
onFinish: async (event) => {
// 先同步 G 的持久化
await persistSession(event, session)
// 再启动异步后台(不 await)
void backgroundMemoryExtraction(event.response.messages, context)
void backgroundResumePreCompact(session.id)
}
H 的 state 读写:
- 读:
response.messages,context(一次快照) - 写:
autoMemory(cross-session)—— 提取的新记忆条目 - 写:
session.resumePreCompact(cross-call)—— 预算好的压缩摘要,下次 resume 直接读
H 之后:context 对象生命周期结束,per-call state 全部回收。session / 记忆层沉淀下来等下次。
状态读写矩阵
把上面的走读凝练成一张 matrix。一眼看到每条 state 的 producer 和 consumer。
| 状态条目 | A | B | C | D | E | F | G | H |
|---|---|---|---|---|---|---|---|---|
session.messages | R | — | — | — | — | — | W | R |
session.compactionCheckpoint | R | — | — | — | — | — | W | — |
session.usage (累计) | — | — | — | — | — | — | W | — |
context.lastStepTotalTokens | W | R | — | — | — | — | R | — |
context.abortController.signal | W | R | — | R | R | — | — | R |
context.compactCache | W (init) | R | — | — | W | — | — | — |
context.reminders | R | RW | — | W | — | — | — | — |
context.todos | R | R | — | RW | — | R | — | — |
session.resumePreCompact | R | — | — | — | — | — | — | W |
autoMemory | R | — | — | — | — | — | — | W |
CLAUDE.md | R | — | — | — | — | — | — | — |
(RW = 同步读写;W 粗体表示关键 producer 点)
读这张矩阵的三个关键发现:
compactCache的 W 在 E 唯一一处,R 在 B 唯一一处——典型的 producer-consumer,只靠这对运转abortController.signal几乎所有点都读——这就是”全链路传播”的具体体现session.messages的 W 在 G,R 在下次 A——跨 call 的 handoff 全靠 G 的持久化
关键 producer-consumer 对(深入讲 3 个)
对 1:compactCache (E 写 → B 读)
目的:工具结果在下一步主 agent 看到之前先用便宜模型(Haiku)压缩,主 agent 的关键路径不占用压缩 LLM 调用的延迟和成本。
流动:
tool.execute 返回 output (D)
↓
onToolCallFinish 触发 (E)
↓
检查 output 大小 > 阈值?
↓ 是
调 Haiku: compact(input, output) → { shortOutput, offloadPath }
↓
context.compactCache.set(toolCallId, result)
↓ (下一步 prepareStep 触发)
prepareStep 看到 tool_result 列表 (B)
↓
for each tool_result:
if compactCache.has(toolCallId):
replace with compactCache.get(...).shortOutput
↓
返回 { messages: truncated } 覆盖本步输入
↓
主 agent 的 LLM 调用看到的是压缩版
这里最容易错的点:
- key mismatch:E 写的 key 是
toolCall.toolCallId,B 读的 key 是part.toolCallId——如果类型不一致(string vs number),Map.get永远 miss。 - 幂等:E 要检查”这个 toolCallId 已经压过吗”,否则 checkpoint 重放时重复跑 LLM。
- Race abort:E 跑 Haiku 的过程中用户按 ESC,这次压缩必须立刻停——否则 abort 后后台还在花钱。
对 2:lastStepTotalTokens (A 播种 → onStepFinish 更新 → B 消费)
目的:压缩阈值判断(“要不要 autocompact”)依赖准确的 token 估算。如果 lastStepTotalTokens 不更新,阈值判断用的是 stale 值,autocompact 要么永远不触发,要么误触发。
流动:
A: context.lastStepTotalTokens = session.compactionCheckpoint.lastStepTotalTokens // 从上轮末继承
↓
B (step 0): if (lastStepTotalTokens > AUTOCOMPACT_THRESHOLD) → 压缩
↓
LLM 调用完成
↓
onStepFinish: context.lastStepTotalTokens = step.usage.inputTokens + step.usage.outputTokens // 更新
↓
B (step 1): 再次 if 判断,用更新值
↓ ...
G: session.compactionCheckpoint.lastStepTotalTokens = context.lastStepTotalTokens // 冻结供下轮
这里最容易错的点:
- 忘了在 onStepFinish 写:autocompact 永远不触发,session 跑飞到 context overflow。
- 算错公式:step.usage 里有
inputTokens/outputTokens/cachedInputTokens/cacheCreationInputTokens四个字段——哪个代表”下一步 prompt 大小”?Zapvol 的选择是inputTokens + outputTokens(注释里有解释)——错一个就长期偏差。
对 3:session.messages (G 写 → 下次 A 读)
目的:跨 call 的对话持久化。刷新浏览器 / resume session 不丢对话。
流动:
G (本次 onFinish):
await db.session.appendMessages(session.id, response.messages) // 同步
↓ (session 结束,context 回收)
↓ (用户隔一段再来)
A (下次 call-site):
const history = await db.session.loadMessages(session.id) // 读回
const allMessages = [...history, newUserMessage]
这里最容易错的点:
- G 异步写没 await:用户刷新比 db.write 快,丢对话。G 必须同步等 DB 确认。
- compactBoundary 没跟着更新:下次 A 加载了 messages 但不知道哪段已压缩过,下次 prepareStep 重新压缩一遍——
session.compactionCheckpoint和session.messages必须原子更新(同一事务)。 - thinking block 签名在序列化时被破坏:DB 存的 JSON 字段顺序变了 / 浮点精度丢了 —— 下次提交签名对不上,API 拒绝。
8 条关键约束
状态流上有 8 条必须遵守的编码纪律(只有第 1 条是严格数学意义上的不变量,其他是操作要求——违反任一 = 一类 bug):
| # | 约束 | 破坏后的症状 |
|---|---|---|
| 1 | session.compactBoundary <= session.messages.length | getMessagesAfterCompactBoundary 返回空 / 崩溃 |
| 2 | context.compactCache[toolCallId] 的 key 等于 message 里 tool_use.id | cache 永远 miss,等于没压缩 |
| 3 | abortController.signal.aborted === true 后,所有跑着的 tool execute 必须在若干秒内退出 | 用户按 ESC 但后台继续烧 API |
| 4 | session.messages 和 session.compactionCheckpoint 必须同事务更新 | resume 时 checkpoint 指向不存在的消息 index |
| 5 | context.lastStepTotalTokens 必须在 onStepFinish 更新 | autocompact 不触发,context 爆炸 |
| 6 | context.reminders 消费后必须清空 | 每步重复注入同一 reminder,prompt 污染 |
| 7 | thinking block 签名字段必须按位保持,序列化 / 反序列化不能动 | 下次提交 API 拒绝”thinking signature mismatch” |
| 8 | per-call state 在 onFinish 返回前必须已固化到 cross-call 层 | session 结束后丢数据 |
常见 state-flow bug 与症状对照表
生产中遇到过的几类 bug,反推到 state 违规:
| 症状 | 根因 | 修在哪 |
|---|---|---|
| ”微压缩看起来没效果,每次 tool result 还是全量送进 LLM” | 对 1 的 key mismatch:E 用 toolCall.toolCallId,B 用 part.tool_call_id(命名不一致) | 统一 key 命名 + 加日志验证 cache 命中率 |
| ”autocompact 从来没触发过,看 usage 一路涨到 overflow” | 对 2 的 onStepFinish 忘了写:context.lastStepTotalTokens 永远是初始值 | 在 onStepFinish 里显式更新 |
| ”用户按 ESC 后,几十秒才真正停” | 约束 3 破坏:某个 tool 的 execute 没接 abortSignal | 在每个长耗时 fetch/spawn 里传 signal |
| ”对话历史刷新后丢最后一轮” | 对 3 的 G 异步写:onFinish 没 await DB | 改 onFinish 为 async + await |
| ”resume 一个 session 后,前几轮消息是乱的” | 约束 4 破坏:messages 和 checkpoint 分开写,有一个失败 | 改成 DB 事务 |
| ”prompt 里发现同一条 reminder 重复出现 5 次” | 约束 6 破坏:消费后没清空 | 消费后 context.reminders = [] |
| ”API 返回 ‘thinking signature mismatch‘“ | 约束 7 破坏:JSON 序列化把签名字段搞乱了 | 序列化时不动 thinking block,直接原样存 |
| ”resume 速度慢,每次要等 5 秒” | H 的 resumePreCompact 没写:runtime 才开始压缩 | 在 H 异步写好摘要供下次读 |
AbortSignal 的传播路径(单列展开)
状态流里最容易半截实现的是 AbortSignal。跟其他 state 不一样——它不是 producer-consumer 对,是广播模型:
用户按 ESC 后,A 点的 abortController.abort() 把 signal.aborted 翻到 true。之后每个观察点独立 race,不是层层转发:
- B(下一轮 prepareStep 开头):
if (signal.aborted) throw早退 - C(流式消费):
fetch({ signal })原生中断 - D(tool.execute):AI SDK 已经把 signal 塞到第二参数,每个 tool 自己 check
- E(onToolCallFinish):
raceAbort(compactLLM, signal)显式竞速 - H(后台任务):独立检查 signal,不依赖 stream 关闭通知
最后 AI SDK 的循环在自己的点上检测 signal.aborted → 结束 stream → G 的 finishReason === 'abort'。
关键:半截 abort 比没有更糟——用户以为取消了实际还在烧 API。任何新加的长耗时操作(fetch / execFile / spawn / DB call / LLM 调用)都必须接 signal。
3 个关键自检问题
下面 3 个问题覆盖了状态流里最容易踩的坑。如果不翻源码答不出任一条,回去再读对应章节:
compactCache的生命周期:一次streamText调用结束后还在吗?如果 agent 跨两次streamText(同 session 内),第二次还能读到第一次 E 写的 cache 吗?onStepFinish里忘记更新lastStepTotalTokens:autocompact 会怎么样?会在第几步开始出问题?最终现象是什么?- G (onFinish) 里
appendMessages已经 append 但updateCheckpointDB 写失败:下次 resume 时会读到什么样的 state?有什么连锁反应?
答案都在本章前面的 producer-consumer / 关键约束 / bug 对照表里——这 3 个问题各指向一个约束的破坏路径。
给自研 agent 的要点
把 state flow 视角的关键沉淀成几条可直接用的:
- 先画 state 桶:把你的 agent state 按生命周期分成 per-step / per-call / cross-call / cross-session 四桶。分不清 = 设计有问题
- 找 producer-consumer 对:每条 state 至少有一个 producer 一个 consumer(否则是 dead state)。用矩阵扫一遍
- per-call → cross-call 固化点要显式:Zapvol 的
onFinish里updateCheckpoint就是这种点,必须事务原子 - AbortSignal 全链路传:每个观察点独立 race,不是层层转发。半截实现比没有更糟
- compactCache 的 key 统一:E 和 B 读写同一 cache,key 类型 / 命名要一致,加日志验证命中率
- 消费后清空:
reminders/ 一次性 flag 消费后立即清零,否则每步重复注入 - lastStepTotalTokens 在 onStepFinish 更新:不更新等于 autocompact 永不触发
- thinking block 签名原样存:序列化 / 反序列化不动签名字段
- H 的后台任务独立于 stream 关闭:不要 await,但要自己维护生命周期(signal / cancellation)
- 8 条关键约束对着 codebase 扫:每条 “否” 都是一类潜在 bug
延伸阅读
- 应用到自研 Agent (AI SDK)——按 hook 切的静态地图,和本章互补
- Agent 运行循环——Claude Code 自己的状态机(State 对象 14 字段)
- 上下文压缩——compactCache / compactionCheckpoint 这些状态的 5 级流水线背景
- 设计启示——本章所有 state 原则的抽象版本
- Zapvol 参考实现:
packages/backend/src/agent/agent-round.ts+packages/backend/src/agent/compaction/+packages/backend/src/agent/context/(若存在)