Agent 运行循环

Agent 系统作者:一次 Claude Code 对话到底发生了什么——query async generator、每轮 14 步流水线、StreamingToolExecutor、重试 / 恢复 / 熔断路径,全部源码级还原。

这一章填的空白

前面讲了静态组成:prompt 怎么拼 / 记忆怎么存 / 压缩怎么做 / 权限怎么判。但有一个核心问题一直没答:一次对话到底是怎么运行起来的

  • 用户按 Enter 之后,Claude Code 在内部做了什么?
  • ReAct 循环是怎么实现的?
  • 多轮工具调用怎么调度的?
  • 一轮出错后怎么恢复?
  • claude --resume 从上次退出处恢复的机制是什么?

这一章从源码级还原 agent 的运行生命周期。主要参考 query.ts(1729 行)+ QueryEngine.ts(1295 行)+ Task.ts(125 行)+ query/ 目录。


顶层结构:query() 是一个 async generator

Claude Code 的主循环定位在一个函数里:

// query.ts 第 219 行
export async function* query(
  params: QueryParams,
): AsyncGenerator<
  | StreamEvent
  | RequestStartEvent
  | Message
  | TombstoneMessage
  | ToolUseSummaryMessage,
  Terminal
> {
  const consumedCommandUuids: string[] = []
  const terminal = yield* queryLoop(params, consumedCommandUuids)
  for (const uuid of consumedCommandUuids) {
    notifyCommandLifecycle(uuid, 'completed')
  }
  return terminal
}

两个关键设计决策:

  1. Async generator 而不是 Promise——UI 层实时消费 yield 出来的 StreamEvent,用户看到的”字流式”输出就是这里来的
  2. Terminal 返回类型——generator 有明确的 exit reasonTerminal),不是黑盒结束。调用者能区分 “正常完成 / 被 abort / 撞到 blocking limit / maxTurns 耗尽” 等多种终止原因

queryLoop 是内部实现,query 只是包了一个 command lifecycle notify 的薄壳——如果 loop throw 或被 .return() 调用,command 不会被 completed 通知(只有正常返回才通知)。这是生命周期对称性:started 不保证 completed。


QueryParams:一次调用的完整输入

export type QueryParams = {
  messages: Message[]                      // 到目前为止的对话
  systemPrompt: SystemPrompt                // 预先装配好的系统提示词
  userContext: { [k: string]: string }      // CLAUDE.md / currentDate
  systemContext: { [k: string]: string }    // gitStatus / cacheBreaker
  canUseTool: CanUseToolFn                  // 权限检查回调
  toolUseContext: ToolUseContext            // 工具执行上下文(含 mode / 允许的 tool)
  fallbackModel?: string                    // 主 model 失败时的 fallback
  querySource: QuerySource                  // 调用来源标识(repl_main_thread / compact / ...)
  maxOutputTokensOverride?: number          // 单轮输出上限覆盖
  maxTurns?: number                         // 循环上限
  skipCacheWrite?: boolean                  // 跳过 cache write
  taskBudget?: { total: number }            // API 侧的 task budget (beta)
  deps?: QueryDeps                          // 可注入的依赖(测试用)
}

可注入的 deps 是一个精妙的设计(query/deps.ts):

export type QueryDeps = {
  callModel: typeof queryModelWithStreaming    // LLM 调用
  microcompact: typeof microcompactMessages    // 工具结果压缩
  autocompact: typeof autoCompactIfNeeded      // 自动压缩
  uuid: () => string                            // ID 生成
}

源码注释解释为什么:“tests can inject fakes directly instead of spyOn-per-module — the most common mocks (callModel, autocompact) are each spied in 6-8 test files today with module-import-and-spy boilerplate”

给自研 agent 的启示:顶层循环函数的依赖要可注入——测试环境直接注 fake,不用 spy 6-8 个模块。这是可测试性的基础。


循环状态:14 字段状态机

queryLoop 是一个无限 while 循环,用 State 对象携带跨迭代状态:

type State = {
  messages: Message[]                                            // 对话历史
  toolUseContext: ToolUseContext                                 // 工具上下文
  autoCompactTracking: AutoCompactTrackingState | undefined      // 压缩熔断器
  maxOutputTokensRecoveryCount: number                           // max-tokens 恢复次数
  hasAttemptedReactiveCompact: boolean                           // reactive 压缩已尝试?
  maxOutputTokensOverride: number | undefined                    // 输出 token 覆盖
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null>   // 异步的工具调用摘要
  stopHookActive: boolean | undefined                            // stop hook 正在跑?
  turnCount: number                                              // 当前轮次
  transition: Continue | undefined                               // 上一轮为什么 continue
}

每个字段回答一个具体问题,没有冗余:

  • transition:上一轮为什么没结束,直接驱动下一轮的处理(注释:“Lets tests assert recovery paths fired without inspecting message contents”——让测试可以断言”恢复路径跑了”,而不用翻消息内容)
  • maxOutputTokensRecoveryCount:独立的子循环计数器——max-output-tokens 错误后多次加量重试,不是全局 turnCount
  • hasAttemptedReactiveCompact:每轮只能尝试一次 reactive compact,避免死循环
  • pendingToolUseSummary:跟工具调用摘要走异步——主循环不等它,后台生成

给自研 agent 的启示:状态字段多并不坏,关键是每个字段有明确的语义责任。14 字段 × 清晰语义 远胜 4 字段 × { [key: string]: any }


每轮 14 步:处理流水线

query() · 每轮 14 步流水线 (Per-Turn Pipeline) Async generator 输出事件流 · queryLoop while(true) 搭配 10 字段状态机 while (true) { 阶段 1 · SETUP (准备本轮状态) 3 步 1 State 解构 从 10 字段 State 对象里取本轮需要的字段 2 Skill 预取 (并发) startSkillDiscoveryPrefetch — 在 LLM 流式期间后台跑 3 yield { type: 'stream_request_start' } 告诉 UI "本轮开始" 阶段 2 · HISTORY PREP (由便宜到贵的压缩级联) 6 步 4 getMessagesAfterCompactBoundary 只看最近 compact boundary 之后的消息 5 applyToolResultBudget per-message 工具结果总量预算 — 必须在 microcompact 之前 6 HISTORY_SNIP (feature-flagged) 策略式历史裁剪,在 compact 之前 7 microcompact — 工具结果清理 (零 LLM 调用) time-based 或 cached-microcompact (服务器端 cache editing) 8 contextCollapse (feature-flagged) 另一套上下文管理系统;启用时 autocompact 主动让位 9 autocompact — LLM 摘要 (1 次 LLM 调用) effectiveWindow - 13k 触发;连续 3 次失败 → 熔断 阶段 3 · RUN & TOOLS (调模型 + 执行工具) 5 步 10 Blocking limit 检查 撞硬顶 → yield error, return { reason: 'blocking_limit' } 11 deps.callModel — 流式 LLM 调用 model fallback + streaming fallback (两层重试) 12 yield messages (streaming fallback 时 tombstone) tombstone = 显式作废标记 (已流出的消息不能撤回) 13 StreamingToolExecutor 或 runTools streaming executor 在 tool_use 块到达时就启动工具,不等完整 message 14 收集 toolResults, 决定 continue needsFollowUp = toolUseBlocks.length > 0 } if (needsFollowUp) continue ↑ else return { reason } → 6 种 Terminal 退出原因 done 本轮无 tool_use aborted 用户中断 blocking_limit 撞到硬顶 max_turns 超过 turn 上限 stop_hook_blocked Stop hook 拒绝 error 不可恢复错误

每次 while 循环的一轮最多跑这 14 步(很多步会因条件不触发而跳过):

#步骤源码位置作用
1State 解构query.ts 第 311-321 行从 State 拿本轮需要的字段
2Skill 预取第 331 行 startSkillDiscoveryPrefetch并发预取相关 skill,在 LLM 流式期间跑
3Yield stream_request_start第 337 行给 UI 的”开始了”信号
4getMessagesAfterCompactBoundary第 365 行只看 compact boundary 之后的消息,已压缩的跳过
5applyToolResultBudget第 379 行执行 per-message 的 tool result 预算
6HISTORY_SNIP(feature-flagged)第 401 行删减策略式的历史清理
7Microcompact第 414 行工具结果清理(清老 Read/Bash 结果)
8Context Collapse(feature-flagged)第 441 行另一套上下文管理系统
9Auto-compact第 454 行LLM 续写摘要(见 压缩
10Blocking limit check第 641 行如果撞到硬顶上限,yield error 并 return { reason: 'blocking_limit' }
11callModel 流式调用第 659 行 deps.callModel调 LLM,流式接收 message / event
12Yield messages第 708 行之后逐条 yield 给 UI(含 tombstone 回滚机制)
13工具执行StreamingToolExecutorrunTools并行 / 串行跑 tool
14收集 toolResults,决定是否 continue循环末尾needsFollowUp = toolUseBlocks.length > 0

关键细节

2. Skill 预取和 LLM 流式并行

const pendingSkillPrefetch = skillPrefetch?.startSkillDiscoveryPrefetch(...)
// ... 继续处理
// ... 调用 LLM,流式接收
// LLM 响应期间 skill 预取在后台跑

注释说:“Replaces the blocking assistant_turn path that ran inside getAttachmentMessages (97% of those calls found nothing in prod).”——原来 skill 发现是阻塞的,生产中 97% 找不到任何 skill 却阻塞了整轮。现在并发跑,几乎零成本。

4. getMessagesAfterCompactBoundary —— 压缩边界保护

压缩之后旧消息被替换成 summary。这里只取最近 boundary 之后的消息。注释:“REPL keeps snipped messages for UI scrollback — project so the compact model doesn’t summarize content that was intentionally removed”——UI 的滚动条保留着被”snip”掉的消息供展示,但模型那边不能看见(否则又被重新压缩一次)。

5. applyToolResultBudget —— 每条消息的 tool result 预算

Enforce per-message budget on aggregate tool result size. Runs BEFORE microcompact — cached MC operates purely by tool_use_id (never inspects content), so content replacement is invisible to it and the two compose cleanly.

意思:有一个 per-message 的工具结果总量预算(不同工具可以有不同上限),超预算就把内容替换为占位。顺序很关键——必须在 microcompact 之前,因为 cached microcompact 只看 tool_use_id 不看内容,两者能无缝组合。

10. Blocking limit —— 硬顶之前的主动阻塞

const { isAtBlockingLimit } = calculateTokenWarningState(
  tokenCountWithEstimation(messagesForQuery) - snipTokensFreed,
  model,
)
if (isAtBlockingLimit) {
  yield createAssistantAPIErrorMessage({ content: PROMPT_TOO_LONG_ERROR_MESSAGE, ... })
  return { reason: 'blocking_limit' }
}

当自动压缩被用户关掉时,这个检查预先阻止超限——为手动 /compact 留出 MANUAL_COMPACT_BUFFER_TOKENS = 3000 的空间。注释详述这个门槛要跳过的四种情况:

  • 刚刚压缩过(compactionResult)——usage 是 stale 的
  • querySource === 'compact' / 'session_memory'——forked agent 会死锁
  • Reactive compact enabled——让真 413 触发 reactive
  • Context collapse enabled——collapse 自己管

给自研 agent 的启示:硬顶拦截不该是一个全局开关,要能精确豁免特殊调用路径——否则特殊路径会在 blocking limit 上死锁。

13. Streaming Tool Executor —— 边流边执行

const useStreamingToolExecution = config.gates.streamingToolExecution
let streamingToolExecutor = useStreamingToolExecution
  ? new StreamingToolExecutor(...)
  : null

两条路径:

  • 传统:LLM 流完 → 解析 tool_use → 按顺序 / 并发执行工具 → 拿结果
  • StreamingStreamingToolExecutor):LLM 流到 tool_use block 一出现就启动对应工具,不等整个 assistant message 完成

延迟显著降低——如果是多个独立工具调用,能接近并发执行的 wall-clock 时间。


模型 fallback + streaming fallback

第 654 行的 while (attemptWithFallback)双层 fallback 逻辑:

  1. Model fallback:主 model 失败(API error / throttle) → 换 fallbackModel 重试
  2. Streaming fallback:streaming 模式下出错(如 thinking block 异常),切到非 streaming 重试

源码里有一段特别难读但很重要的处理:streaming fallback 触发时,之前流出去的半截 assistant message 要被 tombstone——因为它可能带着无效的 thinking block signature,重新提交会被 API 拒绝。

if (streamingFallbackOccured) {
  for (const msg of assistantMessages) {
    yield { type: 'tombstone' as const, message: msg }  // UI 和 transcript 删掉这条
  }
  assistantMessages.length = 0
  toolResults.length = 0
  // ... 丢弃 pending tool results,重开 executor
}

Tombstone 消息是 UI 和 transcript 的”删除标记”——流出去的消息不能从客户端撤回,但 tombstone 告诉下游”这条作废”。

给自研 agent 的启示:streaming 输出的撤回机制必须存在。LLM 流到一半发现有问题,不可能从客户端”收回”已经流出的字符——需要一个显式作废标记


终止条件:Terminal 的 6 种 reason

根据 query.ts 里 return { reason: ... } 的所有分支,循环可能以这些原因终止:

Reason条件含义
blocking_limit撞到硬顶手动压缩都救不了
max_turnsturnCount > maxTurns用户设置的 turn 上限
done本轮没有 tool_use模型认为任务完成
abortedabortController.signal.aborted用户中断
stop_hook_blockedStop hook 返回 block用户 hook 阻止继续
error其他异常不可恢复错误

不同 reason 对应不同的后续处理——“done” 后 UI 显示完成、“aborted” 显示”已取消”、“blocking_limit” 引导用户手动压缩、“max_turns” 建议调 maxTurns。


Task Layer:7 种 Task 类型

Agent 调用的顶层封装在 Task.ts。共有 7 种 TaskType

export type TaskType =
  | 'local_bash'           // 本地 shell 任务
  | 'local_agent'          // 本地运行的子 agent
  | 'remote_agent'         // CCR 云端 agent
  | 'in_process_teammate'  // 同进程内的 teammate
  | 'local_workflow'       // 本地 workflow
  | 'monitor_mcp'          // MCP server 监控
  | 'dream'                // 夜间整理记忆的 dream job

5 种状态:

export type TaskStatus = 'pending' | 'running' | 'completed' | 'failed' | 'killed'

Task ID 有前缀TASK_ID_PREFIXES):

{
  local_bash: 'b',           // 保持 'b' 兼容旧版本
  local_agent: 'a',
  remote_agent: 'r',
  in_process_teammate: 't',
  local_workflow: 'w',
  monitor_mcp: 'm',
  dream: 'd',
}

随机 8 字符 suffix 从 36^8 ≈ 2.8 万亿组合——源码注释:“sufficient to resist brute-force symlink attacks”

为什么 ID 要抗 symlink 攻击:task 的输出文件路径来自 ID(getTaskOutputPath(id))。如果攻击者能预测 ID,就能预先创建 symlink 指向任意文件,让 task 的 stdout 写入该文件。36^8 的熵足够让这种攻击不现实。

给自研 agent 的启示任何用 ID 作为文件路径的系统必须考虑 ID 可预测性——生产事故里这种”ID 不够随机导致 race condition 或攻击”的案例不少见。


Resume:claude --resume 和 session storage

前面 compaction 提到过:“Background jobs that summarize previous conversations for the claude --resume feature”——resume 时的压缩是后台预算好的

机制拆解:

  1. Session storage:每次对话都写入 ~/.claude/projects/<project>/sessions/<sessionId>.jsonl
  2. 后台 summary agent:退出 Claude Code 后,一个后台 job 读取 session 文件,产出 summary
  3. Resume 时:读 session + summary,重建 State,进入 queryLoop

这样 resume 近乎即时——summary 已经算好。注释里明说 “subscribers can use /stats to view usage patterns”——usage 数据持久化了,跨 session 可见。


Abort 的全链路路径

toolUseContext.abortController 是贯穿整条调用链的中央取消点:

用户按 Esc
  → abortController.abort()
  → signal.aborted = true
  → LLM 流中断(AbortSignal 传到 fetch)
  → 所有在跑的工具收到 signal(tool.execute 的第二参数)
  → 各工具自己的 cleanup(bwrap 进程 kill、文件锁释放、...)
  → queryLoop 检查 signal.aborted → return { reason: 'aborted' }

关键设计:每一层都自己观察 signal,不用等上层通知。LLM fetch 原生支持 AbortSignal;工具 execute 的第二参数总含 signal;Bash 进程 wait 监听 signal——取消是全链路的广播,不是逐层转发

给自研 agent 的启示AbortController 必须从入口贯穿到每一个叶子操作。半截的 abort 支持比没有更糟——用户以为取消了,实际上某层还在跑。


QueryEngine:单次 LLM 调用的层级

QueryEngine.ts(1295 行)是 callModel 函数的底层,专门处理一次 LLM 调用的流式消费

  • 解析 SSE event(content_block_start / content_block_delta / content_block_stop / message_delta / …)
  • 构建 assistant message
  • 处理 max_tokens / stop_reason / 各种 API error
  • Streaming fallback(上面讲过)
  • Thinking block 的特殊处理(signature 验证)
  • Usage tracking(包括 cache read / cache creation 各自的 token 数)

这一层的复杂度来自需要把 API 的”low-level event 流”组装成”high-level assistant message”,同时保持 cancel-safe / error-safe / partial-state-safe。不是玩具——生产级的 streaming API 消费端逻辑至少是 1000+ 行。


给自研 agent 的要点

  1. 主循环用 async generator,不是 Promise——UI 实时流式消费是 agent 产品的基线体验
  2. Terminal 返回值要带 reason——不同终止原因引导不同后续 UX,黑盒 undefined 没法做
  3. 依赖注入:callModel / microcompact / autocompact 等高频 mock 的依赖做成 deps 对象——否则测试必须 spy 6-8 个模块
  4. State 字段清晰语义:14 个字段每个回答一个问题,远胜一个 any bucket
  5. 每轮处理是明确的流水线:snip → microcompact → collapse → autocompact → blocking check → model call → tools。顺序是设计选择,要有注释解释为什么这个顺序
  6. 并发预取:skill / memory / cache params 都可以在 LLM 流式期间后台跑——用 Promise.allusing disposable 管理生命周期
  7. Streaming tool executor:LLM 流到 tool_use block 就启动工具,不等整个 assistant message 完成——显著降低多工具场景延迟
  8. Tombstone message 作为 “已流出但作废” 的显式标记——streaming 输出无法客户端撤回,需要显式标记
  9. Model fallback + streaming fallback 是两层 —— 分别应对 API error 和 streaming 异常
  10. AbortController 贯穿到叶子——每一层自己观察 signal,不是层层转发。半截的 cancel 比没有更糟
  11. Task ID 要有足够熵——文件名用 ID 时必须考虑 symlink 攻击,36^8 是 Claude Code 的选择
  12. Resume 的压缩在后台预算——用户体验上 resume 是即时的,不是退出时没处理

延伸阅读

  • Claude Code 源码:query.tsQueryEngine.tsTask.tsquery/{deps,stopHooks,tokenBudget,config}.ts
  • 上下文压缩——每轮处理流水线的第 7-9 步在这里详细展开
  • 执行环境——Task 类型里 worktree / remote 的具体实现
  • 权限系统——canUseTool 的决策路径
这页有帮助吗?