🌊 Streaming & API Layer
Tokens appear one by one because five async generators chain together like Unix pipes
When Claude Code responds, you see tokens appear one by one. Behind that is a chain of async generators — like Unix pipes — where each stage yields values progressively. The API sends Server-Sent Events, the SDK parses them into typed objects, and the query loop yields each token to the terminal as it arrives.
- Pipeline: queryLoop() → queryModelWithStreaming() → SDK.stream() → SSE parser → yield tokens
- async function* yields values over time instead of returning once
- for await...of consumes generators progressively — natural backpressure
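That backpressure claim can be seen in a few lines. This is an illustrative sketch (producer and its log argument are toy names, not Claude Code internals): the generator body only runs when the consumer calls next(), so the producer can never race ahead of the consumer.

```typescript
// Toy producer: pushes to `log` each time its body actually executes,
// then suspends at `yield` until the consumer asks for the next value.
async function* producer(log: number[]): AsyncGenerator<number> {
  for (let i = 0; i < 3; i++) {
    log.push(i); // marks when the body runs
    yield i;     // suspend here until next() is called
  }
}

async function demo(): Promise<void> {
  const log: number[] = [];
  const gen = producer(log);
  await gen.next();              // producer advances exactly one step
  console.log(log);              // [0] — nothing buffered ahead of demand
  for await (const _ of gen) {}  // drain the rest, one pull at a time
  console.log(log);              // [0, 1, 2]
}
demo();
```

Nothing is produced until it is pulled, which is why memory stays bounded even if the terminal renders slowly.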
Streaming Pipeline
What you are seeing
The complete streaming pipeline from API response to terminal rendering. Each layer is an async generator that yields to the next, creating a composable chain where tokens flow through progressively.
What to try
Trace how a single token travels from the SSE wire format through each generator until it appears on screen. Notice how tool_use events get buffered while text_delta events pass through immediately.
The Intuition
Before vs After Streaming
Without streaming
User sends message → waits 40 seconds → sees entire response at once. Feels like a loading screen, not a conversation.
With streaming
First token appears in 200ms → tokens flow at ~50/second → feels like a live conversation.
The async generator pipeline makes this possible — each stage yields tokens as they arrive instead of buffering the whole response.
Async Generators as Unix Pipes
The entire streaming pipeline is a chain of async function* generators. Each one yields values over time instead of returning once. The consumer uses for await...of to pull values progressively — just like how cat file | grep pattern | head processes data line by line without loading everything into memory.
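The analogy can be made literal. A minimal sketch with three toy stages (source, grep, and head are stand-ins for the shell commands, not library functions):

```typescript
// like `cat`: emit lines one at a time
async function* source(lines: string[]): AsyncGenerator<string> {
  for (const line of lines) yield line;
}

// like `grep`: pass through only matching lines
async function* grep(pattern: string, input: AsyncIterable<string>): AsyncGenerator<string> {
  for await (const line of input) {
    if (line.includes(pattern)) yield line;
  }
}

// like `head -n`: stop the whole pipeline after n lines
async function* head(n: number, input: AsyncIterable<string>): AsyncGenerator<string> {
  let count = 0;
  for await (const line of input) {
    if (count++ >= n) return;
    yield line;
  }
}

async function demo(): Promise<void> {
  const lines = ["error: a", "ok", "error: b", "error: c"];
  for await (const line of head(2, grep("error", source(lines)))) {
    console.log(line); // "error: a", then "error: b"
  }
}
demo();
```

Note that when head returns early, the upstream stages stop being pulled, so source never produces "error: c" — the same early-termination behavior you get from real shell pipes.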
SSE Wire Format
The API sends responses as text/event-stream — the Server-Sent Events format. Each event is a plain-text block with an event type and a JSON data line, separated by double newlines. The SDK handles parsing these into typed objects and assembling partial tokens into complete events. Stream interruption recovery is handled at the application level: the agent harness decides whether to retry the full request or attempt to continue from the last received event. Recovery of partial tool-use blocks requires careful state management in the calling code.
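As a sketch of what that application-level recovery might look like — streamWithRetry and its openStream argument are hypothetical names, not SDK API, and this shows only the simplest policy (retry the full request). Note the caveat in the comments: a restart re-yields events the caller already saw, so a real harness must reset its rendering state or deduplicate.

```typescript
type StreamEvent = { type: string };

// Hypothetical retry wrapper around a function that opens a fresh stream.
// On failure it reopens the stream and replays from the start, which means
// events yielded before the failure are yielded again — the caller must
// reset or dedupe. Continuing from the last event would need extra state.
async function* streamWithRetry(
  openStream: () => AsyncGenerator<StreamEvent>,
  maxRetries = 2
): AsyncGenerator<StreamEvent> {
  for (let attempt = 0; ; attempt++) {
    try {
      for await (const event of openStream()) yield event;
      return; // stream completed cleanly
    } catch (err) {
      if (attempt >= maxRetries) throw err; // give up after maxRetries
      // fall through: reopen the stream on the next loop iteration
    }
  }
}
```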
The Query Loop
queryModelWithStreaming() builds the API request (messages, system prompt, tools, beta headers) and yields StreamEvent objects. The outer queryLoop() consumes these events: text_delta events pass through immediately for rendering, while tool_use events are buffered. When the stream ends, if there are pending tool calls, the loop executes them, appends results to messages, and calls the API again.
The SDK's Typed Event Union
The Anthropic TypeScript SDK wraps raw SSE into a typed MessageStreamEvent discriminated union. Every event has a type field — TypeScript narrows the type automatically inside a switch or if block, so the compiler enforces that you only access delta.text on content_block_delta events and tool_use fields on tool_use blocks. This means the streaming pipeline is fully type-safe — a wrong event access is a compile error, not a runtime crash. The SDK also exposes higher-level helpers like stream.on('text', cb) and await stream.finalMessage() that accumulate the full response — useful for non-streaming callers that wrap the streaming API internally (per the SDK streaming helpers docs).
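A minimal sketch of that narrowing behavior, using simplified event shapes rather than the SDK's full MessageStreamEvent union:

```typescript
// Simplified discriminated union — the real SDK union has more variants.
type StreamEvent =
  | { type: "content_block_delta"; delta: { type: "text_delta"; text: string } }
  | { type: "content_block_start"; content_block: { type: "tool_use"; id: string; name: string } }
  | { type: "message_stop" };

function render(event: StreamEvent): string {
  switch (event.type) {
    case "content_block_delta":
      return event.delta.text; // narrowed: `delta` exists only on this variant
    case "content_block_start":
      return `[tool: ${event.content_block.name}]`;
    case "message_stop":
      return "\n";
    // accessing event.delta in any other case is a compile error, not a crash
  }
}
```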
Tool Call Accumulation Pattern
A single tool call arrives as three event types in sequence: content_block_start (opens a new tool_use block with its ID and name), content_block_delta events (stream JSON-encoded input as string chunks), and content_block_stop (signals the tool call is complete). The query loop must buffer all delta chunks and JSON-parse the concatenated input only after the stop event — partial JSON is invalid and will throw. This is why tool calls cannot be executed mid-stream: you must wait for the complete input before calling the tool.
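A sketch of the accumulation step, with simplified event shapes (in the real API the string chunks arrive as input_json_delta deltas nested inside content_block_delta events):

```typescript
// Simplified event shapes for the three-phase tool-call sequence.
type ToolEvent =
  | { type: "content_block_start"; id: string; name: string }
  | { type: "content_block_delta"; partial_json: string }
  | { type: "content_block_stop" };

function accumulateToolCall(events: ToolEvent[]): { name: string; input: unknown } {
  let name = "";
  let buffer = "";
  for (const ev of events) {
    if (ev.type === "content_block_start") name = ev.name;
    else if (ev.type === "content_block_delta") buffer += ev.partial_json; // buffer, never parse mid-stream
    else if (ev.type === "content_block_stop") {
      return { name, input: JSON.parse(buffer) }; // only now is the buffer valid JSON
    }
  }
  throw new Error("stream ended before content_block_stop");
}
```

Feeding it the chunks `{"path":` and `"a.ts"}` shows why buffering matters: JSON.parse on either chunk alone throws, while the concatenation parses cleanly.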
Why use async generators instead of callbacks for streaming?
Callbacks push data at the consumer whether or not it is ready; generators let the consumer pull. That gives backpressure for free (the producer suspends at yield until next() is called), makes stages composable like pipes, and keeps control flow linear — try/catch and early return work naturally instead of being scattered across callback handlers.
Key Code Patterns
The Streaming Pipeline (TypeScript pseudocode)
// The streaming pipeline — a chain of async generators
async function* queryModelWithStreaming(
messages: Message[],
systemPrompt: string,
tools: Tool[]
): AsyncGenerator<StreamEvent> {
// Calls the API and yields streaming events
const response = sdk.messages.stream({
model: "claude-opus-4-6",
messages,
system: systemPrompt,
tools,
max_tokens: 8096, // required by the Messages API
});
for await (const event of response) {
yield event; // text_delta, tool_use, message_stop, etc.
}
}
async function* queryLoop(
messages: Message[],
tools: Tool[],
systemPrompt: string
): AsyncGenerator<StreamEvent> {
// The agentic loop — consumes streaming events
while (true) {
const toolBlocks: ToolUseEvent[] = [];
for await (const event of queryModelWithStreaming(messages, systemPrompt, tools)) {
if (event.type === "text_delta") {
yield event; // pass through to REPL for rendering
} else if (event.type === "tool_use") {
toolBlocks.push(event);
}
}
if (toolBlocks.length === 0) return; // no tools = done
const results = await runTools(toolBlocks);
messages.push(...results);
// loop continues — call API again with tool results
}
}
// The REPL consumes the outermost generator:
for await (const event of queryLoop(messages, tools, prompt)) {
renderToTerminal(event); // each token appears immediately
}

SSE Wire Format and Parsing
// SSE wire format from the API
//
// event: message_start
// data: {"type":"message_start","message":{"id":"msg_01..."}}
//
// event: content_block_delta
// data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}
//
// event: message_stop
// data: {"type":"message_stop"}
async function* parseSseStream(response: ReadableStream): AsyncGenerator<StreamEvent> {
// Parse Server-Sent Events into typed objects
const decoder = new TextDecoder();
let buffer = "";
for await (const chunk of response) {
// stream: true keeps multi-byte UTF-8 sequences split across chunks intact
buffer += decoder.decode(chunk, { stream: true });
while (buffer.includes("\n\n")) {
const idx = buffer.indexOf("\n\n");
const eventStr = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
yield parseEvent(eventStr);
}
}
}
function parseEvent(block: string): StreamEvent {
// Each block looks like "event: <type>\ndata: <json>"
const dataLine = block.split("\n").find((line) => line.startsWith("data: "));
return JSON.parse(dataLine!.slice("data: ".length));
}

Break It — See What Happens
Real-World Numbers
| Metric | Value |
|---|---|
| SSE event types (Messages API) | 6 lifecycle types (message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop) plus ping and error |
| Generators in pipeline | 5 layers deep |
| SSE keep-alive interval | |
| Time to first token — Claude Sonnet 4 | |
| Stream interruption | Application-level retry logic (not automatic SDK resume) |
Key Takeaways
What to remember for interviews
1. The pipeline is a chain of async generators — like Unix pipes — where each stage yields values progressively instead of buffering the entire response.
2. Async generators provide natural backpressure for free: the producer suspends on yield until the consumer calls next(), preventing unbounded memory growth.
3. Tool calls arrive as three event types in sequence (start → delta → stop); the query loop must buffer all deltas and JSON-parse the concatenated input only after the stop event.
4. Text delta events pass through immediately for rendering, while tool_use events are buffered until message_stop — then tools execute and the loop retries with results appended.
5. Stream interruption recovery is application-level, not automatic: the agent harness decides whether to retry the full request or continue from the last received event.
Further Reading
- MDN: Async iteration and generators — Reference for async function*, for await...of, and the async iteration protocol.
- Anthropic Streaming API — Official docs for streaming message responses via Server-Sent Events.
- SSE Specification (WHATWG) — The standard behind text/event-stream — event types, data fields, reconnection.
- Claude Code (source) — Open-source reference for the streaming pipeline architecture described in this module.
- WHATWG Streams API — The browser standard for backpressure-aware streaming — ReadableStream, WritableStream, and the pipe chain that async generators implement natively.
- Anthropic SDK Streaming (TypeScript) — The official SDK's streaming helper API — stream.on('text'), stream.finalMessage(), and the event model wrapping raw SSE.
- Node.js Stream Backpressure Guide — Official Node.js guide on backpressure — the mechanism that prevents unbounded memory growth when the consumer is slower than the producer.
Interview Questions
- Design a streaming pipeline for an AI agent that handles tool calls mid-stream. (★★★)
- Explain backpressure in async generators. Why does it matter for LLM streaming? (★★☆)
- How would you handle network disconnection during a streaming API call? (★★☆)
- How do you handle partial tool-call JSON in a streaming response where the connection drops mid-chunk? (★★★)