# Streaming All the Way Down: What Happens Between Token and Tool Call?

> Every LLM call returns an AsyncIterable of AssistantMessageEvents. This page follows an event from the provider stream through AssistantMessageEventStream (utils/event-stream.ts), into the agent loop's stream function, and up to the TUI renderer. It asks: where is backpressure applied, how are partial tool-call arguments accumulated before validation, and what does the overflow utility guard against?

- Repository: earendil-works/pi
- GitHub: https://github.com/earendil-works/pi
- Human wiki: https://grok-wiki.com/public/wiki/earendil-works-pi-8b87608fc234
- Complete Markdown: https://grok-wiki.com/public/wiki/earendil-works-pi-8b87608fc234/llms-full.txt

## Source Files

- `packages/ai/src/stream.ts`
- `packages/ai/src/utils/event-stream.ts`
- `packages/ai/src/utils/overflow.ts`
- `packages/ai/src/utils/json-parse.ts`
- `packages/ai/src/session-resources.ts`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [packages/ai/src/stream.ts](packages/ai/src/stream.ts)
- [packages/ai/src/utils/event-stream.ts](packages/ai/src/utils/event-stream.ts)
- [packages/ai/src/utils/overflow.ts](packages/ai/src/utils/overflow.ts)
- [packages/ai/src/utils/json-parse.ts](packages/ai/src/utils/json-parse.ts)
- [packages/ai/src/session-resources.ts](packages/ai/src/session-resources.ts)
- [packages/ai/src/types.ts](packages/ai/src/types.ts)
- [packages/ai/src/providers/anthropic.ts](packages/ai/src/providers/anthropic.ts)
- [packages/agent/src/agent-loop.ts](packages/agent/src/agent-loop.ts)
- [packages/ai/src/utils/validation.ts](packages/ai/src/utils/validation.ts)
- [packages/coding-agent/src/core/agent-session.ts](packages/coding-agent/src/core/agent-session.ts)
- [packages/coding-agent/src/modes/interactive/interactive-mode.ts](packages/coding-agent/src/modes/interactive/interactive-mode.ts)
</details>


Every LLM response in this codebase passes through four runtime stages before it has any effect: the provider HTTP stream, the `AssistantMessageEventStream` event bus, the agent loop's `streamAssistantResponse` function, and the TUI or extension subscribers. Each stage has a specific contract. This page traces a single event from raw network bytes to validated tool execution, asking along the way where backpressure lives, how partial JSON is assembled, and what the `overflow` utility actually guards against. The numbered sections below also cover two helpers that sit beside that path: the partial-JSON parser that provider adapters call, and the post-hoc overflow check.

---

## What is the simplest version of this system?

The simplest design would be: call the LLM, get a string back, parse it, act on it. The actual design is more involved because of two requirements:

1. **Streaming renders text progressively** as the model generates it, which requires an event-at-a-time protocol rather than a single resolved value.
2. **Tool call arguments arrive as a stream of JSON fragments**, so argument objects must be assembled and parsed incrementally, then validated once complete, before tool execution is safe.

The architecture resolves both by defining a typed event union (`AssistantMessageEvent`) and a generic queue (`EventStream<T, R>`) that lets producers push events at their own pace while consumers pull via `for await`.

---

## Layer 1: The Public Entry Point (`packages/ai/src/stream.ts`)

```ts
// packages/ai/src/stream.ts:25-32
export function stream<TApi extends Api>(
    model: Model<TApi>,
    context: Context,
    options?: ProviderStreamOptions,
): AssistantMessageEventStream {
    const provider = resolveApiProvider(model.api);
    return provider.stream(model, context, options as StreamOptions);
}
```

`stream()` resolves the correct provider from the API registry and immediately returns an `AssistantMessageEventStream`. The caller never sees a raw HTTP response: the provider adapter translates whatever wire format it receives into typed events and pushes them into the stream object it returned.

This means **the stream object is returned before any events exist in it**. The provider populates it asynchronously. A caller iterating `for await (const event of stream)` suspends whenever the queue is empty; this paces the consumer but does nothing to slow the producer (see "Where is backpressure applied?" below).

Sources: [packages/ai/src/stream.ts:25-32]()

---

## Layer 2: The Event Bus (`packages/ai/src/utils/event-stream.ts`)

### What does `EventStream<T, R>` actually implement?

```ts
// packages/ai/src/utils/event-stream.ts:4-67
export class EventStream<T, R = T> implements AsyncIterable<T> {
    private queue: T[] = [];
    private waiting: ((value: IteratorResult<T>) => void)[] = [];
    private done = false;
    // ...
    push(event: T): void { ... }
    end(result?: R): void { ... }
    async *[Symbol.asyncIterator](): AsyncIterator<T> { ... }
    result(): Promise<R> { ... }
}
```

The class maintains two parallel data structures: `queue` (events that arrived before anyone asked) and `waiting` (consumers that asked before events arrived). On each `push()`:

- If a consumer is suspended in `for await`, it is resumed immediately via `waiter({ value: event, done: false })`.
- If no consumer is waiting, the event is buffered in `queue`.

This is a **single-producer, single-consumer unbounded async queue**: an event goes straight to a waiting consumer if there is one and is buffered otherwise. There is no explicit flow control, buffer limit, or back-channel signal to slow the provider. The provider calls `push()` synchronously inside its event-handler loop, and the agent loop's `await` on the next event only paces the consumer side. If the provider pushes faster than the consumer processes, events accumulate in `queue`.
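The push/pull behavior described above fits in a few dozen lines. This is a minimal sketch, not the pi source: the class name `SketchEventStream`, the `isComplete`/`extractResult` constructor parameters (inferred from the two callbacks `AssistantMessageEventStream` passes to `super`), and the `fakeProvider` helper are assumptions for illustration. The fake provider also shows the "return the stream first, push events later" shape from Layer 1.

```typescript
// Minimal sketch of a single-producer, single-consumer async event queue.
// Not the pi source: names and callback signatures are illustrative assumptions.
class SketchEventStream<T, R = T> implements AsyncIterable<T> {
  private queue: T[] = []; // events pushed before anyone asked for them
  private waiting: ((r: IteratorResult<T>) => void)[] = []; // consumers that asked first
  private done = false;
  private resolveResult!: (r: R) => void;
  private readonly finalResult: Promise<R>;

  constructor(
    private readonly isComplete: (event: T) => boolean, // is this a terminal event?
    private readonly extractResult: (event: T) => R, // final value from the terminal event
  ) {
    this.finalResult = new Promise<R>((resolve) => (this.resolveResult = resolve));
  }

  push(event: T): void {
    if (this.done) return; // ignore anything after the terminal event
    if (this.isComplete(event)) {
      this.done = true;
      this.resolveResult(this.extractResult(event));
    }
    const waiter = this.waiting.shift();
    if (waiter) waiter({ value: event, done: false }); // resume a suspended for-await
    else this.queue.push(event); // nobody waiting: buffer, with no size limit
  }

  end(result?: R): void {
    this.done = true;
    if (result !== undefined) this.resolveResult(result);
    // Wake any suspended consumer so its for-await loop can finish.
    for (const waiter of this.waiting.splice(0)) waiter({ value: undefined, done: true });
  }

  async *[Symbol.asyncIterator](): AsyncIterator<T> {
    while (true) {
      if (this.queue.length > 0) {
        yield this.queue.shift()!;
      } else if (this.done) {
        return;
      } else {
        const next = await new Promise<IteratorResult<T>>((resolve) => this.waiting.push(resolve));
        if (next.done) return;
        yield next.value;
      }
    }
  }

  result(): Promise<R> {
    return this.finalResult;
  }
}

// Hypothetical provider: returns the stream first, then pushes events asynchronously.
type Ev = { type: "delta"; text: string } | { type: "done"; text: string };

function fakeProvider(chunks: string[]): SketchEventStream<Ev, string> {
  const s = new SketchEventStream<Ev, string>(
    (e) => e.type === "done",
    (e) => e.text,
  );
  void (async () => {
    for (const text of chunks) {
      await Promise.resolve(); // simulate waiting on the network
      s.push({ type: "delta", text });
    }
    s.push({ type: "done", text: chunks.join("") });
  })();
  return s; // no events exist yet at this point
}
```

Because `push()` never waits and `queue` has no cap, the producer's pace is independent of the consumer. `result()` also works for a caller that never iterates: the terminal event resolves it even if every event stays buffered.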

### How does `AssistantMessageEventStream` specialize it?

```ts
// packages/ai/src/utils/event-stream.ts:69-83
export class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
    constructor() {
        super(
            (event) => event.type === "done" || event.type === "error",
            (event) => {
                if (event.type === "done") return event.message;
                else if (event.type === "error") return event.error;
                throw new Error("Unexpected event type for final result");
            },
        );
    }
}
```

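The two callbacks tell `EventStream` which events terminate the stream (`done` or `error`) and how to extract the final `AssistantMessage` from the terminal event. Callers can either iterate all events or skip directly to `stream.result()`, which returns a promise that resolves when the `done` or `error` event is pushed. Because the extractor returns `event.error` (itself an `AssistantMessage`) for error events, the final result is an `AssistantMessage` in both cases; callers tell the outcomes apart by inspecting it, for example its `stopReason`.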

Sources: [packages/ai/src/utils/event-stream.ts:4-88]()

---

## Layer 3: Provider Adapters — How Raw Tokens Become Events

### The event type taxonomy

```ts
// packages/ai/src/types.ts:347-359
export type AssistantMessageEvent =
    | { type: "start"; partial: AssistantMessage }
    | { type: "text_start"; contentIndex: number; partial: AssistantMessage }
    | { type: "text_delta"; contentIndex: number; delta: string; partial: AssistantMessage }
    | { type: "text_end"; contentIndex: number; content: string; partial: AssistantMessage }
    | { type: "thinking_start"; contentIndex: number; partial: AssistantMessage }
    | { type: "thinking_delta"; contentIndex: number; delta: string; partial: AssistantMessage }
    | { type: "thinking_end"; contentIndex: number; content: string; partial: AssistantMessage }
    | { type: "toolcall_start"; contentIndex: number; partial: AssistantMessage }
    | { type: "toolcall_delta"; contentIndex: number; delta: string; partial: AssistantMessage }
    | { type: "toolcall_end"; contentIndex: number; toolCall: ToolCall; partial: AssistantMessage }
    | { type: "done"; reason: ...; message: AssistantMessage }
    | { type: "error"; reason: ...; error: AssistantMessage };
```

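Every event except `done` and `error` carries `partial: AssistantMessage`, the in-progress message object at the moment the event is pushed; the terminal events carry the final message in `message` or `error` instead. A consumer therefore always has an up-to-date view of the whole message without tracking deltas itself. Note that `partial` is a reference, not a frozen copy: the Anthropic adapter below passes the same mutable `output` object on every event.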
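The practical consequence is that a renderer can read the current block text straight from `partial`, as long as it reads at event time. A minimal sketch with simplified, hypothetical types (`PartialMessage`, `MsgEvent`, `produce`, and `currentText` are illustrative names, and the real `AssistantMessage` has many more fields):

```typescript
// Hypothetical, simplified shapes; the real AssistantMessage has more fields.
type TextBlock = { type: "text"; text: string };
type PartialMessage = { content: TextBlock[] };
type MsgEvent =
  | { type: "text_start"; contentIndex: number; partial: PartialMessage }
  | { type: "text_delta"; contentIndex: number; delta: string; partial: PartialMessage };

// Consumer: read the current block text from the snapshot, no delta bookkeeping.
function currentText(event: MsgEvent): string {
  return event.partial.content[event.contentIndex]?.text ?? "";
}

// Producer: like the Anthropic adapter, it reuses one mutable `output` object.
function* produce(chunks: string[]): Generator<MsgEvent> {
  const output: PartialMessage = { content: [{ type: "text", text: "" }] };
  yield { type: "text_start", contentIndex: 0, partial: output };
  for (const delta of chunks) {
    output.content[0].text += delta;
    yield { type: "text_delta", contentIndex: 0, delta, partial: output };
  }
}

const rendered: string[] = [];
const kept: PartialMessage[] = [];
for (const e of produce(["Hel", "lo"])) {
  rendered.push(currentText(e)); // read at event time
  kept.push(e.partial); // a retained reference, not a copy
}
// rendered is ["", "Hel", "Hello"], but every entry in `kept` now reads "Hello".
```

A shallow spread such as `{ ...partial }` copies only top-level fields; the `content` array is still shared, so a consumer that needs a history of intermediate states has to copy deeper than that.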

### How partial tool-call arguments are accumulated (Anthropic provider example)

The Anthropic streaming protocol sends tool arguments as successive `input_json_delta` chunks. The provider maintains a mutable block object per content index:

```ts
// packages/ai/src/providers/anthropic.ts:604-616
} else if (event.delta.type === "input_json_delta") {
    const index = blocks.findIndex((b) => b.index === event.index);
    const block = blocks[index];
    if (block && block.type === "toolCall") {
        block.partialJson += event.delta.partial_json;
        block.arguments = parseStreamingJson(block.partialJson);
        stream.push({
            type: "toolcall_delta",
            contentIndex: index,
            delta: event.delta.partial_json,
            partial: output,
        });
    }
}
```

On each delta, the adapter does two things in order before pushing `toolcall_delta`:
1. `partialJson` (a raw string scratch buffer) is appended with the new chunk.
2. `arguments` is re-parsed from `partialJson` via `parseStreamingJson`, giving a best-effort object representation of whatever JSON has arrived so far.

At `content_block_stop`, the final parse is committed and `partialJson` is deleted from the block before the `toolcall_end` event fires:

```ts
// packages/ai/src/providers/anthropic.ts:644-654
} else if (block.type === "toolCall") {
    block.arguments = parseStreamingJson(block.partialJson);
    delete (block as { partialJson?: string }).partialJson;
    stream.push({
        type: "toolcall_end",
        contentIndex: index,
        toolCall: block,
        partial: output,
    });
}
```

The `partialJson` scratch buffer is an internal implementation detail; downstream consumers only see the parsed `arguments` object. Other providers (OpenAI completions, Bedrock, Mistral, OpenAI Responses) follow the same pattern with the same field names.
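The delta/stop pair can be sketched as two small functions over a mutable block. This is a hypothetical illustration, not adapter code: `ToolCallBlock`, `onDelta`, `onStop`, and `bestEffortParse` are invented names, and `bestEffortParse` swaps in a much simpler strategy than `parseStreamingJson` (on a failed `JSON.parse` it keeps the previous object rather than producing a partial one).

```typescript
// Hypothetical block shape modeled on the fields named above.
type ToolCallBlock = {
  type: "toolCall";
  id: string;
  name: string;
  arguments: Record<string, unknown>;
  partialJson?: string; // raw scratch buffer, removed at block stop
};

// Simplification: a failed JSON.parse keeps the previous object instead of
// producing a partial one, so `arguments` lags until the JSON is complete.
function bestEffortParse(json: string, previous: Record<string, unknown>): Record<string, unknown> {
  try {
    const value: unknown = JSON.parse(json);
    const isObject = typeof value === "object" && value !== null && !Array.isArray(value);
    return isObject ? (value as Record<string, unknown>) : previous;
  } catch {
    return previous; // incomplete JSON: never throw during streaming
  }
}

// Counterpart of the input_json_delta branch: append, then re-parse.
function onDelta(block: ToolCallBlock, chunk: string): void {
  block.partialJson = (block.partialJson ?? "") + chunk;
  block.arguments = bestEffortParse(block.partialJson, block.arguments);
}

// Counterpart of content_block_stop: final parse, then drop the scratch buffer.
function onStop(block: ToolCallBlock): ToolCallBlock {
  block.arguments = bestEffortParse(block.partialJson ?? "", block.arguments);
  delete block.partialJson;
  return block;
}

const call: ToolCallBlock = { type: "toolCall", id: "t1", name: "read", arguments: {}, partialJson: "" };
for (const chunk of ['{"pa', 'th": "a.ts"', "}"]) onDelta(call, chunk); // arguments stays {} until "}"
onStop(call); // call.arguments is { path: "a.ts" }; call.partialJson no longer exists
```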

Sources: [packages/ai/src/providers/anthropic.ts:564-655]()

---

## Layer 4: Partial JSON Parsing (`packages/ai/src/utils/json-parse.ts`)

The question "what does `parseStreamingJson` return when the JSON is incomplete?" has a definite answer:

```ts
// packages/ai/src/utils/json-parse.ts:104-124
export function parseStreamingJson<T = Record<string, unknown>>(partialJson: string | undefined): T {
    if (!partialJson || partialJson.trim() === "") {
        return {} as T;
    }
    try {
        return parseJsonWithRepair<T>(partialJson);
    } catch {
        try {
            const result = partialParse(partialJson);      // third-party partial-json library
            return (result ?? {}) as T;
        } catch {
            try {
                const result = partialParse(repairJson(partialJson));
                return (result ?? {}) as T;
            } catch {
                return {} as T;                            // always returns an object, never throws
            }
        }
    }
}
```

The function applies a three-level parse cascade:
1. Full JSON parse with repair (handles stray control characters and invalid backslash escapes).
2. `partial-json` library parse, which tolerates truncated JSON structures.
3. `partial-json` parse on the repaired string.

If all three fail, it returns `{}`. **This function never throws**. The goal is a best-effort live object for display during streaming; final correctness comes from the last parse at `toolcall_end`, which receives the complete JSON string.

The `repairJson` function handles two malformation classes that appear in real LLM output: raw control characters inside strings (e.g., a literal newline byte where JSON requires the two-character escape `\n`) and invalid escape sequences (e.g., `\p`, which is not a valid JSON escape).
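A repair pass for those two classes can be written as a single scan that tracks whether it is inside a string. This is a sketch under stated assumptions: `repairJsonSketch` is a hypothetical name, it is not pi's `repairJson`, and edge cases such as a malformed `\u` escape are ignored.

```typescript
// Hypothetical repair pass in the spirit of repairJson (not the pi implementation).
// Inside strings it escapes raw control characters and doubles backslashes that
// start an invalid escape; everything outside strings is copied unchanged.
const VALID_ESCAPES = new Set(['"', "\\", "/", "b", "f", "n", "r", "t", "u"]);

function repairJsonSketch(input: string): string {
  let out = "";
  let inString = false;
  for (let i = 0; i < input.length; i++) {
    const ch = input[i];
    if (!inString) {
      if (ch === '"') inString = true;
      out += ch;
      continue;
    }
    if (ch === "\\") {
      const next = input[i + 1];
      if (next !== undefined && VALID_ESCAPES.has(next)) {
        out += ch + next; // keep a valid escape pair intact
        i++;
      } else {
        out += "\\\\"; // invalid escape such as \p: make the backslash literal
      }
      continue;
    }
    if (ch === '"') {
      inString = false; // unescaped quote closes the string
      out += ch;
      continue;
    }
    const code = ch.charCodeAt(0);
    if (code < 0x20) {
      // Raw control character inside a string: emit its escaped form.
      if (ch === "\n") out += "\\n";
      else if (ch === "\r") out += "\\r";
      else if (ch === "\t") out += "\\t";
      else out += "\\u" + code.toString(16).padStart(4, "0");
      continue;
    }
    out += ch;
  }
  return out;
}
```

Input that is already valid JSON passes through unchanged, so a pass like this is safe to run before every parse attempt.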

Sources: [packages/ai/src/utils/json-parse.ts:1-124]()

---

## Layer 5: The Agent Loop (`packages/agent/src/agent-loop.ts`)

### From stream to validated tool call

`streamAssistantResponse` is the function that drives the provider stream, re-emits events as `AgentEvent`s, and returns the finalized `AssistantMessage`:

```ts
// packages/agent/src/agent-loop.ts:313-357
for await (const event of response) {
    switch (event.type) {
        case "start":
            partialMessage = event.partial;
            context.messages.push(partialMessage);
            addedPartial = true;
            await emit({ type: "message_start", message: { ...partialMessage } });
            break;

        case "text_start":
        case "text_delta":
        // ... (all mid-stream events)
        case "toolcall_delta":
        case "toolcall_end":
            if (partialMessage) {
                partialMessage = event.partial;
                context.messages[context.messages.length - 1] = partialMessage;
                await emit({
                    type: "message_update",
                    assistantMessageEvent: event,
                    message: { ...partialMessage },
                });
            }
            break;

        case "done":
        case "error": {
            const finalMessage = await response.result();
            // ...
            await emit({ type: "message_end", message: finalMessage });
            return finalMessage;
        }
    }
}
```

Three behaviors are worth noting:

1. **Eager partial message insertion**: on the `start` event, the partial message is pushed into `context.messages` immediately. Subsequent `message_update` events overwrite it in place (`context.messages[context.messages.length - 1] = partialMessage`). This is how context-aware compaction can observe the partial assistant turn.

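2. **Event passthrough**: every mid-stream `AssistantMessageEvent` (everything between `start` and `done`/`error`) becomes a `message_update` `AgentEvent` carrying both the inner event and a shallow copy of the current partial message. `start` maps to `message_start`, and the terminal events map to `message_end`. The TUI and extension subscribers receive all of them.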

3. **Argument validation happens after streaming ends**: once `streamAssistantResponse` returns, `runLoop` calls `executeToolCalls`, which calls `prepareToolCall`, which calls `validateToolArguments`. Nothing is validated during streaming; only the finalized `arguments` object committed at `toolcall_end` is validated.
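The three behaviors can be reproduced with simplified types. A minimal sketch: `Msg`, `StreamEvent`, `LoopEvent`, `consume`, and `fakeEvents` are hypothetical names, and how the `done` branch updates `messages` is an assumption, because the excerpt above elides that step.

```typescript
// Hypothetical, simplified types; the real AssistantMessage / AgentEvent carry more fields.
type Msg = { role: "assistant"; text: string };
type StreamEvent =
  | { type: "start"; partial: Msg }
  | { type: "text_delta"; delta: string; partial: Msg }
  | { type: "done"; message: Msg };
type LoopEvent = { type: "message_start" | "message_update" | "message_end"; message: Msg };

async function consume(
  events: AsyncIterable<StreamEvent>,
  messages: Msg[],
  emit: (e: LoopEvent) => Promise<void>,
): Promise<Msg | undefined> {
  let added = false;
  for await (const event of events) {
    switch (event.type) {
      case "start":
        messages.push(event.partial); // 1. eager insertion of the partial turn
        added = true;
        await emit({ type: "message_start", message: { ...event.partial } });
        break;
      case "text_delta":
        if (added) {
          messages[messages.length - 1] = event.partial; // overwrite in place
          // 2. passthrough; awaiting emit means the next event waits for listeners
          await emit({ type: "message_update", message: { ...event.partial } });
        }
        break;
      case "done":
        // Assumption: the final message replaces the partial (this step is elided above).
        if (added) messages[messages.length - 1] = event.message;
        else messages.push(event.message);
        await emit({ type: "message_end", message: event.message });
        return event.message; // 3. validation would start only after this return
    }
  }
  return undefined;
}

async function* fakeEvents(): AsyncGenerator<StreamEvent> {
  yield { type: "start", partial: { role: "assistant", text: "" } };
  yield { type: "text_delta", delta: "Hel", partial: { role: "assistant", text: "Hel" } };
  yield { type: "text_delta", delta: "lo", partial: { role: "assistant", text: "Hello" } };
  yield { type: "done", message: { role: "assistant", text: "Hello" } };
}
```

Because the partial turn is in `messages` from the first event onward, anything that reads the context during an `emit` sees exactly one assistant entry for the in-flight turn.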

### When is `validateToolArguments` called?

```ts
// packages/agent/src/agent-loop.ts:578-581
const preparedToolCall = prepareToolCallArguments(tool, toolCall);
const validatedArgs = validateToolArguments(tool, preparedToolCall);
```

`validateToolArguments` uses TypeBox's `Value.Convert` for coercion plus an AJV-style validator. If validation fails, `prepareToolCall` returns an `ImmediateToolCallOutcome` with an error string instead of a `PreparedToolCall`, which feeds an error result back to the LLM without executing anything.
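The short-circuit can be modeled as a discriminated union. A minimal sketch under loud assumptions: only the names `PreparedToolCall` and `ImmediateToolCallOutcome` come from the text above; their fields, the `Tool` shape, and the `prepare`/`validateArgs` helpers are invented here, and a hand-written type check stands in for TypeBox coercion and validation.

```typescript
// Hypothetical shapes: field layouts are invented for illustration, and a
// hand-written check replaces TypeBox's Value.Convert + schema validation.
type ToolCall = { id: string; name: string; arguments: Record<string, unknown> };
type Tool = { name: string; required: Record<string, "string" | "number"> };

type PreparedToolCall = { kind: "prepared"; toolCall: ToolCall; args: Record<string, unknown> };
type ImmediateToolCallOutcome = { kind: "immediate"; toolCallId: string; isError: true; text: string };

function validateArgs(tool: Tool, args: Record<string, unknown>): string[] {
  const errors: string[] = [];
  for (const [key, expected] of Object.entries(tool.required)) {
    const actual = typeof args[key];
    if (actual !== expected) errors.push(`${key}: expected ${expected}, got ${actual}`);
  }
  return errors;
}

function prepare(tool: Tool, toolCall: ToolCall): PreparedToolCall | ImmediateToolCallOutcome {
  const errors = validateArgs(tool, toolCall.arguments);
  if (errors.length > 0) {
    // Not executed: this text goes back to the model as an error tool result.
    return {
      kind: "immediate",
      toolCallId: toolCall.id,
      isError: true,
      text: `Validation failed: ${errors.join("; ")}`,
    };
  }
  return { kind: "prepared", toolCall, args: toolCall.arguments };
}
```

Returning an outcome value instead of throwing keeps a bad tool call inside the normal conversation flow: the model sees why its arguments were rejected and can retry.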

Sources: [packages/agent/src/agent-loop.ts:275-368](), [packages/agent/src/agent-loop.ts:562-626]()

---

## Layer 6: The Overflow Guard (`packages/ai/src/utils/overflow.ts`)

`isContextOverflow` is not part of the streaming path itself — it is called after a stream completes to classify the returned `AssistantMessage`. It answers: "did this request fail because the context window was exceeded?"

The function handles three distinct cases:

| Case | Signal | Providers |
|------|--------|-----------|
| Error-based overflow | `stopReason === "error"` and `errorMessage` matches an `OVERFLOW_PATTERN` regex | Anthropic, OpenAI, Gemini, Groq, xAI, Mistral, Bedrock, llama.cpp, LM Studio, etc. |
| Silent overflow | `stopReason === "stop"` but `usage.input + usage.cacheRead > contextWindow` | z.ai |
| Truncation overflow | `stopReason === "length"` with `output === 0` and input fills `>= 99%` of `contextWindow` | Xiaomi MiMo |

```ts
// packages/ai/src/utils/overflow.ts:122-150
export function isContextOverflow(message: AssistantMessage, contextWindow?: number): boolean {
    if (message.stopReason === "error" && message.errorMessage) {
        const isNonOverflow = NON_OVERFLOW_PATTERNS.some((p) => p.test(message.errorMessage!));
        if (!isNonOverflow && OVERFLOW_PATTERNS.some((p) => p.test(message.errorMessage!))) {
            return true;
        }
    }
    if (contextWindow && message.stopReason === "stop") {
        const inputTokens = message.usage.input + message.usage.cacheRead;
        if (inputTokens > contextWindow) return true;
    }
    if (contextWindow && message.stopReason === "length" && message.usage.output === 0) {
        const inputTokens = message.usage.input + message.usage.cacheRead;
        if (inputTokens >= contextWindow * 0.99) return true;
    }
    return false;
}
```

The `NON_OVERFLOW_PATTERNS` exclusion list exists because some error messages match overflow patterns structurally but mean something different — for example, AWS Bedrock throttling errors contain the phrase "too many tokens" (a generic `OVERFLOW_PATTERNS` match) but are not context window errors. The exclusion list is checked first.
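The precedence rule can be illustrated in isolation. The regexes below are illustrative stand-ins chosen for this example; the real `OVERFLOW_PATTERNS` and `NON_OVERFLOW_PATTERNS` lists in `overflow.ts` are longer and worded differently, and `isOverflowError` is a hypothetical helper covering only the error-message case.

```typescript
// Illustrative patterns only; not the lists from overflow.ts.
const OVERFLOW_SKETCH = [/context (length|window)/i, /too many tokens/i, /prompt is too long/i];
const NON_OVERFLOW_SKETCH = [/throttl/i, /rate limit/i];

function isOverflowError(errorMessage: string): boolean {
  // Exclusions are consulted first, so a throttling message that happens to
  // contain "too many tokens" is not classified as a context overflow.
  if (NON_OVERFLOW_SKETCH.some((p) => p.test(errorMessage))) return false;
  return OVERFLOW_SKETCH.some((p) => p.test(errorMessage));
}
```

The patterns deliberately avoid the `g` flag: a global regex keeps `lastIndex` state between `test()` calls, which would make repeated classification of different messages unreliable.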

Sources: [packages/ai/src/utils/overflow.ts:33-151]()

---

## End-to-End Event Sequence

```text
Provider HTTP stream
    │
    │  content_block_delta (input_json_delta)
    ▼
Provider adapter
    │  block.partialJson += delta
    │  block.arguments = parseStreamingJson(partialJson)  ← best-effort live object
    │  stream.push({ type: "toolcall_delta", ... })
    ▼
AssistantMessageEventStream.push()
    │  if consumer waiting → resume immediately
    │  else → queue.push(event)
    ▼
agent-loop: for await (const event of response)
    │  partialMessage = event.partial
    │  context.messages[last] = partialMessage           ← live context update
    │  emit({ type: "message_update", ... })
    ▼
AgentSession._emit() / extension runner
    │  subscribers (TUI, RPC clients) receive message_update
    ▼
--- content_block_stop: toolcall_end fires ---
    │  block.arguments = parseStreamingJson(partialJson) ← final parse
    │  delete block.partialJson
    │  stream.push({ type: "toolcall_end", toolCall: block, ... })
    ▼
streamAssistantResponse returns AssistantMessage
    ▼
executeToolCalls → prepareToolCall
    │  validateToolArguments(tool, toolCall)              ← TypeBox validation
    │  if invalid → ImmediateToolCallOutcome (error to LLM)
    │  if valid   → PreparedToolCall → executePreparedToolCall
    ▼
isContextOverflow(finalMessage, contextWindow?)           ← post-hoc classification
```

---

## Where is backpressure applied?

The honest answer: **it isn't, in the traditional sense**. The `EventStream` queue is unbounded, and no signal flows back to the provider to slow it down. What exists is consumer-side pacing through the `await emit(...)` calls in `streamAssistantResponse`: because `emit` is awaited, the agent loop does not dequeue the next event until the current one has been processed by all listeners. If a listener is slow (e.g., a TUI render takes time), the agent loop stalls on `await emit`, the `for await` consumer stalls with it, and the provider keeps pushing, so events pile up in `queue`. This is cooperative scheduling through `async/await`, not flow control: it keeps listeners processing events in order, but it bounds neither memory nor the provider's pace.
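The effect is easy to observe with an instrumented stand-in for the queue. A minimal sketch, assuming microtask ticks as the unit of time: `InstrumentedQueue`, `slowEmit`, and `demo` are hypothetical names, and the queue mimics only the hand-over-or-buffer behavior of `EventStream`, not its full API.

```typescript
// Hypothetical stand-in for EventStream's queue, instrumented to record the backlog.
class InstrumentedQueue<T> {
  private items: T[] = [];
  private waiter: ((v: T) => void) | null = null;
  maxDepth = 0;

  push(v: T): void {
    if (this.waiter) {
      const w = this.waiter;
      this.waiter = null;
      w(v); // consumer was suspended: hand over directly
    } else {
      this.items.push(v); // nobody waiting: buffer without limit
      this.maxDepth = Math.max(this.maxDepth, this.items.length);
    }
  }

  next(): Promise<T> {
    if (this.items.length > 0) return Promise.resolve(this.items.shift() as T);
    return new Promise<T>((resolve) => {
      this.waiter = resolve;
    });
  }
}

async function demo(eventCount: number): Promise<{ maxDepth: number; consumed: number }> {
  const q = new InstrumentedQueue<number>();
  let consumed = 0;
  // Stand-in for an awaited emit() whose listeners take several ticks.
  const slowEmit = async (): Promise<void> => {
    for (let i = 0; i < 10; i++) await Promise.resolve();
  };
  const consumer = (async () => {
    while (consumed < eventCount) {
      await q.next();
      consumed++;
      await slowEmit(); // the next event is not dequeued until this resolves
    }
  })();
  // Producer: pushes at its own pace; nothing signals it to slow down.
  for (let i = 0; i < eventCount; i++) {
    q.push(i);
    await Promise.resolve();
  }
  await consumer;
  return { maxDepth: q.maxDepth, consumed };
}
```

Every event is still delivered, in order, but the backlog grows for as long as the listener is slower than the producer.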

---

## Summary

The streaming pipeline is a layered push-pull system: providers push typed events into an unbounded async queue, the agent loop pulls them with `for await` and re-emits them as `AgentEvent`s, and tool arguments are assembled from raw JSON fragments using a fault-tolerant parser cascade (`parseStreamingJson`) that never throws. Validation happens once, after the stream closes, on the fully assembled argument object from `toolcall_end`. The `isContextOverflow` utility sits outside the stream path and classifies a completed `AssistantMessage` post hoc, covering provider-specific failure modes that range from explicit error messages to silent overflow and truncation that are detectable only from token-usage counts.

Sources: [packages/ai/src/utils/event-stream.ts:50-66](), [packages/agent/src/agent-loop.ts:310-368]()
