UI 流编排
L3 UI 流两条路径对比 / createUIMessageStream 的 execute-driven 模式 / writer 三方法 / 自定义 data-* 事件协议 / transient 语义 / 错误捕获分层
为什么需要这一页
运行生命周期 里讲的 L3(UI 流层)只覆盖了 result.toUIMessageStream() ——纯 transform 路径,被动消费 agent 的 fullStream 产出 SSE 事件。
但 SDK 还有另一条 L3 路径:createUIMessageStream({ execute }) ——execute-driven 路径,在 agent 流之外还能主动推送自定义事件、合并多条流、分阶段写入。典型使用场景:
- 流开始时先推 initialization / config 事件,再开始 agent 流
- 在 agent 流里穿插进度状态(“连接 MCP 中”、“执行中”、“生成中”)
- 合并多个源(agent 流 + 后台任务结果 + 用户输入补充)
- 分阶段 drain(先 drain 一个 agent 流,拿到结果再决定是否启动第二个)
两条路径都在 SDK 里,解决不同问题,API 形态也不同。本页把它们对照讲透,并在末尾附一个生产落地案例(Case study)。
两条 L3 路径对照
| 维度 | result.toUIMessageStream() | createUIMessageStream({ execute }) |
|---|---|---|
| 定位 | agent 结果对象的一个方法 | 独立顶层函数 |
| 发起方 | 已经有 agentStream 之后调用 | 先调,在 execute 里再构造 agent |
| 控制粒度 | 被动 transform agent 的 fullStream | execute 里主动调度:先写什么、后合并什么 |
| 能推自定义事件? | 不能 | 可以:writer.write({ type: "data-xxx", ... }) |
| 能合并多个流? | 不能:单输入单输出 | 可以:writer.merge(otherStream) 把多个流折进一个输出 |
| 回调 | onFinish / onStepFinish / onError | 同 + 一个 execute 异步函数 |
何时用哪个:
- 只需要把单一 agent 流发给前端 →
result.toUIMessageStream() - 需要在 agent 流之外推进度事件、或合并多个流(比如先推配置事件、再 merge agent、最后推总结)→
createUIMessageStream({ execute })
createUIMessageStream 的完整签名
function createUIMessageStream<MESSAGE_METADATA>({
execute, // async ({ writer }) => { ... } — 核心
originalMessages?, // 用于 messageId 延续
generateId?, // assistant 消息 ID 生成器
onStepFinish?, // L3 step 级回调(见 lifecycle 页)
onFinish?, // L3 整流 drain 回调
onError?, // 错误序列化器
}): ReadableStream<UIMessageStreamPart>
execute 是主入口——SDK 调用 createUIMessageStream 时立即异步执行 execute,把 writer 传给你。你在 execute 里想推什么、想合并什么都可以。返回的 Promise resolve 后 SDK 会等所有 writer.merge(...) 推进来的流 drain 完再 controller.close()。
和 toUIMessageStream 的参数差异:createUIMessageStream 不接受 messageMetadata —— 该参数只存在于 result.toUIMessageStream({ messageMetadata })。要给 chunk 挂 part-level metadata,只能在 writer.merge(...) 里合并进来的那个内层 toUIMessageStream 上挂——这是 execute-driven 路径常见的”外层编排 + 内层 metadata”分层(具体落地见 Case study)。
writer API 的三个方法
writer 是 UIMessageStreamWriter(源码 dist/index.js:8343-8368):
1. writer.write(part) —— 立即推一个 UI part
writer.write({
type: "data-agent-state",
data: { text: "Building agent", type: "progress" },
transient: true,
});
part 的类型必须是 UIMessageStreamPart——包括 SDK 标准类型(text-start、text-delta、tool-call、…)和 type: "data-${string}" 的自定义扩展事件。自定义事件是推自家 UI 状态的正门。
2. writer.merge(stream) —— 把另一个 ReadableStream 折进来
const agentResult = await runAgentStream(params);
writer.merge(agentResult.toUIMessageStream({ ... }));
这是 execute-driven 路径的核心拼装方式——外层流先推初始化 / 配置 / 状态类 data-* 事件,等 agent 构造完、拿到 agentStream 后再把它的 text-delta / tool-call / tool-result 通过 merge “折进” 同一条输出流。客户端只看到一条连续的 UI 流。
并发语义:merge 内部把源流的 reader 读循环扔进 ongoingStreamPromises 数组,SDK 最后会 await 完所有 ongoing 流才关闭输出 controller。所以 execute 返回后,流不会立即关,会等所有被 merge 进来的子流 drain。
3. writer.onError —— 暴露给用户的错误处理器
就是你传给 createUIMessageStream 的 onError 的引用——方便你在 execute 内部直接调用它做错误序列化(很少用,通常靠 try/catch 或让 merge 自动走错误路径)。
自定义 data-* 事件协议
SDK 的 UI stream 协议允许 type: "data-${string}" 的任意事件——这是业务方扩展 UI 协议的官方正门。只要求两件事:
type必须以data-开头(协议约定的 discriminator 前缀)type里的字符串必须是 kebab-case 可序列化形式(原因见常见陷阱 #4)
其他字段(data 的 shape、transient 开关)完全由业务方定义。
典型的事件分类
不同项目定义不同的事件集,归纳起来无非三类:
| 类别 | 用途 | 是否 transient | 形态示例 |
|---|---|---|---|
| 生命周期信号 | 运行的起点 / 关键状态切换 | 持久化 | data-run-init、data-task-milestone |
| 实时状态指示 | 显示”正在做什么”,只看当下 | transient | data-progress、data-agent-state |
| 结构化业务载荷 | 功能性数据更新(列表、进度条、结果片段) | 视业务而定 | data-todos-update、data-search-results |
判断轴是”用户下周再打开这条历史,应该看到这个事件吗”——要就持久化,不要就 transient(完整规则见下一节)。
传输无关的 writer 抽象模式
生产环境里 agent 引擎常常需要跨传输复用——同一套 tool 调用日志要在 SSE 流(HTTP)、WebSocket 房间、Electron IPC 之间都能推。此时直接在 agent 代码里依赖 UIMessageStreamWriter 会把它和 SSE 绑死。
通用解法是在 UIMessageStreamWriter 之上再抽一层接口:
interface StreamWriter {
write(event: {
type: `data-${string}`;
data: unknown;
}): void;
writeTransient(event: {
type: `data-${string}`;
data: unknown;
}): void;
}
两个方法只差 transient 是否标 true——通常实现为一个方法加默认参数即可,分成两个是为了调用侧语义明确(context.writeTransient(...) 比 context.write(..., true) 更易读、更难出错)。
然后针对每种传输各写一个实现:
- SSE / HTTP:包装
UIMessageStreamWriter.write(...) - WebSocket:包装
wsRoom.broadcast(...) - Electron IPC:包装
webContents.send(...)
Agent 引擎只依赖 StreamWriter 接口,调 writer.write(...) / writer.writeTransient(...);具体走哪路传输由 composition root 注入。这样一套 agent 代码能复用于多个 transport。
一个具体的生产级实现见本页末尾 Case study。
transient: true —— 最容易忽略的开关
SDK 处理自定义 data-* 事件时按 transient 分流:
transient: false(默认) —— 事件会被记录进 responseMessage 的 metadata parts,参与onFinish回调的 payload,也会被持久化到聊天历史(如果消费者存库)transient: true—— 事件只在流中流过一次就扔掉,不写入 responseMessage、不出现在持久化历史
什么时候要 transient:任何”实时状态指示”类型的事件。举例:
// OK — progress 状态不该进聊天历史
writer.write({
type: "data-progress",
data: { stage: "connecting_mcp" },
transient: true,
});
// OK — 运行开始事件需要留下(UI 回放要看到)
writer.write({
type: "data-run-init",
data: {},
// transient: false(默认)
});
// WRONG — 忘了标 transient,每次刷新聊天都会重播
// "connecting..." / "executing..." / "generating..." 的旧状态事件
规则:UI 状态 → transient;业务事件 → 持久化。不确定时问:“用户下周再打开这条历史,应该看到这个事件吗?“——要就持久化,不要就 transient。
execute 函数的生命周期
createUIMessageStream({ execute, ... }) 被调用
│
├─ 内部新建 ReadableStream + controller
│
├─ 同步调用 execute({ writer })——注意外层有一层 try/catch
│ ├─ execute 是 async fn:立即返回一个 Promise
│ │ └─ SDK 把 result.catch(e → enqueue "error" chunk) 挂到 ongoingStreamPromises
│ └─ execute 调用自身同步 throw(非 async fn,或 async fn 在 Promise 构造前就 throw,
│ 例如参数解构报错)→ 外层 try/catch 直接写一个 "error" chunk
│
├─ execute 内你可以:
│ │ writer.write({ type: "data-run-init", data: {} }) ← 立即入队
│ │ writer.write({ type: "data-progress", ..., transient: true })
│ │ const agentStream = await runAgentStream(...)
│ │ writer.merge(agentStream.toUIMessageStream({ ... }))
│ │ └─ merge 内部 push 一个 (async () => reader loop)().catch(...) 到 ongoingStreamPromises
│ └─ execute 返回 / 所有 await 的 Promise resolve
│
├─ handleUIMessageStreamFinish(innerStream) 同步 pipeThrough 两个 TransformStream
│ ├─ 一个注入 messageId 到 "start" chunk、追踪 "abort" chunk
│ └─ 一个在每个 "finish-step" chunk 上调 onStepFinish;在 flush()/cancel() 上调 onFinish
│
├─ 返回包装后的 ReadableStream —— createUIMessageStream 在此 return,execute 还在后台跑
│
──── 以下为异步阶段 ────
│
├─ ongoingStreamPromises 全部 drain 完(execute + 每个 merge 的源流)
├─ waitForStreams.finally() 触发 → inner controller.close()
├─ 期间每个 finish-step chunk 流过下游 transform → callOnStepFinish()
└─ 下游 TransformStream flush() 触发 → callOnFinish()
三个反直觉点:
- execute 不 block stream 返回——
createUIMessageStream返回ReadableStream是同步的,execute 在后台异步跑。调用方可以马上把这个 ReadableStream 接 Response 送走。 handleUIMessageStreamFinish是同步包装,不是收尾动作——它在 return 之前就把内部流 pipeThrough 成最终流。onFinish本身是挂在下游 TransformStream 的flush()/cancel()回调里的,要等内部 controller.close() 触发 flush 才真正执行。- “execute throw 漏掉”的分水岭是”是否 await”,不是”同步 / 异步”——只要 execute 是 async fn,它的 throw(无论位于首行还是某个 await 之后)都走 Promise rejection 路径,被
.catch包成 error chunk;真正会漏的是 execute 内未 await 的子 Promise(详见下一节)。
错误捕获全貌
SDK 对流内异常的处理不是一张统一的错误处理表,而是分层设计——每一层有自己的兜底策略,互不重叠。记清这张表再去写 execute 会省很多调试时间:
| 异常来源 | 捕获位置(源码) | 落地形式 | 用户代码需要做什么 |
|---|---|---|---|
| execute() 调用本身同步 throw(非 async fn / async fn 在 Promise 构造前 throw) | index.js:8342-8384 外层 try/catch | 写 error chunk,调 onError(err) 取消息 | 通常不用管 |
| async execute 函数体 throw(含 await 后 throw) | index.js:8370-8377 ongoingStreamPromises.push(result.catch(...)) | 写 error chunk,调 onError(err) | 通常不用管 |
| execute 内未 await 的子 Promise rejection | 无 —— SDK 看不见 | Node 级 unhandledrejection,流继续但再也不写入 | 必须自己 try/catch 或显式 .catch |
| merge 进来的源流 reader 抛错 | index.js:8358-8363 .catch(...) | 写 error chunk,调 onError(err) | 通常不用管;但源流持有的外部资源(sandbox / DB)要自己加 abort |
onStepFinish 抛错 | index.js:5949-5960 try/catch 后路由到 onError | 不写 error chunk,只触发 onError 一次 | 想让前端感知错误需要在 onStepFinish 里自己写 error chunk |
onFinish 抛错 | 无 —— index.js:5927-5943 裸 await | 错误冒到 TransformStream.flush() → 消费端 iterator reject | 必须自己 try/catch/finally 兜底——任何生产级 onFinish(持久化、释放锁、通知下游)都得裹一层 |
UI transform 内部(processUIMessageStream)异常 | handleUIMessageStreamFinish 将 onError 透传给 processUIMessageStream | 调 onError(err) | 通常不用管 |
onError 的返回值契约:它不是纯 logger——返回的 string 会被写入 error chunk 的 errorText 字段发给客户端。常见模式是在 onError 里维护一个”首次错误”变量,只记录 / 返回第一次的消息——同一次运行的一个 aborted Promise 可能会分别从 execute 和某个被 merge 的源流各触发一次 onError,不做去重会得到重复的错误日志(具体实现见 Case study)。
一句话总结:execute 的 throw 基本 SDK 兜;onStepFinish 的 throw 变成一次 onError 调用;onFinish 的 throw 和 execute 内未 await 的 Promise 两类必须自己兜。
Case study
本节是一个生产落地案例(Zapvol 的 task-orchestrator),把前面讲的 execute-driven 路径、writer 三方法、自定义 data-* 事件、transient 语义、双层 handleUIMessageStreamFinish 串起来看。不是必须的使用模式——你的项目可以选择单层
createUIMessageStream、或者直接用result.toUIMessageStreamResponse();这里只是一种走通了的走法。
整体形态
// apps/server/src/services/task-orchestrator.ts
return createUIMessageStream<ZapvolMessage>({
originalMessages: uiMessages,
generateId: () => assistantMessageId,
execute: async ({ writer }) => {
// 1. 构造业务上下文,把 writer 透传下去(包成 StreamWriter 抽象)
const context = zapvolContextSchema.parse({
taskId, userId, sandbox, writer, browserBridge,
});
// 2. 立即推 run-init(持久化——UI 回放要看到)
context.write(DataPartEvent.AGENT_INIT, {});
// 3. 一系列 transient 状态事件标记 agent 构建过程
context.writeTransient(DataPartEvent.AGENT_STATE, {
text: AgentStateKey.AGENT_BUILDING,
});
const { agentConfig, subagentDefs } = await resolveAgentSetup(...);
context.writeTransient(DataPartEvent.AGENT_STATE, {
text: AgentStateKey.MCP_CONNECTING,
});
// ... MCP 连接 ...
// 4. 最终构造 agent 并把它的 UI 流 merge 进来
const agentResult = await runAgentStream(...);
writer.merge(toUIMessageStream(agentResult.agentStream, {
context, modelId, assistantMessageId, createdAt, messages: uiMessages,
}));
// 5. execute 在这里 return——但 SDK 会等 merge 进来的 agent 流 drain 完
// 才关闭 controller 并触发 onFinish
},
onFinish: async ({ responseMessage, messages, isAborted }) => {
try {
await taskService.saveAssistantMessage(responseMessage);
} catch (error) {
log.error("assistant.persist_failed", { error });
} finally {
// 释放锁、清理 MCP 连接等——失败也要跑
abortManager.remove(taskId);
await releaseTaskLock(taskId);
}
},
onError: (error) => {
// session.error 本质是"execute 外层作用域内的首次错误变量"
// 在自己的项目里用 `let firstError: string | undefined` 闭包捕获即可
const message = error instanceof Error ? error.message : "Unknown error";
if (!session.error) {
log.error("stream.error", { error: message });
session.error = message; // 去重:只记第一次
}
return session.error;
},
});
这段代码的流时序:
data-run-init(持久化)- 三四个
data-agent-state(transient,UI 显示”Building…” / “Connecting…” / …) - agent 的所有 text-delta / tool-call / tool-result 通过 merge 进来
- 最后一个
data-agent-state: "completed"(transient) - SDK 自动发
finishchunk - onFinish 触发,持久化 responseMessage
这个案例体现的 4 个通用模式
- 外层 data- 初始化 + 内层 agent merge* —— execute 先推 run-init / progress,再 merge agent 流,客户端看到一条连续流。
- transient 用于 UI 状态、非 transient 用于业务事件 —— “connecting…” 是 transient,“run-init” 是持久化,反过来就错了。
- onFinish 必裹 try/catch/finally —— SDK 不兜它的 throw(见”错误捕获全貌”),所以持久化 / 释放锁 / 下游通知都要自兜。
- onError 做首次错误去重 —— 同一次运行可能被 SDK 多次调 onError(execute + 某个 merge 源流),首次记录后续返回相同 string,避免重复日志 / 重复错误 chunk。
容易被忽略的结构:两层嵌套的 handleUIMessageStreamFinish
这是任何”
createUIMessageStream+ 内部再 mergeresult.toUIMessageStream(...)”的 pipeline 都会遇到的结构——不仅仅是本案例。两条路径都共用handleUIMessageStreamFinish(index.js:8100/:8397),嵌起来就是两层。
凡是 execute 里再 writer.merge(agentStream.toUIMessageStream(...)) 的 pipeline,都会有两层 handleUIMessageStreamFinish —— 内层来自 toUIMessageStream(agentStream),外层来自 createUIMessageStream:
agentStream
→ (业务薄封装) toUIMessageStream(agentStream, opts)
→ SDK agentStream.toUIMessageStream({ messageMetadata, onFinish, onError })
↳ handleUIMessageStreamFinish ← 内层:为 agent 结果的 UI 转换兜底
→ writer.merge(...)
→ createUIMessageStream 内部 controller 入队
↳ handleUIMessageStreamFinish ← 外层:为 execute-driven 流整体兜底
意味着同一次运行里,onFinish / onStepFinish / onError 都会被调两次——一次内层、一次外层。清晰的分工模板:
| 层 | onFinish | onError | messageMetadata |
|---|---|---|---|
内层 toUIMessageStream(agentStream) | no-op(留空,持久化交给外层) | 返回 error.message 给 SDK 作为 error chunk 内容 | 使用——给每个 part 挂 modelId / firstTokenAt / totalUsage 等 |
外层 createUIMessageStream({ execute }) | 持久化 responseMessage、通知下游、释放锁 | 去重(只记首次)、log、返回给 SDK | 不接受此参数(见下) |
messageMetadata 只能挂内层:createUIMessageStream({ execute }) 的签名(index.js:8321-8328)不接受 messageMetadata 参数——只有 result.toUIMessageStream({ messageMetadata }) 接。所以要给 chunk 挂 part-level metadata,只能在 writer.merge(...) 里合并进来的那个内层 toUIMessageStream 上挂;直接在外层 createUIMessageStream 传是无效的。
常见陷阱
1. execute 内未 await 的子 Promise 会静默——不是 execute 本身
execute 自己返回的 Promise 是被 SDK 追踪的(见”错误捕获全貌”)。真正会漏的是 execute 内部发起但没 await 的子 Promise:
execute: async ({ writer }) => {
someAsyncFn(); // WRONG — 没 await,rejection 走 Node unhandledrejection,
// SDK 看不到,writer 继续"正常"但下游在等永远不会来的数据
},
修:所有异步逻辑都 await,或显式 .catch(e => writer.write({ type: "error", errorText: ... }))。
2. transient 事件不会出现在 onFinish 的 messages 里
context.writeTransient(...) 发的事件不会进 responseMessage.metadata.parts。如果你在 onFinish 里想读取这些事件做统计——读不到,它们已经被丢弃了。需要做持久化统计就发非 transient 事件,或者走旁路 Redis / DB 记录。
3. writer.merge 的源流一旦开始消费就不能停
merge 进来的流由 SDK 的 reader 持续读取直到 done 为止,中途取消 createUIMessageStream 外层流不会反向取消被 merge 进来的源流——那个源流会继续跑直到自己结束。如果源流持有资源(sandbox、DB 连接),要在源头加自己的 abort 机制。
4. data-${string} 的字符串不能包含特殊字符
SDK 直接把 type 字段发给前端当 discriminator。保持 kebab-case(data-agent-state,不是 data-AgentState 或 data-agent.state)避免前端 switch 匹配失败。
延伸阅读
相关 SDK 章节
SDK 源码锚点(ai@6.0.134)
dist/index.js:8321-8405——createUIMessageStream实现dist/index.js:5883-5983——handleUIMessageStreamFinish实现dist/index.js:7839-8108——toUIMessageStream实现(两条路径共用handleUIMessageStreamFinish)
Zapvol 落地参考(Case study)
- Task Orchestration —— 整套如何串起来
packages/backend/src/agent/context.ts——StreamWriter接口定义apps/server/src/services/task-orchestrator.ts—— execute-driven 路径的生产实现