UI Stream Orchestration
L3 UI-stream — the two paths contrasted / createUIMessageStream's execute-driven mode / three writer methods / custom data-* event protocol / transient semantics / error capture tiering
Why this page exists
The L3 layer (UI stream) in Runtime Lifecycle only covered result.toUIMessageStream()
— the pure-transform path that passively consumes an agent’s fullStream and emits SSE events.
But the SDK has another L3 path: createUIMessageStream({ execute }) — the execute-driven path, which actively
pushes custom events, merges multiple streams, and writes in phases alongside the agent stream. Typical use cases:
- Push initialization / config events first, then start the agent stream
- Interleave progress states (“connecting MCP”, “executing”, “generating”) through an agent run
- Merge multiple sources (agent stream + background job results + supplementary user input)
- Staged drain (drain one agent stream, decide whether to start a second based on the result)
Both paths exist in the AI SDK, solve different problems, and have different shapes. This page contrasts them, then grounds them in a production Case study at the end.
The two L3 paths
| Dimension | result.toUIMessageStream() | createUIMessageStream({ execute }) |
|---|---|---|
| Position | A method on the agent-result object | A standalone top-level function |
| Invocation | Called after you have an agentStream | Called first, agent is built inside execute |
| Control granularity | Passive transform of the agent’s fullStream | Active scheduling inside execute: write first, merge later |
| Can push custom events? | No | Yes: writer.write({ type: "data-xxx", ... }) |
| Can merge multiple streams? | No: one input, one output | Yes: writer.merge(otherStream) folds streams into one output |
| Callbacks | onFinish / onStepFinish / onError | Same + an execute async function |
When to use which:
- Only need to forward a single agent stream to the frontend → result.toUIMessageStream()
- Need to push progress events alongside the agent stream, or merge multiple streams (e.g. a config event first, then the merged agent stream, then a summary event) → createUIMessageStream({ execute })
createUIMessageStream full signature
```typescript
// Simplified signature sketch (optional markers shown inline), not literal TypeScript
function createUIMessageStream<UI_MESSAGE extends UIMessage>({
  execute,           // async ({ writer }) => { ... }: the main entry
  originalMessages?, // for messageId continuation
  generateId?,       // assistant message ID generator
  onStepFinish?,     // L3 step-level callback (see Lifecycle page)
  onFinish?,         // L3 full-stream drain callback
  onError?,          // error serializer
}): ReadableStream<UIMessageStreamPart>
```
execute is the main entry point. The SDK invokes it immediately (synchronously, inside the createUIMessageStream call) and passes in a writer; because execute is normally an async function, everything after its first await runs in the background. Inside it you can write and merge whatever you need. After the Promise that execute returns has settled, the SDK waits for every writer.merge(...) stream to drain, then closes the controller.
Parameter difference vs toUIMessageStream: createUIMessageStream does not accept messageMetadata — that
parameter only exists on result.toUIMessageStream({ messageMetadata }). To attach part-level metadata to chunks,
do it on the inner toUIMessageStream that is writer.merge’d in — the classic “outer orchestration + inner metadata”
layering (see Case study at the end).
writer API — three methods
writer is a UIMessageStreamWriter (source: dist/index.js:8343-8368):
1. writer.write(part) — enqueue a UI part immediately
```typescript
writer.write({
  type: "data-agent-state",
  data: { text: "Building agent", type: "progress" },
  transient: true,
});
```
part must be a UIMessageStreamPart — this includes SDK standard types (text-start, text-delta, tool-call,
…) AND custom-extension events with type: "data-${string}". Custom events are the front door for your own UI
state.
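To make the discriminator concrete, here is a deliberately simplified local model of the part union. StandardPart, DataPart, Part and isDataPart are hypothetical names written for illustration; the SDK's real chunk union has many more members and is generic over your message type, so treat this as a sketch of the shape rather than the SDK's typings.

```typescript
// Simplified, hypothetical model of the parts writer.write() accepts.
// The real SDK union has many more standard members (tool calls, finish, ...).
type StandardPart =
  | { type: "text-start"; id: string }
  | { type: "text-delta"; id: string; delta: string }
  | { type: "error"; errorText: string };

// Custom extension events: any type that starts with "data-".
type DataPart = {
  type: `data-${string}`;
  data: unknown;
  transient?: boolean;
};

type Part = StandardPart | DataPart;

// Narrow a part to the custom data-* branch by its discriminator prefix.
function isDataPart(part: Part): part is DataPart {
  return part.type.startsWith("data-");
}

const parts: Part[] = [
  { type: "text-start", id: "t1" },
  { type: "data-agent-state", data: { text: "Building agent" }, transient: true },
  { type: "text-delta", id: "t1", delta: "Hello" },
];

// Only the custom event survives the filter.
const customTypes = parts.filter(isDataPart).map((p) => p.type);
```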
2. writer.merge(stream) — fold another ReadableStream in
```typescript
const agentResult = await runAgentStream(params);
writer.merge(agentResult.toUIMessageStream({ ... }));
```
This is the core assembly pattern of the execute-driven path — the outer stream pushes initialization / config /
state data-* events first; once the agent is built and agentStream is in hand, its text-delta / tool-call /
tool-result firehose is folded back into the same output via merge. The client sees a single continuous UI stream.
Concurrency semantics: merge pushes the source stream's reader loop onto an internal ongoingStreamPromises array, and the SDK waits for every entry in that array before closing the output controller. So the stream does not close as soon as execute returns; it stays open until every merged sub-stream has drained.
3. writer.onError — the user-provided error serializer
Just a reference to the onError you passed to createUIMessageStream, exposed for convenience if you want to invoke
it directly inside execute for serialization (rarely used; usually you rely on try/catch or let merge handle error
pass-through).
Custom data-* event protocol
The SDK’s UI stream protocol permits arbitrary type: "data-${string}" events — this is the official front door
for business-layer extensions to the UI protocol. Only two things are required:
- type must start with data- (the protocol-mandated discriminator prefix)
- The rest of the type string must be kebab-case (see Trap #4)
Everything else (data shape, the transient flag) is entirely up to the business layer.
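A runtime guard can enforce both rules before an event leaves the server. This is a hypothetical helper, not part of the SDK; the kebab-case half of the regex encodes this page's convention rather than an SDK check.

```typescript
// Hypothetical guard: "data-" prefix plus a lowercase kebab-case remainder.
// The SDK only needs the prefix; kebab-case is this page's convention.
const DATA_PART_TYPE = /^data-[a-z0-9]+(?:-[a-z0-9]+)*$/;

function isValidDataPartType(type: string): type is `data-${string}` {
  return DATA_PART_TYPE.test(type);
}

// Throwing variant for call sites that should fail fast.
function assertDataPartType(type: string): asserts type is `data-${string}` {
  if (!isValidDataPartType(type)) {
    throw new Error(`Invalid custom event type "${type}": expected data-<kebab-case>`);
  }
}
```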
Typical event categories
Different projects define different event sets, but they boil down to three categories:
| Category | Purpose | Transient? | Example shapes |
|---|---|---|---|
| Lifecycle signals | Run start / key state transitions | Persisted | data-run-init, data-task-milestone |
| Real-time status indicators | ”What’s happening right now”, only cares about the present | transient | data-progress, data-agent-state |
| Structured business payloads | Functional data updates (lists, progress bars, result snippets) | Depends | data-todos-update, data-search-results |
The judgment axis is “should the user, reopening this history next week, see this event?” — yes means persist, no means transient (full rule in the next section).
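One way to keep the transient decision out of individual call sites is a per-event registry keyed by the categories above. The event names and chosen defaults are examples (the "Depends" category in particular is a per-project call), and EVENT_POLICY / buildDataPart are hypothetical names.

```typescript
// Hypothetical registry: each event name carries its category's transient
// default, so call sites never pass the flag by hand.
const EVENT_POLICY = {
  "data-run-init": { transient: false },       // lifecycle signal
  "data-task-milestone": { transient: false }, // lifecycle signal
  "data-progress": { transient: true },        // real-time status indicator
  "data-agent-state": { transient: true },     // real-time status indicator
  "data-todos-update": { transient: false },   // business payload (a per-project choice)
} as const;

type EventName = keyof typeof EVENT_POLICY;

interface OutgoingDataPart {
  type: EventName;
  data: unknown;
  transient: boolean;
}

// Build a part whose transient flag comes from the registry.
function buildDataPart(type: EventName, data: unknown): OutgoingDataPart {
  return { type, data, transient: EVENT_POLICY[type].transient };
}
```

The returned object has the shape of a data-* part, so it could be handed to writer.write(...) directly; misspelling an event name becomes a type error instead of a silently persisted event.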
The transport-agnostic writer abstraction pattern
In production, an agent engine often needs cross-transport reuse — the same tool-call log should be pushable via
SSE (HTTP), WebSocket rooms, or Electron IPC. Depending directly on UIMessageStreamWriter in agent code locks it
to SSE.
The general solution is to wrap one more interface on top of UIMessageStreamWriter:
```typescript
interface StreamWriter {
  write(event: {
    type: `data-${string}`;
    data: unknown;
  }): void;
  writeTransient(event: {
    type: `data-${string}`;
    data: unknown;
  }): void;
}
```
The two methods differ only in whether transient is set to true — usually implemented as one method with a default
parameter. Splitting into two methods makes the calling site semantics explicit (context.writeTransient(...) is
easier to read and harder to misuse than context.write(..., true)).
Then write one implementation per transport:
- SSE / HTTP: wraps UIMessageStreamWriter.write(...)
- WebSocket: wraps wsRoom.broadcast(...)
- Electron IPC: wraps webContents.send(...)
The agent engine only depends on the StreamWriter interface, calling writer.write(...) / writer.writeTransient(...);
the composition root decides which transport is actually wired in. One agent codebase can serve multiple transports.
A concrete production implementation is at the end of this page in Case study.
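A minimal sketch of the pattern, assuming the StreamWriter interface above: UIWriterLike is a structural stand-in for the slice of UIMessageStreamWriter that is used, and broadcast stands in for a WebSocket room's send function. Both adapters and reportStage are hypothetical; an Electron IPC adapter would follow the same shape around webContents.send(...).

```typescript
// The StreamWriter interface from the text, plus two hypothetical adapters.
type DataType = `data-${string}`;
type DataEvent = { type: DataType; data: unknown };

interface StreamWriter {
  write(event: DataEvent): void;
  writeTransient(event: DataEvent): void;
}

// Only the slice of UIMessageStreamWriter this adapter needs (assumption).
interface UIWriterLike {
  write(part: DataEvent & { transient?: boolean }): void;
}

// SSE / HTTP: forward to the SDK writer, adding the flag for transient events.
function sseStreamWriter(writer: UIWriterLike): StreamWriter {
  return {
    write: (event) => writer.write({ ...event }),
    writeTransient: (event) => writer.write({ ...event, transient: true }),
  };
}

// WebSocket room: serialize the same event and broadcast it.
function wsStreamWriter(broadcast: (message: string) => void): StreamWriter {
  const send = (event: DataEvent, transient: boolean): void =>
    broadcast(JSON.stringify({ ...event, transient }));
  return {
    write: (event) => send(event, false),
    writeTransient: (event) => send(event, true),
  };
}

// Agent-side code: depends only on StreamWriter, never on a transport.
function reportStage(out: StreamWriter, stage: string): void {
  out.writeTransient({ type: "data-progress", data: { stage } });
}

const ssePartsSent: (DataEvent & { transient?: boolean })[] = [];
reportStage(sseStreamWriter({ write: (part) => ssePartsSent.push(part) }), "connecting_mcp");

const wsFrames: string[] = [];
reportStage(wsStreamWriter((message) => wsFrames.push(message)), "executing");
```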
transient: true — the most-missed flag
When processing custom data-* events, the SDK routes by transient:
- With transient: false (the default), the event is recorded into responseMessage.parts, appears in onFinish's payload, and is persisted to chat history (if the consumer stores it)
- With transient: true, the event flows through the stream once and is discarded; it is not written to responseMessage and does not appear in persisted history
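The routing rule can be modelled in a few lines. This is a simplified sketch, not the SDK's implementation (which handles more cases); routeDataPart and AssistantMessage are hypothetical names, and the sketch assumes persisted data parts end up in the message's parts array.

```typescript
// Toy model of the routing rule: every data part reaches the live UI, but
// only non-transient parts are folded into the message onFinish receives.
type DataPart = { type: `data-${string}`; data: unknown; transient?: boolean };

interface AssistantMessage {
  parts: { type: string; data: unknown }[];
}

function routeDataPart(
  part: DataPart,
  message: AssistantMessage,
  renderLive: (part: DataPart) => void,
): void {
  renderLive(part); // the client sees it once, as it streams
  if (part.transient !== true) {
    message.parts.push({ type: part.type, data: part.data }); // survives into history
  }
}

const message: AssistantMessage = { parts: [] };
const live: string[] = [];
const incoming: DataPart[] = [
  { type: "data-run-init", data: {} },
  { type: "data-progress", data: { stage: "connecting_mcp" }, transient: true },
  { type: "data-progress", data: { stage: "generating" }, transient: true },
];
for (const part of incoming) routeDataPart(part, message, (p) => live.push(p.type));
```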
When to use transient: any event that is a real-time status indicator. Examples:

```typescript
// OK: progress states shouldn't enter chat history
writer.write({
  type: "data-progress",
  data: { stage: "connecting_mcp" },
  transient: true,
});

// OK: run-start events should persist (UI playback needs to see them)
writer.write({
  type: "data-run-init",
  data: {},
  // transient: false (default)
});

// WRONG: forgetting to mark `transient` replays
// "connecting..." / "executing..." / "generating..." on every history refresh
writer.write({
  type: "data-progress",
  data: { stage: "executing" },
});
```
Rule: UI state → transient; business events → persisted. When in doubt, ask: “Should the user, reopening this history next week, see this event?” — yes means persist, no means transient.
execute lifecycle
```text
createUIMessageStream({ execute, ... }) is called
│
├─ Internal: new ReadableStream + controller
│
├─ Synchronously invokes execute({ writer }), wrapped in an outer try/catch
│   ├─ execute is an async fn: it returns a Promise immediately
│   │   └─ SDK attaches result.catch(e → enqueue "error" chunk) and pushes it onto ongoingStreamPromises
│   └─ execute throws synchronously on invocation (only possible for a non-async fn; an async fn
│       turns every throw, even a parameter destructuring error, into a rejected Promise)
│       → the outer try/catch writes an "error" chunk directly
│
├─ Inside execute (runs until its first await, then continues in the background), you can:
│   │ writer.write({ type: "data-run-init", data: {} })   ← enqueued immediately
│   │ writer.write({ type: "data-progress", ..., transient: true })
│   │ const agentStream = await runAgentStream(...)
│   │ writer.merge(agentStream.toUIMessageStream({ ... }))
│   │   └─ merge pushes (async () => reader loop)().catch(...) onto ongoingStreamPromises
│   └─ execute returns / all awaited Promises resolve
│
├─ handleUIMessageStreamFinish(innerStream) synchronously pipes it through two TransformStreams
│   ├─ one injects messageId into the "start" chunk and tracks the "abort" chunk
│   └─ one calls onStepFinish on each "finish-step" chunk, and onFinish on flush()/cancel()
│
├─ Returns the wrapped ReadableStream: createUIMessageStream returns here, execute still runs in the background
│
──── Async phase below ────
│
├─ ongoingStreamPromises all drain (execute + every merged source stream)
├─ waitForStreams.finally() fires → inner controller.close()
├─ Meanwhile, every finish-step chunk flowing through the downstream transform → callOnStepFinish()
└─ Downstream TransformStream flush() fires → callOnFinish()
```
Three counter-intuitive points:
- execute does NOT block stream return: createUIMessageStream returns the ReadableStream synchronously while execute runs asynchronously in the background. The caller can hand the ReadableStream straight to a Response and ship it.
- handleUIMessageStreamFinish wraps synchronously; it is not a teardown step. It pipes the inner stream into the final stream before returning. onFinish itself lives in the downstream TransformStream's flush()/cancel() callback and only fires once the inner controller.close() triggers flush.
- The dividing line for "execute throws leaking" is "awaited or not", not "sync or async": as long as execute is an async fn, any throw (at the first line or after an await) goes through the Promise-rejection path and is wrapped into an error chunk by .catch. What actually leaks is unawaited child Promises inside execute (see the next section).
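The awaited-versus-unawaited distinction follows from how JavaScript treats async functions, which a toy model makes visible. invokeExecute is a hypothetical reduction of the two capture paths described above, not the SDK's code: an async execute that throws on its very first line still produces a rejection that is handled a microtask later, while only a plain function can trip the synchronous try/catch.

```typescript
// Toy model (not SDK code) of the two capture paths for execute itself.
type Chunk = { type: "error"; errorText: string };

function invokeExecute(
  execute: () => Promise<void> | void,
  enqueue: (chunk: Chunk) => void,
  ongoing: Promise<void>[],
): void {
  try {
    const result = execute();
    if (result instanceof Promise) {
      // Path 1: an async execute never throws here; every throw,
      // even on its first line, arrives later as a rejection.
      ongoing.push(result.catch((e: unknown) => enqueue({ type: "error", errorText: String(e) })));
    }
  } catch (e) {
    // Path 2: only a non-async execute can throw synchronously.
    enqueue({ type: "error", errorText: String(e) });
  }
}

const chunks: Chunk[] = [];
const ongoing: Promise<void>[] = [];

invokeExecute(async () => {
  throw new Error("early");
}, (c) => chunks.push(c), ongoing);
// Still 0: the rejection is handled in a later microtask.
const countAfterAsyncThrow = chunks.length;

invokeExecute(() => {
  throw new Error("sync");
}, (c) => chunks.push(c), ongoing);
```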
Error capture, in full
The SDK does not have a single unified error-handling table. It's a layered design: each layer has its own fallback, and the layers don't overlap. Keep this table in mind and debugging execute gets much easier:
| Error source | Where caught (source) | Landing form | What user code must do |
|---|---|---|---|
| execute() invocation throws synchronously (only possible when execute is a non-async fn) | index.js:8342-8384 outer try/catch | Writes error chunk, calls onError(err) to produce the message | Usually nothing |
| async execute body throws (including throws after an await) | index.js:8370-8377 ongoingStreamPromises.push(result.catch(...)) | Writes error chunk, calls onError(err) | Usually nothing |
| Unawaited child Promise rejection inside execute | None: the SDK can't see it | Node-level unhandledRejection; the stream keeps going, but the data that task should have written never arrives | Must try/catch it yourself or attach an explicit .catch |
| Merged source stream reader throws | index.js:8358-8363 .catch(...) | Writes error chunk, calls onError(err) | Usually nothing; but any external resources the source holds (sandbox / DB) need their own abort |
| onStepFinish throws | index.js:5949-5960 try/catch, then routed to onError | Does NOT write an error chunk; only invokes onError once | To let the frontend see the error, write the error chunk yourself from inside onStepFinish |
| onFinish throws | None: index.js:5927-5943 is a bare await | Error propagates out of TransformStream.flush() → consumer iterator rejects | Must wrap try/catch/finally yourself: any production onFinish (persistence, lock release, downstream notifications) needs this backstop |
| UI transform internals (processUIMessageStream) throw | handleUIMessageStreamFinish forwards onError into processUIMessageStream | Calls onError(err) | Usually nothing |
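For the onFinish row, a small wrapper can provide the backstop once instead of per handler. guardOnFinish is a hypothetical helper, not an SDK API; it assumes you want failures logged and swallowed rather than surfaced to the consumer's iterator, and it runs cleanup (lock release, connection teardown) whether or not the handler succeeded.

```typescript
// Hypothetical wrapper (not an SDK API): the returned onFinish never rejects
// and always runs cleanup, since the SDK awaits onFinish without a try/catch.
function guardOnFinish<E>(
  handler: (event: E) => Promise<void>,
  cleanup: () => Promise<void>,
  log: (label: string, error: unknown) => void,
): (event: E) => Promise<void> {
  return async (event: E) => {
    try {
      await handler(event);
    } catch (error) {
      log("onFinish.failed", error); // record and swallow
    } finally {
      try {
        await cleanup(); // lock release, connection cleanup, ...
      } catch (error) {
        log("onFinish.cleanup_failed", error);
      }
    }
  };
}

const logs: string[] = [];
let lockReleased = false;
const onFinish = guardOnFinish<{ messageId: string }>(
  async () => {
    throw new Error("database unavailable");
  },
  async () => {
    lockReleased = true;
  },
  (label) => logs.push(label),
);
const finished = onFinish({ messageId: "m1" });
```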
onError’s return-value contract: it is not a pure logger — its returned string is written into the error
chunk’s errorText field and shipped to the client. A common pattern is to keep a “first error” variable inside
onError and only log / return the first message — the same aborted Promise can fire onError twice (once from
execute, once from a merged source stream); not dedup’ing yields duplicate error logs (concrete implementation in
Case study).
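A closure-based version of that first-error pattern, as a sketch: createFirstErrorOnError is a hypothetical name, and the log callback stands in for your logger.

```typescript
// Hypothetical factory: builds an onError that logs and returns only the
// first error message of a run; later calls repeat that same message.
function createFirstErrorOnError(
  log: (message: string) => void,
): (error: unknown) => string {
  let firstError: string | undefined;
  return (error: unknown): string => {
    const message = error instanceof Error ? error.message : "Unknown error";
    if (firstError === undefined) {
      firstError = message;
      log(message);
    }
    return firstError;
  };
}

const logged: string[] = [];
const onError = createFirstErrorOnError((m) => logged.push(m));
const fromExecute = onError(new Error("aborted"));
const fromMergedSource = onError(new Error("stream reader cancelled"));
```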
One-liner: execute throws are SDK-handled; onStepFinish throws become a single onError call;
onFinish throws and unawaited child Promises inside execute are the two cases you must handle yourself.
Case study
This section is a production case study (Zapvol's task-orchestrator) that threads together the execute-driven path, the three writer methods, custom data-* events, transient semantics, and the double-wrapped handleUIMessageStreamFinish. It's not a mandatory usage pattern: your project could choose a single-layer createUIMessageStream, or use result.toUIMessageStreamResponse() directly. This is just one battle-tested way to do it.
Overall shape
```typescript
// apps/server/src/services/task-orchestrator.ts
return createUIMessageStream<ZapvolMessage>({
  originalMessages: uiMessages,
  generateId: () => assistantMessageId,
  execute: async ({ writer }) => {
    // 1. Build a business-layer context and pass writer through (wrapped as a StreamWriter abstraction)
    const context = zapvolContextSchema.parse({
      taskId, userId, sandbox, writer, browserBridge,
    });

    // 2. Immediately push run-init (persisted: UI playback needs it)
    context.write(DataPartEvent.AGENT_INIT, {});

    // 3. A series of transient status events mark agent construction
    context.writeTransient(DataPartEvent.AGENT_STATE, {
      text: AgentStateKey.AGENT_BUILDING,
    });
    const { agentConfig, subagentDefs } = await resolveAgentSetup(...);
    context.writeTransient(DataPartEvent.AGENT_STATE, {
      text: AgentStateKey.MCP_CONNECTING,
    });
    // ... MCP connection ...

    // 4. Finally construct the agent and merge its UI stream in
    const agentResult = await runAgentStream(...);
    writer.merge(toUIMessageStream(agentResult.agentStream, {
      context, modelId, assistantMessageId, createdAt, messages: uiMessages,
    }));

    // 5. execute returns here, but the SDK waits for the merged agent stream
    //    to drain before closing the controller and firing onFinish
  },
  onFinish: async ({ responseMessage, messages, isAborted }) => {
    try {
      await taskService.saveAssistantMessage(responseMessage);
    } catch (error) {
      log.error("assistant.persist_failed", { error });
    } finally {
      // Release lock, clean up MCP connections, etc.: must run even on failure
      abortManager.remove(taskId);
      await releaseTaskLock(taskId);
    }
  },
  onError: (error) => {
    // `session.error` is essentially "a first-error variable in execute's outer scope".
    // In your own project this would be `let firstError: string | undefined` captured by closure.
    const message = error instanceof Error ? error.message : "Unknown error";
    if (!session.error) {
      log.error("stream.error", { error: message });
      session.error = message; // dedup: only keep the first
    }
    return session.error;
  },
});
```
The stream's time-ordered output:
1. data-run-init (persisted)
2. Three or four data-agent-state events (transient; the UI shows "Building…" / "Connecting…" / …)
3. All the agent's text-delta / tool-call / tool-result chunks, flowing in via merge
4. A final data-agent-state: "completed" (transient)
5. The finish chunk, emitted automatically by the SDK
6. onFinish fires and persists responseMessage
Four general patterns this case demonstrates
- Outer data-* init + inner agent merge: execute pushes run-init / progress first, then merges the agent stream; the client sees a single continuous stream.
- Transient for UI state, non-transient for business events — “connecting…” is transient, “run-init” is persisted; swap them and it’s wrong.
- onFinish must wrap try/catch/finally — the SDK does not catch its throws (see “Error capture, in full”), so persistence / lock release / downstream notifications all need self-handling.
- onError should dedup to first error — the same run may invoke onError multiple times (execute + a merged source stream); record once on first arrival, then return the same string thereafter to avoid duplicate logs / duplicate error chunks.
Easy to miss: two nested handleUIMessageStreamFinish wrappers
This structure is not specific to this case. Any pipeline that calls writer.merge(result.toUIMessageStream(...)) inside createUIMessageStream's execute ends up with two layers of handleUIMessageStreamFinish, because both paths share the same implementation (index.js:8100 / :8397): an inner one from toUIMessageStream(agentStream) and an outer one from createUIMessageStream:
```text
agentStream
  → (thin business wrapper) toUIMessageStream(agentStream, opts)
    → SDK agentStream.toUIMessageStream({ messageMetadata, onFinish, onError })
      ↳ handleUIMessageStreamFinish   ← inner: backstop for the agent-result UI conversion
  → writer.merge(...)
    → createUIMessageStream inner controller enqueue
      ↳ handleUIMessageStreamFinish   ← outer: backstop for the overall execute-driven stream
```
This means each run has two sets of onFinish / onStepFinish / onError hooks, one per layer, and both sets fire. A clean division of labor:
| Layer | onFinish | onError | messageMetadata |
|---|---|---|---|
| Inner toUIMessageStream(agentStream) | no-op (persistence delegated to the outer layer) | returns error.message for the SDK to emit as an error chunk | used: attaches modelId / firstTokenAt / totalUsage on each part |
| Outer createUIMessageStream({ execute }) | persists responseMessage, notifies downstream, releases lock | dedups (only first), logs, returns the message to the SDK | not accepted by this API (see below) |
messageMetadata can only attach to the inner layer: createUIMessageStream({ execute })’s signature
(index.js:8321-8328) does not accept a messageMetadata parameter — only result.toUIMessageStream({ messageMetadata }) does.
To attach part-level metadata to chunks, do it on the inner toUIMessageStream(...) that is writer.merge’d in;
passing it to the outer createUIMessageStream has no effect.
Common traps
1. Unawaited child Promises inside execute go silent — not execute itself
execute’s own returned Promise is tracked by the SDK (see “Error capture, in full”). What actually leaks is a child Promise kicked off inside execute that isn’t awaited:
```typescript
execute: async ({ writer }) => {
  someAsyncFn(); // WRONG: not awaited; its rejection becomes a Node unhandledRejection.
                 // The SDK can't see it; writer keeps "working" but downstream is
                 // waiting for data that will never come
},
```
Fix: await every async call, or attach an explicit .catch(e => writer.write({ type: "error", errorText: ... })).
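The explicit-.catch variant of the fix can be wrapped in a helper. fireAndReport and ErrorWriter are hypothetical (ErrorWriter is a structural stand-in for the writer). One caveat under this assumption: execute does not wait for the task, so the stream may already have closed when the rejection arrives and the error chunk can be lost; if the client must see the outcome, keep the returned promise and await it before execute returns.

```typescript
// Hypothetical helper (not an SDK API): routes a non-awaited task's rejection
// into an error chunk instead of a Node unhandledRejection.
interface ErrorWriter {
  write(part: { type: "error"; errorText: string }): void;
}

function fireAndReport(task: Promise<unknown>, writer: ErrorWriter): Promise<void> {
  return task.then(
    () => undefined,
    (error: unknown) => {
      writer.write({
        type: "error",
        errorText: error instanceof Error ? error.message : String(error),
      });
    },
  );
}

const written: { type: "error"; errorText: string }[] = [];
const reported = fireAndReport(Promise.reject(new Error("sandbox gone")), {
  write: (part) => written.push(part),
});
```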
2. Transient events do NOT appear in onFinish’s messages
context.writeTransient(...) events never enter responseMessage.parts. If you want to read them in onFinish for statistics, you can't: they were discarded. For persistent statistics, emit non-transient events or use a side channel (Redis / DB).
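If a live-only event still needs to be counted, one option is to count it on the way out. This sketch decorates the StreamWriter interface described earlier on this page; withTransientCounter is a hypothetical name, and flushing the counts to Redis or a DB from onFinish is left out.

```typescript
// Hypothetical decorator: counts transient events as they pass through,
// because onFinish never sees them.
type DataType = `data-${string}`;

interface StreamWriter {
  write(event: { type: DataType; data: unknown }): void;
  writeTransient(event: { type: DataType; data: unknown }): void;
}

function withTransientCounter(inner: StreamWriter, counts: Map<string, number>): StreamWriter {
  return {
    write: (event) => inner.write(event),
    writeTransient: (event) => {
      counts.set(event.type, (counts.get(event.type) ?? 0) + 1);
      inner.writeTransient(event);
    },
  };
}

const counts = new Map<string, number>();
const noop: StreamWriter = { write: () => {}, writeTransient: () => {} };
const writer = withTransientCounter(noop, counts);
writer.writeTransient({ type: "data-agent-state", data: { text: "building" } });
writer.writeTransient({ type: "data-agent-state", data: { text: "connecting" } });
writer.write({ type: "data-run-init", data: {} });
```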
3. writer.merge source streams can’t be stopped mid-consumption
Once merged in, the SDK’s reader consumes the source stream until done. Cancelling the outer createUIMessageStream
does not reverse-cancel the merged source stream — it keeps running until it ends on its own. If the source holds
resources (sandbox, DB connection), add your own abort mechanism at the source.
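A minimal sketch of a source-side abort, assuming the producer can be written as an async generator: abortableSource, DeltaChunk and release are hypothetical. The same signal check belongs wherever the merged stream is actually produced (for example inside a ReadableStream's pull()); the generator form just keeps the example short.

```typescript
// Hypothetical source (not SDK code) that owns a resource and checks an
// AbortSignal itself, since cancelling the outer stream will not stop it.
type DeltaChunk = { type: "text-delta"; id: string; delta: string };

async function* abortableSource(
  signal: AbortSignal,
  release: () => void,
): AsyncGenerator<DeltaChunk> {
  try {
    for (let i = 0; i < 100; i++) {
      if (signal.aborted) return; // stop producing; finally still runs
      await new Promise((resolve) => setTimeout(resolve, 5));
      yield { type: "text-delta", id: "t1", delta: `chunk-${i}` };
    }
  } finally {
    release(); // runs on normal completion, abort, or error
  }
}
```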
4. data-${string} names can’t contain special characters
The SDK sends the type field to the frontend as a discriminator. Use kebab-case (data-agent-state, not
data-AgentState or data-agent.state) to avoid frontend switch mismatches.
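On the client, the type string is typically consumed by a switch, which is where a naming mismatch shows up. ServerDataPart and describeDataPart are hypothetical; the sketch shows that a server-side rename such as data-AgentState is not caught by the types once it arrives as parsed JSON, and silently falls through to the default branch at runtime.

```typescript
// Hypothetical client-side renderer: the type string is the discriminator,
// so it must match the server's kebab-case name exactly.
type ServerDataPart =
  | { type: "data-agent-state"; data: { text: string } }
  | { type: "data-run-init"; data: Record<string, never> }
  | { type: "data-todos-update"; data: { todos: string[] } };

function describeDataPart(part: ServerDataPart): string {
  switch (part.type) {
    case "data-agent-state":
      return `status: ${part.data.text}`;
    case "data-run-init":
      return "run started";
    case "data-todos-update":
      return `${part.data.todos.length} todos`;
    default:
      // Reached at runtime when the server sends a name this union doesn't
      // know, e.g. "data-AgentState" instead of "data-agent-state".
      return "unhandled";
  }
}
```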
Further reading
Related SDK chapters
- Runtime Lifecycle: the L3 same-named callbacks (onFinish / onStepFinish / onError) are detailed there
- Message Reference Model: the message chain inside the agent (don't confuse it with the L3 outer stream)
SDK source anchors (ai@6.0.134)
- dist/index.js:8321-8405: createUIMessageStream implementation
- dist/index.js:5883-5983: handleUIMessageStreamFinish implementation
- dist/index.js:7839-8108: toUIMessageStream implementation (both paths share handleUIMessageStreamFinish)
Zapvol implementation reference (Case study)
- Task Orchestration: how the pieces wire together
- packages/backend/src/agent/context.ts: StreamWriter interface definition
- apps/server/src/services/task-orchestrator.ts: production implementation of the execute-driven path