UI 流编排

L3 UI 流两条路径对比 / createUIMessageStream 的 execute-driven 模式 / writer 三方法 / 自定义 data-* 事件协议 / transient 语义 / 错误捕获分层

为什么需要这一页

运行生命周期 里讲的 L3(UI 流层)只覆盖了 result.toUIMessageStream() ——纯 transform 路径,被动消费 agent 的 fullStream 产出 SSE 事件。

但 SDK 还有另一条 L3 路径createUIMessageStream({ execute }) ——execute-driven 路径,在 agent 流之外还能主动推送自定义事件、合并多条流、分阶段写入。典型使用场景:

  • 流开始时先推 initialization / config 事件,再开始 agent 流
  • 在 agent 流里穿插进度状态(“连接 MCP 中”、“执行中”、“生成中”)
  • 合并多个源(agent 流 + 后台任务结果 + 用户输入补充)
  • 分阶段 drain(先 drain 一个 agent 流,拿到结果再决定是否启动第二个)

两条路径都在 SDK 里,解决不同问题,API 形态也不同。本页把它们对照讲透,并在末尾附一个生产落地案例(Case study)。

两条 L3 路径对照

维度result.toUIMessageStream()createUIMessageStream({ execute })
定位agent 结果对象的一个方法独立顶层函数
发起方已经有 agentStream 之后调用调,在 execute 里再构造 agent
控制粒度被动 transform agent 的 fullStreamexecute 里主动调度:先写什么、后合并什么
能推自定义事件?不能可以:writer.write({ type: "data-xxx", ... })
能合并多个流?不能:单输入单输出可以:writer.merge(otherStream) 把多个流折进一个输出
回调onFinish / onStepFinish / onError同 + 一个 execute 异步函数

何时用哪个

  • 只需要把单一 agent 流发给前端 → result.toUIMessageStream()
  • 需要在 agent 流之外推进度事件、或合并多个流(比如先推配置事件、再 merge agent、最后推总结)→ createUIMessageStream({ execute })

createUIMessageStream 的完整签名

function createUIMessageStream<MESSAGE_METADATA>({
  execute,              // async ({ writer }) => { ... } — 核心
  originalMessages?,    // 用于 messageId 延续
  generateId?,          // assistant 消息 ID 生成器
  onStepFinish?,        // L3 step 级回调(见 lifecycle 页)
  onFinish?,            // L3 整流 drain 回调
  onError?,             // 错误序列化器
}): ReadableStream<UIMessageStreamPart>

execute 是主入口——SDK 调用 createUIMessageStream立即异步执行 execute,把 writer 传给你。你在 execute 里想推什么、想合并什么都可以。返回的 Promise resolve 后 SDK 会等所有 writer.merge(...) 推进来的流 drain 完再 controller.close()

toUIMessageStream 的参数差异createUIMessageStream 不接受 messageMetadata —— 该参数只存在于 result.toUIMessageStream({ messageMetadata })。要给 chunk 挂 part-level metadata,只能在 writer.merge(...) 里合并进来的那个内层 toUIMessageStream 上挂——这是 execute-driven 路径常见的”外层编排 + 内层 metadata”分层(具体落地见 Case study)。

writer API 的三个方法

writerUIMessageStreamWriter(源码 dist/index.js:8343-8368):

1. writer.write(part) —— 立即推一个 UI part

writer.write({
  type: "data-agent-state",
  data: { text: "Building agent", type: "progress" },
  transient: true,
});

part 的类型必须是 UIMessageStreamPart——包括 SDK 标准类型(text-starttext-deltatool-call、…)和 type: "data-${string}"自定义扩展事件。自定义事件是推自家 UI 状态的正门。

2. writer.merge(stream) —— 把另一个 ReadableStream 折进来

const agentResult = await runAgentStream(params);
writer.merge(agentResult.toUIMessageStream({ ... }));

这是 execute-driven 路径的核心拼装方式——外层流先推初始化 / 配置 / 状态类 data-* 事件,等 agent 构造完、拿到 agentStream 后再把它的 text-delta / tool-call / tool-result 通过 merge “折进” 同一条输出流。客户端只看到一条连续的 UI 流。

并发语义merge 内部把源流的 reader 读循环扔进 ongoingStreamPromises 数组,SDK 最后会 await 完所有 ongoing 流才关闭输出 controller。所以 execute 返回后,流不会立即关,会等所有被 merge 进来的子流 drain。

3. writer.onError —— 暴露给用户的错误处理器

就是你传给 createUIMessageStreamonError 的引用——方便你在 execute 内部直接调用它做错误序列化(很少用,通常靠 try/catch 或让 merge 自动走错误路径)。

自定义 data-* 事件协议

SDK 的 UI stream 协议允许 type: "data-${string}" 的任意事件——这是业务方扩展 UI 协议的官方正门。只要求两件事:

  1. type 必须以 data- 开头(协议约定的 discriminator 前缀)
  2. type 里的字符串必须是 kebab-case 可序列化形式(原因见常见陷阱 #4

其他字段(data 的 shape、transient 开关)完全由业务方定义。

典型的事件分类

不同项目定义不同的事件集,归纳起来无非三类:

类别用途是否 transient形态示例
生命周期信号运行的起点 / 关键状态切换持久化data-run-initdata-task-milestone
实时状态指示显示”正在做什么”,只看当下transientdata-progressdata-agent-state
结构化业务载荷功能性数据更新(列表、进度条、结果片段)视业务而定data-todos-updatedata-search-results

判断轴是”用户下周再打开这条历史,应该看到这个事件吗”——要就持久化,不要就 transient(完整规则见下一节)。

传输无关的 writer 抽象模式

生产环境里 agent 引擎常常需要跨传输复用——同一套 tool 调用日志要在 SSE 流(HTTP)、WebSocket 房间、Electron IPC 之间都能推。此时直接在 agent 代码里依赖 UIMessageStreamWriter 会把它和 SSE 绑死。

通用解法是在 UIMessageStreamWriter 之上再抽一层接口:

interface StreamWriter {
  write(event: {
    type: `data-${string}`;
    data: unknown;
  }): void;

  writeTransient(event: {
    type: `data-${string}`;
    data: unknown;
  }): void;
}

两个方法只差 transient 是否标 true——通常实现为一个方法加默认参数即可,分成两个是为了调用侧语义明确(context.writeTransient(...)context.write(..., true) 更易读、更难出错)。

然后针对每种传输各写一个实现:

  • SSE / HTTP:包装 UIMessageStreamWriter.write(...)
  • WebSocket:包装 wsRoom.broadcast(...)
  • Electron IPC:包装 webContents.send(...)

Agent 引擎只依赖 StreamWriter 接口,调 writer.write(...) / writer.writeTransient(...);具体走哪路传输由 composition root 注入。这样一套 agent 代码能复用于多个 transport。

一个具体的生产级实现见本页末尾 Case study

transient: true —— 最容易忽略的开关

SDK 处理自定义 data-* 事件时按 transient 分流:

  • transient: false(默认) —— 事件会被记录进 responseMessage 的 metadata parts,参与 onFinish 回调的 payload,也会被持久化到聊天历史(如果消费者存库)
  • transient: true —— 事件只在流中流过一次就扔掉,不写入 responseMessage、不出现在持久化历史

什么时候要 transient:任何”实时状态指示”类型的事件。举例:

// OK — progress 状态不该进聊天历史
writer.write({
  type: "data-progress",
  data: { stage: "connecting_mcp" },
  transient: true,
});

// OK — 运行开始事件需要留下(UI 回放要看到)
writer.write({
  type: "data-run-init",
  data: {},
  // transient: false(默认)
});

// WRONG — 忘了标 transient,每次刷新聊天都会重播
//         "connecting..." / "executing..." / "generating..." 的旧状态事件

规则UI 状态 → transient;业务事件 → 持久化。不确定时问:“用户下周再打开这条历史,应该看到这个事件吗?“——要就持久化,不要就 transient。

execute 函数的生命周期

createUIMessageStream({ execute, ... }) 被调用

  ├─ 内部新建 ReadableStream + controller

  ├─ 同步调用 execute({ writer })——注意外层有一层 try/catch
  │   ├─ execute 是 async fn:立即返回一个 Promise
  │   │   └─ SDK 把 result.catch(e → enqueue "error" chunk) 挂到 ongoingStreamPromises
  │   └─ execute 调用自身同步 throw(非 async fn,或 async fn 在 Promise 构造前就 throw,
  │       例如参数解构报错)→ 外层 try/catch 直接写一个 "error" chunk

  ├─ execute 内你可以:
  │   │  writer.write({ type: "data-run-init", data: {} })     ← 立即入队
  │   │  writer.write({ type: "data-progress", ..., transient: true })
  │   │  const agentStream = await runAgentStream(...)
  │   │  writer.merge(agentStream.toUIMessageStream({ ... }))
  │   │    └─ merge 内部 push 一个 (async () => reader loop)().catch(...) 到 ongoingStreamPromises
  │   └─ execute 返回 / 所有 await 的 Promise resolve

  ├─ handleUIMessageStreamFinish(innerStream) 同步 pipeThrough 两个 TransformStream
  │   ├─ 一个注入 messageId 到 "start" chunk、追踪 "abort" chunk
  │   └─ 一个在每个 "finish-step" chunk 上调 onStepFinish;在 flush()/cancel() 上调 onFinish

  ├─ 返回包装后的 ReadableStream —— createUIMessageStream 在此 return,execute 还在后台跑

  ──── 以下为异步阶段 ────

  ├─ ongoingStreamPromises 全部 drain 完(execute + 每个 merge 的源流)
  ├─ waitForStreams.finally() 触发 → inner controller.close()
  ├─ 期间每个 finish-step chunk 流过下游 transform → callOnStepFinish()
  └─ 下游 TransformStream flush() 触发 → callOnFinish()

三个反直觉点

  1. execute 不 block stream 返回——createUIMessageStream 返回 ReadableStream同步的,execute 在后台异步跑。调用方可以马上把这个 ReadableStream 接 Response 送走。
  2. handleUIMessageStreamFinish 是同步包装,不是收尾动作——它在 return 之前就把内部流 pipeThrough 成最终流。onFinish 本身是挂在下游 TransformStream 的 flush()/cancel() 回调里的,要等内部 controller.close() 触发 flush 才真正执行。
  3. “execute throw 漏掉”的分水岭是”是否 await”,不是”同步 / 异步”——只要 execute 是 async fn,它的 throw(无论位于首行还是某个 await 之后)都走 Promise rejection 路径,被 .catch 包成 error chunk;真正会漏的是 execute 内未 await 的子 Promise(详见下一节)。

错误捕获全貌

SDK 对流内异常的处理不是一张统一的错误处理表,而是分层设计——每一层有自己的兜底策略,互不重叠。记清这张表再去写 execute 会省很多调试时间:

异常来源捕获位置(源码)落地形式用户代码需要做什么
execute() 调用本身同步 throw(非 async fn / async fn 在 Promise 构造前 throw)index.js:8342-8384 外层 try/catch写 error chunk,调 onError(err) 取消息通常不用管
async execute 函数体 throw(含 await 后 throw)index.js:8370-8377 ongoingStreamPromises.push(result.catch(...))写 error chunk,调 onError(err)通常不用管
execute 内未 await 的子 Promise rejection —— SDK 看不见Node 级 unhandledrejection,流继续但再也不写入必须自己 try/catch 或显式 .catch
merge 进来的源流 reader 抛错index.js:8358-8363 .catch(...)写 error chunk,调 onError(err)通常不用管;但源流持有的外部资源(sandbox / DB)要自己加 abort
onStepFinish 抛错index.js:5949-5960 try/catch 后路由到 onError不写 error chunk,只触发 onError 一次想让前端感知错误需要在 onStepFinish 里自己写 error chunk
onFinish 抛错 —— index.js:5927-5943 裸 await错误冒到 TransformStream.flush() → 消费端 iterator reject必须自己 try/catch/finally 兜底——任何生产级 onFinish(持久化、释放锁、通知下游)都得裹一层
UI transform 内部(processUIMessageStream)异常handleUIMessageStreamFinishonError 透传给 processUIMessageStreamonError(err)通常不用管

onError 的返回值契约:它不是纯 logger——返回的 string 会被写入 error chunk 的 errorText 字段发给客户端。常见模式是在 onError 里维护一个”首次错误”变量,只记录 / 返回第一次的消息——同一次运行的一个 aborted Promise 可能会分别从 execute 和某个被 merge 的源流各触发一次 onError,不做去重会得到重复的错误日志(具体实现见 Case study)。

一句话总结execute 的 throw 基本 SDK 兜;onStepFinish 的 throw 变成一次 onError 调用;onFinish 的 throw 和 execute 内未 await 的 Promise 两类必须自己兜

Case study

本节是一个生产落地案例(Zapvol 的 task-orchestrator),把前面讲的 execute-driven 路径、writer 三方法、自定义 data-* 事件、transient 语义、双层 handleUIMessageStreamFinish 串起来看。不是必须的使用模式——你的项目可以选择单层 createUIMessageStream、或者直接用 result.toUIMessageStreamResponse();这里只是一种走通了的走法

整体形态

// apps/server/src/services/task-orchestrator.ts
return createUIMessageStream<ZapvolMessage>({
  originalMessages: uiMessages,
  generateId: () => assistantMessageId,

  execute: async ({ writer }) => {
    // 1. 构造业务上下文,把 writer 透传下去(包成 StreamWriter 抽象)
    const context = zapvolContextSchema.parse({
      taskId, userId, sandbox, writer, browserBridge,
    });

    // 2. 立即推 run-init(持久化——UI 回放要看到)
    context.write(DataPartEvent.AGENT_INIT, {});

    // 3. 一系列 transient 状态事件标记 agent 构建过程
    context.writeTransient(DataPartEvent.AGENT_STATE, {
      text: AgentStateKey.AGENT_BUILDING,
    });
    const { agentConfig, subagentDefs } = await resolveAgentSetup(...);

    context.writeTransient(DataPartEvent.AGENT_STATE, {
      text: AgentStateKey.MCP_CONNECTING,
    });
    // ... MCP 连接 ...

    // 4. 最终构造 agent 并把它的 UI 流 merge 进来
    const agentResult = await runAgentStream(...);
    writer.merge(toUIMessageStream(agentResult.agentStream, {
      context, modelId, assistantMessageId, createdAt, messages: uiMessages,
    }));

    // 5. execute 在这里 return——但 SDK 会等 merge 进来的 agent 流 drain 完
    //    才关闭 controller 并触发 onFinish
  },

  onFinish: async ({ responseMessage, messages, isAborted }) => {
    try {
      await taskService.saveAssistantMessage(responseMessage);
    } catch (error) {
      log.error("assistant.persist_failed", { error });
    } finally {
      // 释放锁、清理 MCP 连接等——失败也要跑
      abortManager.remove(taskId);
      await releaseTaskLock(taskId);
    }
  },

  onError: (error) => {
    // session.error 本质是"execute 外层作用域内的首次错误变量"
    // 在自己的项目里用 `let firstError: string | undefined` 闭包捕获即可
    const message = error instanceof Error ? error.message : "Unknown error";
    if (!session.error) {
      log.error("stream.error", { error: message });
      session.error = message;   // 去重:只记第一次
    }
    return session.error;
  },
});

这段代码的流时序

  1. data-run-init(持久化)
  2. 三四个 data-agent-state(transient,UI 显示”Building…” / “Connecting…” / …)
  3. agent 的所有 text-delta / tool-call / tool-result 通过 merge 进来
  4. 最后一个 data-agent-state: "completed"(transient)
  5. SDK 自动发 finish chunk
  6. onFinish 触发,持久化 responseMessage

这个案例体现的 4 个通用模式

  1. 外层 data- 初始化 + 内层 agent merge* —— execute 先推 run-init / progress,再 merge agent 流,客户端看到一条连续流。
  2. transient 用于 UI 状态、非 transient 用于业务事件 —— “connecting…” 是 transient,“run-init” 是持久化,反过来就错了。
  3. onFinish 必裹 try/catch/finally —— SDK 不兜它的 throw(见”错误捕获全貌”),所以持久化 / 释放锁 / 下游通知都要自兜。
  4. onError 做首次错误去重 —— 同一次运行可能被 SDK 多次调 onError(execute + 某个 merge 源流),首次记录后续返回相同 string,避免重复日志 / 重复错误 chunk。

容易被忽略的结构:两层嵌套的 handleUIMessageStreamFinish

这是任何createUIMessageStream + 内部再 merge result.toUIMessageStream(...)”的 pipeline 都会遇到的结构——不仅仅是本案例。两条路径都共用 handleUIMessageStreamFinishindex.js:8100 / :8397),嵌起来就是两层。

凡是 execute 里再 writer.merge(agentStream.toUIMessageStream(...)) 的 pipeline,都会有两层 handleUIMessageStreamFinish —— 内层来自 toUIMessageStream(agentStream),外层来自 createUIMessageStream

agentStream
  → (业务薄封装) toUIMessageStream(agentStream, opts)
    → SDK agentStream.toUIMessageStream({ messageMetadata, onFinish, onError })
      ↳ handleUIMessageStreamFinish  ← 内层:为 agent 结果的 UI 转换兜底
  → writer.merge(...)
  → createUIMessageStream 内部 controller 入队
    ↳ handleUIMessageStreamFinish  ← 外层:为 execute-driven 流整体兜底

意味着同一次运行里,onFinish / onStepFinish / onError 都会被调两次——一次内层、一次外层。清晰的分工模板:

onFinishonErrormessageMetadata
内层 toUIMessageStream(agentStream)no-op(留空,持久化交给外层)返回 error.message 给 SDK 作为 error chunk 内容使用——给每个 part 挂 modelId / firstTokenAt / totalUsage 等
外层 createUIMessageStream({ execute })持久化 responseMessage、通知下游、释放锁去重(只记首次)、log、返回给 SDK不接受此参数(见下)

messageMetadata 只能挂内层createUIMessageStream({ execute }) 的签名(index.js:8321-8328不接受 messageMetadata 参数——只有 result.toUIMessageStream({ messageMetadata }) 接。所以要给 chunk 挂 part-level metadata,只能在 writer.merge(...) 里合并进来的那个内层 toUIMessageStream 上挂;直接在外层 createUIMessageStream 传是无效的。

常见陷阱

1. execute 内未 await 的子 Promise 会静默——不是 execute 本身

execute 自己返回的 Promise 是被 SDK 追踪的(见”错误捕获全貌”)。真正会漏的是 execute 内部发起但没 await 的子 Promise:

execute: async ({ writer }) => {
  someAsyncFn();  // WRONG — 没 await,rejection 走 Node unhandledrejection,
                  //         SDK 看不到,writer 继续"正常"但下游在等永远不会来的数据
},

:所有异步逻辑都 await,或显式 .catch(e => writer.write({ type: "error", errorText: ... }))

2. transient 事件不会出现在 onFinish 的 messages 里

context.writeTransient(...) 发的事件不会进 responseMessage.metadata.parts。如果你在 onFinish 里想读取这些事件做统计——读不到,它们已经被丢弃了。需要做持久化统计就发非 transient 事件,或者走旁路 Redis / DB 记录。

3. writer.merge 的源流一旦开始消费就不能停

merge 进来的流由 SDK 的 reader 持续读取直到 done 为止,中途取消 createUIMessageStream 外层流不会反向取消被 merge 进来的源流——那个源流会继续跑直到自己结束。如果源流持有资源(sandbox、DB 连接),要在源头加自己的 abort 机制。

4. data-${string} 的字符串不能包含特殊字符

SDK 直接把 type 字段发给前端当 discriminator。保持 kebab-case(data-agent-state,不是 data-AgentStatedata-agent.state)避免前端 switch 匹配失败。

延伸阅读

相关 SDK 章节

  • 运行生命周期 —— L3 层的同名回调(onFinish / onStepFinish / onError)在这里详述
  • 消息引用模型 —— agent 内部的消息链(和 L3 外层流不要搞混)

SDK 源码锚点ai@6.0.134

  • dist/index.js:8321-8405 —— createUIMessageStream 实现
  • dist/index.js:5883-5983 —— handleUIMessageStreamFinish 实现
  • dist/index.js:7839-8108 —— toUIMessageStream 实现(两条路径共用 handleUIMessageStreamFinish

Zapvol 落地参考(Case study)

  • Task Orchestration —— 整套如何串起来
  • packages/backend/src/agent/context.ts —— StreamWriter 接口定义
  • apps/server/src/services/task-orchestrator.ts —— execute-driven 路径的生产实现
这页有帮助吗?