# Stream turns and events

> Consuming `turn/started`, `item/started`, deltas (`item/agentMessage/delta`, command output, reasoning), `turn/completed`, token usage, and notification opt-out for high-volume streams.

- Repository: openai/codex
- GitHub: https://github.com/openai/codex
- Human docs: https://grok-wiki.com/public/docs/openai-codex-c82680b15ec1
- Complete Markdown: https://grok-wiki.com/public/docs/openai-codex-c82680b15ec1/llms-full.txt

## Source Files

- `README.md`
- `src/request_processors/turn_processor.rs`
- `src/bespoke_event_handling.rs`
- `src/thread_status.rs`
- `src/filters.rs`
- `tests/suite/v2/thread_status.rs`

---


After `turn/start` returns an in-progress `turn`, app-server streams JSON-RPC notifications on the transport while Codex runs. Core agent events are translated in `bespoke_event_handling` into v2 `turn/*`, `item/*`, and `thread/tokenUsage/updated` notifications; turn boundaries also drive `thread/status/changed` through `ThreadWatchManager`. Clients that only need final state can opt out of high-volume methods at `initialize` time.

## Prerequisites

| Requirement | Why it matters |
| --- | --- |
| Completed `initialize` / `initialized` handshake | Other RPCs are rejected with `"Not initialized"` |
| Subscribed thread | `thread/start`, `thread/resume`, and `thread/fork` auto-subscribe the connection; use `thread/unsubscribe` to stop turn/item traffic |
| Non-blocking read loop on the transport | Notifications interleave with RPC responses and server requests |

<Note>
`turn/start` returns immediately with `{ turn: { id, status: "inProgress", items: [] } }`. `turn/started` arrives later, when the core agent actually begins the turn, not when the RPC returns.
</Note>
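
Because notifications interleave with RPC responses and server requests, the read loop first has to tell the three apart. The following is a minimal sketch, assuming standard JSON-RPC framing (responses carry `id` without `method`, notifications carry `method` without `id`, server requests carry both). `classify_message` is a hypothetical helper name, not part of app-server.

```python
from typing import Any, Dict


def classify_message(msg: Dict[str, Any]) -> str:
    """Classify one decoded JSON-RPC message read from the transport.

    Assumes standard JSON-RPC framing; the helper name is illustrative.
    """
    has_id = "id" in msg
    has_method = "method" in msg
    if has_method and has_id:
        return "server_request"  # e.g. an approval the client must answer
    if has_method:
        return "notification"    # e.g. turn/started, item/completed
    if has_id:
        return "response"        # result or error for a client RPC
    return "invalid"


inbound = [
    {"id": 2, "result": {"turn": {"id": "turn_456", "status": "inProgress", "items": []}}},
    {"method": "turn/started", "params": {"threadId": "thr_123", "turn": {"id": "turn_456"}}},
]
kinds = [classify_message(m) for m in inbound]
```

Here the `turn/start` response and the later `turn/started` notification land in different branches, which matches the timing described in the note above.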

## Turn and item lifecycle

```mermaid
sequenceDiagram
    participant Client
    participant Transport as JSON-RPC transport
    participant TurnProc as turn_processor
    participant Core as CodexThread
    participant Bespoke as bespoke_event_handling
    participant Watch as ThreadWatchManager

    Client->>Transport: turn/start
    Transport->>TurnProc: submit user input
    TurnProc-->>Client: result.turn (inProgress, items [])
    Core-->>Bespoke: EventMsg::TurnStarted
    Bespoke->>Watch: note_turn_started
    Bespoke-->>Client: turn/started
    loop Per ThreadItem
        Bespoke-->>Client: item/started
        Bespoke-->>Client: item/.../delta (optional)
        Bespoke-->>Client: item/completed
    end
    Core-->>Bespoke: EventMsg::TokenCount (optional)
    Bespoke-->>Client: thread/tokenUsage/updated
    Core-->>Bespoke: EventMsg::TurnComplete
    Bespoke->>Watch: note_turn_completed
    Bespoke-->>Client: turn/completed
```

Every item follows the same pattern: **`item/started` → zero or more deltas → `item/completed`**. Build UI state from `item/*` notifications; do not wait for items inside `turn/started` or `turn/completed`.

<Warning>
`turn/started` and `turn/completed` currently carry an **empty `items` array** even when item notifications were streamed. Treat `item/*` as the canonical item list until this is fixed.
</Warning>
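
One way to follow this advice is to keep a per-turn item table that only `item/*` notifications can write to. This is a sketch under that assumption; `ItemTracker` is a hypothetical client-side class, and the payloads in the usage lines are illustrative.

```python
from typing import Any, Dict, List, Set


class ItemTracker:
    """Builds the item list for a turn from item/* notifications only."""

    def __init__(self) -> None:
        self.items: Dict[str, Dict[str, Any]] = {}  # item id -> latest snapshot
        self.order: List[str] = []                  # item ids in arrival order
        self.done: Set[str] = set()                 # ids that reached item/completed

    def _store(self, item: Dict[str, Any]) -> None:
        if item["id"] not in self.items:
            self.order.append(item["id"])
        self.items[item["id"]] = item

    def handle(self, method: str, params: Dict[str, Any]) -> None:
        if method == "item/started":
            self._store(params["item"])
        elif method == "item/completed":
            self._store(params["item"])  # final snapshot replaces the started one
            self.done.add(params["item"]["id"])
        # turn/started and turn/completed are ignored: their items array is empty.


tracker = ItemTracker()
tracker.handle("turn/started", {"threadId": "thr_123", "turn": {"id": "turn_456", "items": []}})
tracker.handle("item/started", {"item": {"type": "agentMessage", "id": "msg_1", "text": ""}})
tracker.handle("item/completed", {"item": {"type": "agentMessage", "id": "msg_1", "text": "Done."}})
```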

## Turn-level notifications

| Method | Params | When emitted |
| --- | --- | --- |
| `turn/started` | `{ threadId, turn }` | Core `TurnStarted`; `turn.status` is `inProgress`, `items` is empty |
| `turn/completed` | `{ threadId, turn }` | Turn finishes: `completed`, `interrupted`, or `failed` |
| `turn/diff/updated` | `{ threadId, turnId, diff }` | After file-change items; aggregated unified diff for the turn |
| `turn/plan/updated` | `{ threadId, turnId, explanation?, plan }` | Agent plan/checklist updates |
| `model/rerouted` | `{ threadId, turnId, fromModel, toModel, reason }` | Backend model reroute |
| `model/verification` | `{ threadId, turnId, verifications }` | Additional verification required |
| `error` | `{ threadId, turnId, error, willRetry }` | Mid-turn failure (`willRetry: false`) or stream retry (`willRetry: true`) |

<ResponseField name="turn.status" type="string">
Terminal values on `turn/completed`: `completed`, `interrupted`, or `failed`. Failures include `turn.error` with `{ message, codexErrorInfo?, additionalDetails? }`.
</ResponseField>
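
A small sketch of reading the terminal status from `turn/completed` params. `describe_turn_outcome` is a hypothetical helper; the field names follow the shapes listed above, and the fallback text for a missing message is an assumption.

```python
from typing import Any, Dict


def describe_turn_outcome(params: Dict[str, Any]) -> str:
    """Return a short human-readable outcome for a turn/completed notification."""
    turn = params["turn"]
    status = turn["status"]
    if status == "failed":
        error = turn.get("error") or {}
        # message is documented; the fallback string is illustrative only.
        return "failed: " + error.get("message", "unknown error")
    return status  # "completed" or "interrupted"


ok = describe_turn_outcome({"threadId": "thr_123", "turn": {"id": "turn_456", "status": "completed"}})
bad = describe_turn_outcome(
    {"threadId": "thr_123", "turn": {"id": "turn_457", "status": "failed", "error": {"message": "boom"}}}
)
```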

On `turn/started`, app-server aborts pending server requests from the previous turn. On `turn/completed` or interrupt, it aborts turn-scoped server requests and resolves pending `turn/interrupt` responses.

### Rejected `turn/start` requests

If `turn/start` is rejected (invalid permissions, environments, or input limits), the RPC returns an error and **no** `turn/started` notification is emitted. Integration tests assert this for rejected permission and environment selection.

## Item-level notifications

### Shared lifecycle

| Method | Role |
| --- | --- |
| `item/started` | Full `item` snapshot when work begins; `item.id` matches delta `itemId` |
| `item/completed` | Final authoritative `item` after tool/message work finishes |
| `item/autoApprovalReview/started` | [UNSTABLE] Guardian auto-review begins |
| `item/autoApprovalReview/completed` | [UNSTABLE] Guardian auto-review resolves |

`ThreadItem` variants include `userMessage`, `agentMessage`, `plan`, `reasoning`, `commandExecution`, `fileChange`, `mcpToolCall`, `collabToolCall`, `webSearch`, `imageView`, `enteredReviewMode`, `exitedReviewMode`, and `contextCompaction`.

### Streaming deltas

| Method | Concatenation key | Purpose |
| --- | --- | --- |
| `item/agentMessage/delta` | `itemId` | Agent reply text chunks |
| `item/plan/delta` | `itemId` | Plan-mode proposed plan (experimental) |
| `item/reasoning/summaryTextDelta` | `itemId` + `summaryIndex` | Readable reasoning summaries |
| `item/reasoning/summaryPartAdded` | `itemId` | Boundary between summary sections |
| `item/reasoning/textDelta` | `itemId` + `contentIndex` | Raw reasoning blocks |
| `item/commandExecution/outputDelta` | `itemId` | Live stdout/stderr |
| `item/fileChange/patchUpdated` | `itemId` | Structured patch snapshots when streaming apply-patch is enabled |

Core deltas (`AgentMessageContentDelta`, `ReasoningContentDelta`, `ExecCommandOutputDelta`, etc.) are mapped through `item_event_to_server_notification` in `bespoke_event_handling`.
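
The concatenation keys in the table can be applied mechanically on the client. The sketch below assumes every text delta carries its chunk in `params["delta"]`; the source only shows that field for `item/agentMessage/delta`, so treat it as an assumption for the other methods. `KEY_FIELDS`, `buffers`, and `apply_delta` are hypothetical names.

```python
from collections import defaultdict
from typing import Any, Dict, Tuple

# Params fields that form the concatenation key, taken from the table above.
# summaryPartAdded and patchUpdated are omitted because they are not text chunks.
KEY_FIELDS: Dict[str, Tuple[str, ...]] = {
    "item/agentMessage/delta": ("itemId",),
    "item/plan/delta": ("itemId",),
    "item/reasoning/summaryTextDelta": ("itemId", "summaryIndex"),
    "item/reasoning/textDelta": ("itemId", "contentIndex"),
    "item/commandExecution/outputDelta": ("itemId",),
}

buffers: Dict[tuple, str] = defaultdict(str)


def apply_delta(method: str, params: Dict[str, Any]) -> bool:
    """Append a delta to its buffer; return False for non-delta methods."""
    fields = KEY_FIELDS.get(method)
    if fields is None:
        return False
    key = (method,) + tuple(params[f] for f in fields)
    buffers[key] += params["delta"]  # assumption: chunk lives in params["delta"]
    return True


m = "item/reasoning/summaryTextDelta"
apply_delta(m, {"itemId": "rs_1", "summaryIndex": 0, "delta": "Plan "})
apply_delta(m, {"itemId": "rs_1", "summaryIndex": 1, "delta": "Next"})
apply_delta(m, {"itemId": "rs_1", "summaryIndex": 0, "delta": "the fix"})
```

Keying on the index as well as `itemId` keeps interleaved summary sections from being merged into one string.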

### Example: agent message stream

<RequestExample>

```json
{ "method": "item/started", "params": {
    "threadId": "thr_123",
    "turnId": "turn_456",
    "startedAtMs": 1730910001000,
    "item": { "type": "agentMessage", "id": "msg_1", "text": "" }
} }
```

</RequestExample>

<RequestExample>

```json
{ "method": "item/agentMessage/delta", "params": {
    "threadId": "thr_123",
    "turnId": "turn_456",
    "itemId": "msg_1",
    "delta": "Running "
} }
```

</RequestExample>

<RequestExample>

```json
{ "method": "item/completed", "params": {
    "threadId": "thr_123",
    "turnId": "turn_456",
    "completedAtMs": 1730910005000,
    "item": { "type": "agentMessage", "id": "msg_1", "text": "Running tests now." }
} }
```

</RequestExample>

Concatenate `delta` strings in order for the same `itemId` to reconstruct streaming text before `item/completed` arrives.
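
Replaying the three example notifications through a small view model shows the partial text and the final replacement. This is a sketch; `AgentMessageView` is a hypothetical client-side class, and only the fields it reads are passed in.

```python
from typing import Any, Dict


class AgentMessageView:
    """Tracks display text for agentMessage items, keyed by item id."""

    def __init__(self) -> None:
        self.text: Dict[str, str] = {}

    def handle(self, method: str, params: Dict[str, Any]) -> None:
        if method == "item/started" and params["item"]["type"] == "agentMessage":
            self.text[params["item"]["id"]] = params["item"].get("text", "")
        elif method == "item/agentMessage/delta":
            item_id = params["itemId"]
            self.text[item_id] = self.text.get(item_id, "") + params["delta"]
        elif method == "item/completed" and params["item"]["type"] == "agentMessage":
            # The completed snapshot is authoritative and replaces streamed text.
            self.text[params["item"]["id"]] = params["item"]["text"]


view = AgentMessageView()
view.handle("item/started", {"item": {"type": "agentMessage", "id": "msg_1", "text": ""}})
view.handle("item/agentMessage/delta", {"itemId": "msg_1", "delta": "Running "})
partial = view.text["msg_1"]
view.handle(
    "item/completed",
    {"item": {"type": "agentMessage", "id": "msg_1", "text": "Running tests now."}},
)
final = view.text["msg_1"]
```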

## Token usage

Token accounting is **separate** from turn boundaries:

| Method | Params | Source |
| --- | --- | --- |
| `thread/tokenUsage/updated` | `{ threadId, turnId, tokenUsage }` | Core `TokenCount` events during a turn |
| `account/rateLimits/updated` | `{ rateLimits }` | Same event when rate-limit metadata is present |

`tokenUsage` includes rolling `total` and `last` token breakdowns (including `reasoningOutputTokens` and `cachedInputTokens` when reported) plus optional `modelContextWindow`.

On `thread/resume` or `thread/fork`, persisted usage may be replayed as `thread/tokenUsage/updated` immediately after the RPC response so UIs can show historical usage before the next turn. Pass `excludeTurns: true` on resume/fork to skip that replay.
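
Because `total` is a rolling figure, a client would normally overwrite its stored usage on each update rather than sum successive notifications. The sketch below assumes that reading of "rolling", and it assumes a `totalTokens` field inside `total`, which the source does not name. `latest_usage`, `on_token_usage`, and `total_tokens` are hypothetical.

```python
from typing import Any, Dict, Optional

latest_usage: Dict[str, Dict[str, Any]] = {}  # thread id -> most recent tokenUsage


def on_token_usage(params: Dict[str, Any]) -> None:
    """Handle thread/tokenUsage/updated by replacing the stored snapshot."""
    latest_usage[params["threadId"]] = params["tokenUsage"]


def total_tokens(thread_id: str) -> Optional[int]:
    usage = latest_usage.get(thread_id)
    if usage is None:
        return None
    return usage["total"].get("totalTokens")  # assumed field name


on_token_usage({"threadId": "thr_123", "turnId": "turn_456",
                "tokenUsage": {"total": {"totalTokens": 1200}, "last": {"totalTokens": 1200}}})
on_token_usage({"threadId": "thr_123", "turnId": "turn_456",
                "tokenUsage": {"total": {"totalTokens": 1850}, "last": {"totalTokens": 650}}})
```

The same handler works for usage replayed after `thread/resume` or `thread/fork`, since a replayed snapshot is stored like any other.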

## Thread status alongside turn events

`thread/status/changed` reflects loaded-thread runtime state (`notLoaded`, `idle`, `active`, `systemError`):

- `note_turn_started` → `active` (empty `activeFlags` while running)
- Pending approvals / user-input server requests add `waitingOnApproval` / `waitingOnUserInput` flags
- `note_turn_completed` or `note_turn_interrupted` → `idle` (or `systemError` after a fatal turn error)

`resolve_thread_status` can promote `idle`/`notLoaded` to `active` when a turn is in progress but status events arrived out of order. Clients should not assume strict ordering between `turn/started` and the first `thread/status/changed`.
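
A client can apply the same kind of promotion locally so its UI does not flicker when the two notifications arrive in either order. This is a client-side sketch only, not the server's `resolve_thread_status`; `effective_status` is a hypothetical helper that takes the last reported status name and whether the client has seen `turn/started` without a matching `turn/completed`.

```python
def effective_status(reported: str, turn_in_progress: bool) -> str:
    """Prefer 'active' while a turn is known to be running."""
    if turn_in_progress and reported in ("idle", "notLoaded"):
        return "active"
    return reported


# turn/started was seen, but the last thread/status/changed still said idle.
during_turn = effective_status("idle", turn_in_progress=True)
after_turn = effective_status("idle", turn_in_progress=False)
fatal = effective_status("systemError", turn_in_progress=True)
```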

## Notification opt-out

Per-connection suppression is configured in the `initialize` request:

<ParamField body="capabilities.optOutNotificationMethods" type="string[]">
Exact method names to drop for this connection. No wildcards or prefixes. Unknown names are accepted and ignored. Does **not** apply to RPC requests, responses, or errors.
</ParamField>

```json
{
  "method": "initialize",
  "id": 1,
  "params": {
    "clientInfo": { "name": "my_client", "title": "My Client", "version": "0.1.0" },
    "capabilities": {
      "optOutNotificationMethods": [
        "thread/started",
        "item/agentMessage/delta",
        "thread/status/changed"
      ]
    }
  }
}
```

| Use case | Methods to opt out |
| --- | --- |
| Headless integrations that poll `thread/read` | `thread/started`, `item/agentMessage/delta` |
| Status from RPC only | `thread/status/changed` |
| High-frequency audio (realtime) | `thread/realtime/outputAudio/delta` |

Filtered notifications are dropped in the transport layer before they are written, so opted-out clients never see them on the wire. Integration tests in `initialize.rs` and `thread_status.rs` verify `thread/started` and `thread/status/changed` filtering.

<Info>
Opt-out applies to typed app-server notifications (`thread/*`, `turn/*`, `item/*`, `rawResponseItem/*`, etc.). Server-initiated **requests** (approvals, MCP elicitation) are unaffected.
</Info>
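
The exact-match rule is easy to get wrong when a client expects prefix matching. The sketch below builds the `initialize` payload and models the documented rule on the client side; `build_initialize` and `would_be_delivered` are hypothetical helpers, and the model is not the server's filter code.

```python
from typing import Any, Dict, Iterable, List


def build_initialize(request_id: int, opt_out: List[str]) -> Dict[str, Any]:
    """Build an initialize request that opts out of the given methods."""
    return {
        "method": "initialize",
        "id": request_id,
        "params": {
            "clientInfo": {"name": "my_client", "title": "My Client", "version": "0.1.0"},
            "capabilities": {"optOutNotificationMethods": list(opt_out)},
        },
    }


def would_be_delivered(method: str, opt_out: Iterable[str]) -> bool:
    """Client-side model of the documented rule: exact names only."""
    return method not in set(opt_out)


opt_out = ["item/agentMessage/delta", "item/*"]  # "item/*" is accepted but matches nothing
request = build_initialize(1, opt_out)
delta_delivered = would_be_delivered("item/agentMessage/delta", opt_out)
completed_delivered = would_be_delivered("item/completed", opt_out)
```

In this model the `item/*` entry has no effect, so `item/completed` is still delivered; each method has to be listed by its full name.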

## Client integration pattern

<Steps>
<Step title="Subscribe to a thread">
Call `thread/start`, `thread/resume`, or `thread/fork`. The connection is auto-subscribed to that thread's event fan-out.
</Step>
<Step title="Start a turn">
Send `turn/start`; record `result.turn.id`. Begin rendering when `turn/started` confirms the same id.
</Step>
<Step title="Consume the notification stream">
Dispatch on `method`. For each `itemId`, maintain partial state from deltas; commit on `item/completed`.
</Step>
<Step title="Finish the turn">
On `turn/completed`, read `turn.status` and `turn.error`. Clear turn-scoped UI; keep accumulated items from notifications.
</Step>
<Step title="Handle parallel traffic">
Process server requests (approvals, `tool/requestUserInput`) in the same read loop; they share the turn timeline with `item/*` events.
</Step>
</Steps>
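
Steps 2 and 4 amount to a small state machine keyed on the turn id returned by `turn/start`. The following is a sketch of that idea; `TurnSession` and its state names are hypothetical, not part of any Codex client library.

```python
from typing import Any, Dict, Optional


class TurnSession:
    """Tracks one turn from the turn/start response to turn/completed."""

    def __init__(self, turn_id: str) -> None:
        self.turn_id = turn_id            # recorded from result.turn.id
        self.state = "pending"            # pending -> running -> finished
        self.status: Optional[str] = None

    def handle(self, method: str, params: Dict[str, Any]) -> None:
        if method not in ("turn/started", "turn/completed"):
            return
        turn = params["turn"]
        if turn["id"] != self.turn_id:
            return                        # belongs to a different turn
        if method == "turn/started":
            self.state = "running"        # safe to begin rendering
        else:
            self.state = "finished"
            self.status = turn["status"]


session = TurnSession("turn_456")
session.handle("turn/started", {"threadId": "thr_123", "turn": {"id": "turn_999", "status": "inProgress"}})
state_after_other_turn = session.state
session.handle("turn/started", {"threadId": "thr_123", "turn": {"id": "turn_456", "status": "inProgress"}})
session.handle("turn/completed", {"threadId": "thr_123", "turn": {"id": "turn_456", "status": "interrupted"}})
```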

### Ordering and backpressure

- Notifications share the transport with RPC traffic; use a single reader task per connection.
- When ingress queues saturate, new **requests** receive JSON-RPC `-32001` (`"Server overloaded; retry later."`). Notifications already accepted are not individually backpressured, so design UIs to tolerate bursts of deltas or opt out of noisy methods.
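
A client can treat `-32001` as retryable and back off before resending. This sketch assumes a hypothetical `send` callable that writes one request and returns the decoded response; the attempt count and delays are illustrative defaults, not values recommended by the source.

```python
import time
from typing import Any, Callable, Dict

OVERLOADED = -32001  # "Server overloaded; retry later."


def call_with_retry(
    send: Callable[[Dict[str, Any]], Dict[str, Any]],
    request: Dict[str, Any],
    max_attempts: int = 4,
    base_delay: float = 0.05,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Resend a request with exponential backoff while the server is overloaded."""
    response: Dict[str, Any] = {}
    for attempt in range(max_attempts):
        response = send(request)
        error = response.get("error")
        if not error or error.get("code") != OVERLOADED:
            return response               # success or a non-retryable error
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return response                       # still overloaded after all attempts


# Fake transport: overloaded twice, then succeeds.
replies = [
    {"id": 3, "error": {"code": OVERLOADED, "message": "Server overloaded; retry later."}},
    {"id": 3, "error": {"code": OVERLOADED, "message": "Server overloaded; retry later."}},
    {"id": 3, "result": {"turn": {"id": "turn_456", "status": "inProgress", "items": []}}},
]
delays = []
result = call_with_retry(lambda req: replies.pop(0), {"id": 3, "method": "turn/start"}, sleep=delays.append)
```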

### Related turn entry points

| RPC | Streaming behavior |
| --- | --- |
| `review/start` | Same `item/*` + `turn/completed` pattern with review-mode items |
| `thread/compact/start` | Progress via standard turn/item notifications |
| `thread/shellCommand` | Reuses active turn items, or starts `turn/started` → items → `turn/completed` when idle |
| `turn/steer` | Injects input into an in-flight turn; no new `turn/started` |
| `turn/interrupt` | Turn ends with `turn/completed` and `status: "interrupted"` |

## Implementation map

```text
turn/start (turn_processor)
  └─ submit_user_input → TurnStartResponse { turn inProgress }

Core event listener (thread_lifecycle)
  └─ apply_bespoke_event_handling
       ├─ TurnStarted  → turn/started + ThreadWatchManager::note_turn_started
       ├─ Item* / deltas → item_event_to_server_notification
       ├─ TokenCount   → thread/tokenUsage/updated
       ├─ TurnComplete → turn/completed + note_turn_completed
       └─ TurnAborted  → turn/completed (interrupted)

Outgoing path (transport)
  └─ per-connection optOutNotificationMethods filter
```

## Verification

Run the app-server integration suite for turn streaming:

```bash
cd codex-rs && just test -p codex-app-server turn_start
```

Useful focused tests: `tests/suite/v2/turn_start.rs` (started/completed ordering), `tests/suite/v2/thread_status.rs` (status + opt-out), `tests/suite/v2/initialize.rs` (notification filtering).

## Related pages

<CardGroup>
<Card title="Quickstart" href="/quickstart">
End-to-end first connection through `turn/start` and reading `item/*` / `turn/completed`.
</Card>
<Card title="Threads, turns, and items" href="/threads-turns-items">
Core primitives, subscription model, and persisted rollout relationship.
</Card>
<Card title="Connection lifecycle" href="/connection-lifecycle">
Initialize handshake, notification opt-out, and thread subscribe/unsubscribe.
</Card>
<Card title="Notifications and events" href="/notifications-and-events">
Full notification catalog and `ThreadItem` union reference.
</Card>
<Card title="Approvals and server requests" href="/approvals-and-server-requests">
Inline approval flows interleaved with item streaming.
</Card>
</CardGroup>
