UI Stream Orchestration

L3 UI-stream — the two paths contrasted / createUIMessageStream's execute-driven mode / three writer methods / custom data-* event protocol / transient semantics / error capture tiering

Why this page exists

The L3 layer (UI stream) in Runtime Lifecycle only covered result.toUIMessageStream() — the pure-transform path that passively consumes an agent’s fullStream and emits SSE events.

But the SDK has another L3 path: createUIMessageStream({ execute }) — the execute-driven path, which actively pushes custom events, merges multiple streams, and writes in phases alongside the agent stream. Typical use cases:

  • Push initialization / config events first, then start the agent stream
  • Interleave progress states (“connecting MCP”, “executing”, “generating”) through an agent run
  • Merge multiple sources (agent stream + background job results + supplementary user input)
  • Staged drain (drain one agent stream, decide whether to start a second based on the result)

Both paths exist in the AI SDK, solve different problems, and have different shapes. This page contrasts them, then grounds them in a production Case study at the end.

The two L3 paths

Dimensionresult.toUIMessageStream()createUIMessageStream({ execute })
PositionA method on the agent-result objectA standalone top-level function
InvocationCalled after you have an agentStreamCalled first, agent is built inside execute
Control granularityPassive transform of the agent’s fullStreamActive scheduling inside execute: write first, merge later
Can push custom events?NoYes: writer.write({ type: "data-xxx", ... })
Can merge multiple streams?No: one input, one outputYes: writer.merge(otherStream) folds streams into one output
CallbacksonFinish / onStepFinish / onErrorSame + an execute async function

When to use which:

  • Only need to forward a single agent stream to the frontend → result.toUIMessageStream()
  • Need to push progress events alongside the agent stream, or merge multiple streams (e.g. config event first, then merge agent, then a summary event) → createUIMessageStream({ execute })

createUIMessageStream full signature

function createUIMessageStream<MESSAGE_METADATA>({
  execute,              // async ({ writer }) => { ... } — the main entry
  originalMessages?,    // for messageId continuation
  generateId?,          // assistant message ID generator
  onStepFinish?,        // L3 step-level callback (see Lifecycle page)
  onFinish?,            // L3 full-stream drain callback
  onError?,             // error serializer
}): ReadableStream<UIMessageStreamPart>

execute is the main entry point — SDK invokes it immediately and asynchronously when you call createUIMessageStream, passing in a writer. You can push whatever you want, merge whatever you want. After your returned Promise resolves, the SDK waits for every writer.merge(...) stream to drain, then closes the controller.

Parameter difference vs toUIMessageStream: createUIMessageStream does not accept messageMetadata — that parameter only exists on result.toUIMessageStream({ messageMetadata }). To attach part-level metadata to chunks, do it on the inner toUIMessageStream that is writer.merge’d in — the classic “outer orchestration + inner metadata” layering (see Case study at the end).

writer API — three methods

writer is a UIMessageStreamWriter (source: dist/index.js:8343-8368):

1. writer.write(part) — enqueue a UI part immediately

writer.write({
  type: "data-agent-state",
  data: { text: "Building agent", type: "progress" },
  transient: true,
});

part must be a UIMessageStreamPart — this includes SDK standard types (text-start, text-delta, tool-call, …) AND custom-extension events with type: "data-${string}". Custom events are the front door for your own UI state.

2. writer.merge(stream) — fold another ReadableStream in

const agentResult = await runAgentStream(params);
writer.merge(agentResult.toUIMessageStream({ ... }));

This is the core assembly pattern of the execute-driven path — the outer stream pushes initialization / config / state data-* events first; once the agent is built and agentStream is in hand, its text-delta / tool-call / tool-result firehose is folded back into the same output via merge. The client sees a single continuous UI stream.

Concurrency semantics: merge internally throws the source stream’s reader loop into an ongoingStreamPromises array; the SDK waits for every ongoing stream before closing the output controller. So after execute returns, the stream does not close immediately — it waits for every merged sub-stream to drain.

3. writer.onError — the user-provided error serializer

Just a reference to the onError you passed to createUIMessageStream, exposed for convenience if you want to invoke it directly inside execute for serialization (rarely used; usually you rely on try/catch or let merge handle error pass-through).

Custom data-* event protocol

The SDK’s UI stream protocol permits arbitrary type: "data-${string}" events — this is the official front door for business-layer extensions to the UI protocol. Only two things are required:

  1. type must start with data- (the protocol-mandated discriminator prefix)
  2. The string in type must be kebab-case serializable (see Trap #4)

Everything else (data shape, the transient flag) is entirely up to the business layer.

Typical event categories

Different projects define different event sets, but they boil down to three categories:

CategoryPurposeTransient?Example shapes
Lifecycle signalsRun start / key state transitionsPersisteddata-run-init, data-task-milestone
Real-time status indicators”What’s happening right now”, only cares about the presenttransientdata-progress, data-agent-state
Structured business payloadsFunctional data updates (lists, progress bars, result snippets)Dependsdata-todos-update, data-search-results

The judgment axis is “should the user, reopening this history next week, see this event?” — yes means persist, no means transient (full rule in the next section).

The transport-agnostic writer abstraction pattern

In production, an agent engine often needs cross-transport reuse — the same tool-call log should be pushable via SSE (HTTP), WebSocket rooms, or Electron IPC. Depending directly on UIMessageStreamWriter in agent code locks it to SSE.

The general solution is to wrap one more interface on top of UIMessageStreamWriter:

interface StreamWriter {
  write(event: {
    type: `data-${string}`;
    data: unknown;
  }): void;

  writeTransient(event: {
    type: `data-${string}`;
    data: unknown;
  }): void;
}

The two methods differ only in whether transient is set to true — usually implemented as one method with a default parameter. Splitting into two methods makes the calling site semantics explicit (context.writeTransient(...) is easier to read and harder to misuse than context.write(..., true)).

Then write one implementation per transport:

  • SSE / HTTP: wraps UIMessageStreamWriter.write(...)
  • WebSocket: wraps wsRoom.broadcast(...)
  • Electron IPC: wraps webContents.send(...)

The agent engine only depends on the StreamWriter interface, calling writer.write(...) / writer.writeTransient(...); the composition root decides which transport is actually wired in. One agent codebase can serve multiple transports.

A concrete production implementation is at the end of this page in Case study.

transient: true — the most-missed flag

When processing custom data-* events, the SDK routes by transient:

  • transient: false (default) — the event is recorded into responseMessage.metadata parts, appears in onFinish’s payload, and is persisted to chat history (if the consumer stores it)
  • transient: true — the event flows through the stream once and is discarded; not written to responseMessage, not in persisted history

When you want transient: any “real-time status indicator” type of event. Examples:

// OK — progress states shouldn't enter chat history
writer.write({
  type: "data-progress",
  data: { stage: "connecting_mcp" },
  transient: true,
});

// OK — run-start events should persist (UI playback needs to see them)
writer.write({
  type: "data-run-init",
  data: {},
  // transient: false (default)
});

// WRONG — forgetting to mark `transient` replays
//         "connecting..." / "executing..." / "generating..." on every history refresh

Rule: UI state → transient; business events → persisted. When in doubt, ask: “Should the user, reopening this history next week, see this event?” — yes means persist, no means transient.

execute lifecycle

createUIMessageStream({ execute, ... }) is called

  ├─ Internal: new ReadableStream + controller

  ├─ Synchronously invokes execute({ writer }) — wrapped in an outer try/catch
  │   ├─ execute is async fn: returns a Promise immediately
  │   │   └─ SDK attaches result.catch(e → enqueue "error" chunk) and pushes onto ongoingStreamPromises
  │   └─ execute throws synchronously on invocation (non-async fn, or async fn throws before the
  │       Promise is constructed — e.g. param destructuring error)
  │       → outer try/catch directly writes an "error" chunk

  ├─ Inside execute, you can:
  │   │  writer.write({ type: "data-run-init", data: {} })     ← enqueued immediately
  │   │  writer.write({ type: "data-progress", ..., transient: true })
  │   │  const agentStream = await runAgentStream(...)
  │   │  writer.merge(agentStream.toUIMessageStream({ ... }))
  │   │    └─ merge internally pushes (async () => reader loop)().catch(...) onto ongoingStreamPromises
  │   └─ execute returns / all awaited Promises resolve

  ├─ handleUIMessageStreamFinish(innerStream) synchronously pipeThroughs two TransformStreams
  │   ├─ one injects messageId into the "start" chunk, tracks the "abort" chunk
  │   └─ one calls onStepFinish on each "finish-step" chunk; calls onFinish on flush()/cancel()

  ├─ Returns the wrapped ReadableStream — createUIMessageStream returns here, execute still runs in background

  ──── Async phase below ────

  ├─ ongoingStreamPromises all drain (execute + every merged source stream)
  ├─ waitForStreams.finally() fires → inner controller.close()
  ├─ Meanwhile every finish-step chunk flowing through the downstream transform → callOnStepFinish()
  └─ Downstream TransformStream flush() fires → callOnFinish()

Three counter-intuitive points:

  1. execute does NOT block stream returncreateUIMessageStream returns the ReadableStream synchronously; execute runs async in the background. The caller can hand the ReadableStream straight to a Response and ship it.
  2. handleUIMessageStreamFinish wraps synchronously, it’s not a teardown step — it pipeThroughs the inner stream into the final stream before return. onFinish itself lives in the downstream TransformStream’s flush()/cancel() callback and only fires once the inner controller.close() triggers flush.
  3. The dividing line for “execute throws leaking” is “awaited or not”, not “sync or async” — so long as execute is an async fn, any throw (whether at the first line or after an await) goes through the Promise-rejection path and is wrapped into an error chunk by .catch. What actually leaks is unawaited child Promises inside execute (see the next section).

Error capture, in full

The SDK does not have a single unified error-handling table. It’s a layered design — each layer has its own fallback, and they don’t overlap. Memorize this table and writing execute gets a lot easier to debug:

Error sourceWhere caught (source)Landing formWhat user code must do
execute() invocation throws synchronously (non-async fn / async fn throws before Promise construction)index.js:8342-8384 outer try/catchWrites error chunk, calls onError(err) for the messageUsually nothing
async execute body throws (including post-await throws)index.js:8370-8377 ongoingStreamPromises.push(result.catch(...))Writes error chunk, calls onError(err)Usually nothing
Unawaited child Promise rejection inside executeNone — SDK can’t see itNode-level unhandledrejection; stream keeps going but never writes againMust self-try/catch or attach explicit .catch
Merged source stream reader throwsindex.js:8358-8363 .catch(...)Writes error chunk, calls onError(err)Usually nothing; but any external resources the source holds (sandbox / DB) need their own abort
onStepFinish throwsindex.js:5949-5960 try/catch then routed to onErrorDoes NOT write an error chunk; only invokes onError onceTo let the frontend see the error, write the error chunk from inside onStepFinish yourself
onFinish throwsNoneindex.js:5927-5943 bare awaitError propagates out of TransformStream.flush() → consumer iterator rejectsMust wrap try/catch/finally yourself — any production onFinish (persistence, lock release, downstream notifications) needs this backstop
UI transform internals (processUIMessageStream) throwhandleUIMessageStreamFinish forwards onError into processUIMessageStreamCalls onError(err)Usually nothing

onError’s return-value contract: it is not a pure logger — its returned string is written into the error chunk’s errorText field and shipped to the client. A common pattern is to keep a “first error” variable inside onError and only log / return the first message — the same aborted Promise can fire onError twice (once from execute, once from a merged source stream); not dedup’ing yields duplicate error logs (concrete implementation in Case study).

One-liner: execute throws are SDK-handled; onStepFinish throws become a single onError call; onFinish throws and unawaited child Promises inside execute are the two cases you must handle yourself.

Case study

This section is a production landing case (Zapvol’s task-orchestrator) that threads the execute-driven path, the three writer methods, custom data-* events, transient semantics, and the double-wrapped handleUIMessageStreamFinish all together. It’s not a mandatory usage pattern — your project could choose a single-layer createUIMessageStream, or use result.toUIMessageStreamResponse() directly. This is just one battle-tested way to do it.

Overall shape

// apps/server/src/services/task-orchestrator.ts
return createUIMessageStream<ZapvolMessage>({
  originalMessages: uiMessages,
  generateId: () => assistantMessageId,

  execute: async ({ writer }) => {
    // 1. Build a business-layer context and pass writer through (wrapped as a StreamWriter abstraction)
    const context = zapvolContextSchema.parse({
      taskId, userId, sandbox, writer, browserBridge,
    });

    // 2. Immediately push run-init (persisted — UI playback needs it)
    context.write(DataPartEvent.AGENT_INIT, {});

    // 3. A series of transient status events mark agent construction
    context.writeTransient(DataPartEvent.AGENT_STATE, {
      text: AgentStateKey.AGENT_BUILDING,
    });
    const { agentConfig, subagentDefs } = await resolveAgentSetup(...);

    context.writeTransient(DataPartEvent.AGENT_STATE, {
      text: AgentStateKey.MCP_CONNECTING,
    });
    // ... MCP connection ...

    // 4. Finally construct the agent and merge its UI stream in
    const agentResult = await runAgentStream(...);
    writer.merge(toUIMessageStream(agentResult.agentStream, {
      context, modelId, assistantMessageId, createdAt, messages: uiMessages,
    }));

    // 5. execute returns here — but the SDK waits for the merged agent stream
    //    to drain before closing the controller and firing onFinish
  },

  onFinish: async ({ responseMessage, messages, isAborted }) => {
    try {
      await taskService.saveAssistantMessage(responseMessage);
    } catch (error) {
      log.error("assistant.persist_failed", { error });
    } finally {
      // Release lock, clean up MCP connections, etc. — must run even on failure
      abortManager.remove(taskId);
      await releaseTaskLock(taskId);
    }
  },

  onError: (error) => {
    // `session.error` is essentially "a first-error variable in execute's outer scope"
    // In your own project this would be `let firstError: string | undefined` captured by closure
    const message = error instanceof Error ? error.message : "Unknown error";
    if (!session.error) {
      log.error("stream.error", { error: message });
      session.error = message;   // dedup: only keep the first
    }
    return session.error;
  },
});

The stream’s time-ordered output:

  1. data-run-init (persisted)
  2. Three or four data-agent-state (transient — UI shows “Building…” / “Connecting…” / …)
  3. All the agent’s text-delta / tool-call / tool-result flow in via merge
  4. Final data-agent-state: "completed" (transient)
  5. SDK auto-emits finish chunk
  6. onFinish fires, persists responseMessage

4 general patterns this case demonstrates

  1. Outer data- init + inner agent merge* — execute pushes run-init / progress first, then merges the agent stream; the client sees a single continuous stream.
  2. Transient for UI state, non-transient for business events — “connecting…” is transient, “run-init” is persisted; swap them and it’s wrong.
  3. onFinish must wrap try/catch/finally — the SDK does not catch its throws (see “Error capture, in full”), so persistence / lock release / downstream notifications all need self-handling.
  4. onError should dedup to first error — the same run may invoke onError multiple times (execute + a merged source stream); record once on first arrival, then return the same string thereafter to avoid duplicate logs / duplicate error chunks.

Easy to miss: two nested handleUIMessageStreamFinish wrappers

This structure appears in any pipeline that does “createUIMessageStream + inner writer.merge(result.toUIMessageStream(...))” — not only this case. Both paths share the same handleUIMessageStreamFinish (index.js:8100 / :8397); nesting them produces two layers.

Any pipeline that does writer.merge(agentStream.toUIMessageStream(...)) inside execute ends up with two layers of handleUIMessageStreamFinish — an inner one from toUIMessageStream(agentStream) and an outer one from createUIMessageStream:

agentStream
  → (thin business wrapper) toUIMessageStream(agentStream, opts)
    → SDK agentStream.toUIMessageStream({ messageMetadata, onFinish, onError })
      ↳ handleUIMessageStreamFinish  ← inner: backstop for the agent-result UI conversion
  → writer.merge(...)
  → createUIMessageStream inner controller enqueue
    ↳ handleUIMessageStreamFinish  ← outer: backstop for the overall execute-driven stream

This means onFinish / onStepFinish / onError fire twice per run — once at the inner layer, once at the outer. A clean division of labor:

LayeronFinishonErrormessageMetadata
Inner toUIMessageStream(agentStream)no-op (persistence delegated to the outer layer)returns error.message for the SDK to emit as an error chunkused — attaches modelId / firstTokenAt / totalUsage on each part
Outer createUIMessageStream({ execute })persists responseMessage, notifies downstream, releases lockdedups (only first), logs, returns the message to the SDKnot accepted by this API (see below)

messageMetadata can only attach to the inner layer: createUIMessageStream({ execute })’s signature (index.js:8321-8328) does not accept a messageMetadata parameter — only result.toUIMessageStream({ messageMetadata }) does. To attach part-level metadata to chunks, do it on the inner toUIMessageStream(...) that is writer.merge’d in; passing it to the outer createUIMessageStream has no effect.

Common traps

1. Unawaited child Promises inside execute go silent — not execute itself

execute’s own returned Promise is tracked by the SDK (see “Error capture, in full”). What actually leaks is a child Promise kicked off inside execute that isn’t awaited:

execute: async ({ writer }) => {
  someAsyncFn();  // WRONG — not awaited; rejection becomes a Node unhandledrejection,
                  //         SDK can't see it, writer keeps "working" but downstream is
                  //         waiting for data that will never come
},

Fix: await every async call, or attach an explicit .catch(e => writer.write({ type: "error", errorText: ... })).

2. Transient events do NOT appear in onFinish’s messages

context.writeTransient(...) events never enter responseMessage.metadata.parts. If you want to read them in onFinish for statistics — you can’t, they were discarded. For persistent statistics, emit non-transient events or use a side channel (Redis / DB).

3. writer.merge source streams can’t be stopped mid-consumption

Once merged in, the SDK’s reader consumes the source stream until done. Cancelling the outer createUIMessageStream does not reverse-cancel the merged source stream — it keeps running until it ends on its own. If the source holds resources (sandbox, DB connection), add your own abort mechanism at the source.

4. data-${string} names can’t contain special characters

The SDK sends the type field to the frontend as a discriminator. Use kebab-case (data-agent-state, not data-AgentState or data-agent.state) to avoid frontend switch mismatches.

Further reading

Related SDK chapters

  • Runtime Lifecycle — the L3 same-named callbacks (onFinish / onStepFinish / onError) are detailed there
  • Message Reference Model — the message chain inside the agent (don’t confuse with the L3 outer stream)

SDK source anchors (ai@6.0.134)

  • dist/index.js:8321-8405createUIMessageStream implementation
  • dist/index.js:5883-5983handleUIMessageStreamFinish implementation
  • dist/index.js:7839-8108toUIMessageStream implementation (both paths share handleUIMessageStreamFinish)

Zapvol landing reference (Case study)

  • Task Orchestration — how the pieces wire together
  • packages/backend/src/agent/context.tsStreamWriter interface definition
  • apps/server/src/services/task-orchestrator.ts — production implementation of the execute-driven path
Was this page helpful?