# Harness Adapters: Amp, Claude Code, Codex, pi-mono

> How the harness_protocol layer normalizes four different agent CLIs into a single event stream. What each adapter does differently (Amp materializes attachments to files; Claude Code passes Anthropic content blocks directly; Codex/pi-mono extract plain text). The _VALID_STDOUT_EVENT_TYPES allowlist as a forward-compatibility boundary.

- Repository: paradigmxyz/centaur
- GitHub: https://github.com/paradigmxyz/centaur
- Human wiki: https://grok-wiki.com/public/wiki/paradigmxyz-centaur-57fc6b2755e2
- Complete Markdown: https://grok-wiki.com/public/wiki/paradigmxyz-centaur-57fc6b2755e2/llms-full.txt

## Source Files

- `services/api/api/sandbox/harness_protocol.py`
- `services/api/api/sandbox/normalize.py`
- `services/api/api/sandbox/prompt_assembly.py`
- `services/api/tests/test_harness_protocol.py`
- `services/api/tests/test_amp_wrapper.py`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [services/api/api/sandbox/harness_protocol.py](services/api/api/sandbox/harness_protocol.py)
- [services/api/api/sandbox/normalize.py](services/api/api/sandbox/normalize.py)
- [services/api/api/sandbox/prompt_assembly.py](services/api/api/sandbox/prompt_assembly.py)
- [services/api/tests/test_harness_protocol.py](services/api/tests/test_harness_protocol.py)
- [services/api/tests/test_amp_wrapper.py](services/api/tests/test_amp_wrapper.py)
- [services/api/api/agent.py](services/api/api/agent.py)
- [services/sandbox/amp-wrapper.py](services/sandbox/amp-wrapper.py)
- [services/sandbox/claude-app-wrapper.py](services/sandbox/claude-app-wrapper.py)
- [services/sandbox/codex-app-wrapper.py](services/sandbox/codex-app-wrapper.py)
- [services/sandbox/harness_adapter.py](services/sandbox/harness_adapter.py)
</details>

Centaur supports four distinct agent CLI backends — **Amp**, **Claude Code**, **Codex**, and **pi-mono** — each with its own wire protocol, event vocabulary, and subprocess lifecycle. A two-layer normalization pipeline (`harness_protocol.py` and `normalize.py`) converts all four into a single canonical NDJSON event stream that the rest of the API can consume without knowing which backend is running.

This page explains what each adapter does in isolation, where they share logic, and where they diverge. Understanding this boundary is essential when adding a new event type, debugging a missing turn-done signal, or extending a backend to pass richer content.

---

## Architecture Overview

```text
┌─────────────────────────────────────────────────────────────┐
│ Sandbox container                                           │
│                                                             │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐   │
│  │ amp-wrapper  │   │claude-app-   │   │codex-app-    │   │
│  │    .py       │   │ wrapper.py   │   │ wrapper.py   │   │
│  └──────┬───────┘   └──────┬───────┘   └──────┬───────┘   │
│         │                  │                   │            │
│         └──────────────────┴───────────────────┘           │
│                        NDJSON stdout                        │
└─────────────────────────────────────────────────────────────┘
                             │
                    (harness stdout pipe)
                             │
┌─────────────────────────────────────────────────────────────┐
│ API process  (services/api/)                                │
│                                                             │
│  agent.py  _stream_stdout()                                 │
│    ├── _VALID_STDOUT_EVENT_TYPES allowlist (warn unknown)   │
│    ├── extract_thread_id()   ← harness_protocol.py          │
│    ├── extract_result()      ← harness_protocol.py          │
│    ├── is_turn_done()        ← harness_protocol.py          │
│    └── normalize_harness_event()  ← normalize.py           │
│               ↓                                             │
│        canonical SSE stream → clients                       │
└─────────────────────────────────────────────────────────────┘
```

Sources: [services/api/api/agent.py:43-88](), [services/api/api/agent.py:870-909]()

---

## The harness_protocol Layer

`harness_protocol.py` contains pure functions — no I/O, no globals, no imports from other API modules — that implement the turn lifecycle protocol understood by every engine.

### is_turn_done

Determines when a main-agent turn has ended. The logic is engine-specific:

| Engine | Terminal event |
|--------|---------------|
| `amp`, `claude-code` | `type == "result"` OR `type == "assistant"` with `stop_reason == "end_turn"` and **no** `parent_tool_use_id` |
| `codex` | `type == "turn.completed"` or `"turn.failed"` |
| `pi-mono` | `type == "agent_end"` |
| any | `type == "error"` (except non-terminal amp-wrapper restart notices) |

The subagent carve-out is critical: when `parent_tool_use_id` is set on an `assistant` event, that event comes from a subagent, not the main agent, and must **not** close the turn. Similarly, an `assistant` event that contains only `tool_use` content blocks is mid-flight tooling, not a completed response.

Amp-wrapper restart notices (`"restarting (1/5)"`) are non-terminal errors. The wrapper emits them when it catches a crash and self-heals; the turn is still in progress. Only `"giving up"` in the message text promotes the error to a turn-ending event.
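
The rules in the table and the two carve-outs above can be sketched as a single pure function. This is a minimal illustration, not the repository's implementation: the function name is hypothetical, and the placement of `stop_reason` on the nested `message` object and the `{"error": {"message": ...}}` shape are assumptions based on the event examples on this page.

```python
from typing import Any


def is_turn_done_sketch(engine: str, event: dict[str, Any]) -> bool:
    """Return True when `event` ends the main agent's turn (illustrative only)."""
    etype = event.get("type")

    if etype == "error":
        # Assumed shape: {"error": {"message": "..."}}, as emitted by amp-wrapper.
        err = event.get("error")
        text = err.get("message", "") if isinstance(err, dict) else str(err or "")
        # Restart notices are non-terminal unless the wrapper is giving up.
        is_restart_notice = "restarting (" in text and "giving up" not in text
        return not is_restart_notice

    if engine in ("amp", "claude-code"):
        if etype == "result":
            return True
        if etype == "assistant":
            # Events tagged with parent_tool_use_id come from a subagent.
            if event.get("parent_tool_use_id"):
                return False
            # Assumption: stop_reason lives on the nested message object.
            msg = event.get("message") or {}
            return msg.get("stop_reason") == "end_turn"
        return False

    if engine == "codex":
        return etype in ("turn.completed", "turn.failed")
    if engine == "pi-mono":
        return etype == "agent_end"
    return False
```

Because the function takes only an engine name and a dict, each row of the table can be checked in isolation with a one-line test.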

Sources: [services/api/api/sandbox/harness_protocol.py:24-63](), [services/api/tests/test_harness_protocol.py:12-98]()

### extract_result

Extracts the final answer text from whichever event type carries it:

- **amp/claude-code**: prefers `result` field on a `result` event; falls back to the last `text` content block inside an `assistant` event.
- **codex**: reads `item.text` when `item.type` is `agent_message` or `agentMessage` on `item.completed`.
- **pi-mono**: reads the last content block's `text` field from the `message` inside a `message_end` event.
- All engines: a top-level `turn.done` event (synthesized by the API) carries `result` as a string or `{"text": "..."}` dict.
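
The per-engine lookups above can be sketched roughly as follows. The function names are hypothetical, and the nesting of `content` under `message` for `assistant` events is an assumption based on the envelope shapes shown elsewhere on this page.

```python
from typing import Any, Optional


def last_text_block(content: Any) -> Optional[str]:
    """Return the text of the last `text` block in a content list, if any."""
    if not isinstance(content, list):
        return None
    for block in reversed(content):
        if isinstance(block, dict) and block.get("type") == "text":
            return block.get("text")
    return None


def extract_result_sketch(engine: str, event: dict[str, Any]) -> Optional[str]:
    """Pull the final answer text out of an event, or return None."""
    etype = event.get("type")
    if etype == "turn.done":
        # Synthesized by the API; result is a string or a {"text": ...} dict.
        result = event.get("result")
        return result.get("text") if isinstance(result, dict) else result
    if engine in ("amp", "claude-code"):
        if etype == "result":
            return event.get("result")
        if etype == "assistant":
            return last_text_block((event.get("message") or {}).get("content"))
    if engine == "codex" and etype == "item.completed":
        item = event.get("item") or {}
        if item.get("type") in ("agent_message", "agentMessage"):
            return item.get("text")
    if engine == "pi-mono" and etype == "message_end":
        content = (event.get("message") or {}).get("content") or []
        last = content[-1] if content else None
        return last.get("text") if isinstance(last, dict) else None
    return None
```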

Sources: [services/api/api/sandbox/harness_protocol.py:66-112]()

### extract_thread_id

Maps the engine-specific session identifier to a common `agent_thread_id`:

- **amp/claude-code**: `session_id` on `system/init` or `assistant` events.
- **codex**: `thread_id` on `thread.started`.
- **pi-mono**: `id` on `session`.
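
As a rough sketch of that mapping (the function name is hypothetical, and `system/init` is read here as a `system` event whose `subtype` is `init`):

```python
from typing import Any, Optional


def extract_thread_id_sketch(engine: str, event: dict[str, Any]) -> Optional[str]:
    """Map an engine-specific session identifier to a common thread ID."""
    etype = event.get("type")
    if engine in ("amp", "claude-code"):
        is_init = etype == "system" and event.get("subtype") == "init"
        if is_init or etype == "assistant":
            return event.get("session_id")
    elif engine == "codex" and etype == "thread.started":
        return event.get("thread_id")
    elif engine == "pi-mono" and etype == "session":
        return event.get("id")
    return None
```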

Sources: [services/api/api/sandbox/harness_protocol.py:115-128]()

### build_user_input and messages_to_content_blocks

These two functions build the harness-native user-input envelope sent to the subprocess. `build_user_input` wraps a list of content blocks into `{"type":"user","message":{"role":"user","content":[...]}}`, optionally attaching `steer`, `thread_key`, and `trace_id` fields.

`messages_to_content_blocks` flattens a multi-message conversation history into that block list:

- **attachment_ref** parts become `text` blocks containing a `curl` download instruction referencing the Centaur attachments API. This is how attachments are universally conveyed regardless of backend — the agent is told to download the file by ID.
- **assistant** messages are prefixed with `[Your previous response]:` or `[Previous Centaur response]:` (for history backfills from other sessions), so the model can distinguish its own prior output from the current user request.
- The first `text` part of each user message with a `user_id` is prefixed `<@user_id>:` for multi-user attribution.
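
To make the envelope shape concrete, here is a minimal sketch. The function names are hypothetical, placing `steer`, `thread_key`, and `trace_id` at the top level of the envelope is an assumption, and so is the single space after the `<@user_id>:` prefix.

```python
from typing import Any, Optional


def build_user_input_sketch(
    blocks: list[dict[str, Any]],
    *,
    steer: bool = False,
    thread_key: Optional[str] = None,
    trace_id: Optional[str] = None,
) -> dict[str, Any]:
    """Wrap content blocks in the user envelope described above."""
    envelope: dict[str, Any] = {
        "type": "user",
        "message": {"role": "user", "content": list(blocks)},
    }
    # Optional fields are only attached when set (placement is assumed).
    if steer:
        envelope["steer"] = True
    if thread_key is not None:
        envelope["thread_key"] = thread_key
    if trace_id is not None:
        envelope["trace_id"] = trace_id
    return envelope


def attribute_text(text: str, user_id: Optional[str]) -> str:
    """Prefix a user's first text part for multi-user attribution."""
    return f"<@{user_id}>: {text}" if user_id else text
```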

Sources: [services/api/api/sandbox/harness_protocol.py:131-212](), [services/api/tests/test_harness_protocol.py:276-444]()

---

## The normalize Layer

`normalize.py` is a 1:1 Python port of `packages/harness-events/src/normalize.ts`. It converts raw NDJSON events into canonical dicts that the API's SSE stream can emit to clients.

### Main dispatcher

```python
# services/api/api/sandbox/normalize.py:850-896
def normalize_harness_event(engine: str, event: dict) -> list[dict]:
    normalized = (engine or "").strip().lower()
    if not normalized:
        # auto-detect from event type fingerprints
        ...
    if normalized == "codex":
        return _normalize_codex_event(event)
    if normalized == "pi-mono":
        return _normalize_pi_event(event)
    return _normalize_amp_like_event(event)   # amp, claude-code, and all personas
```

Persona names (`legal`, `eng`, etc.) fall through to the amp-like normalizer because personas are amp-based engines with custom prompts.

The auto-detection heuristic (when `engine` is empty) fingerprints event types: `item.*` and `turn.*` prefixes → codex; `session`, `agent_end`, `tool_execution_*` → pi-mono; everything else → amp.
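
The fingerprinting rule described above could look roughly like this. It is a sketch that follows the prose literally; the function name is hypothetical and the real heuristic may check additional types.

```python
from typing import Any


def detect_engine_sketch(event: dict[str, Any]) -> str:
    """Guess the engine from an event's type when none was supplied."""
    etype = str(event.get("type") or "")
    if etype.startswith(("item.", "turn.")):
        return "codex"
    if etype in ("session", "agent_end") or etype.startswith("tool_execution_"):
        return "pi-mono"
    # Everything else is treated as amp-like.
    return "amp"
```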

Sources: [services/api/api/sandbox/normalize.py:847-896]()

---

## Per-Adapter Details

### Amp

**Wrapper:** `services/sandbox/amp-wrapper.py`

Amp runs as `amp --no-ide --no-notifications --dangerously-allow-all --execute --stream-json --stream-json-input --stream-json-thinking --mode <mode>`. The wrapper exists to handle three concerns the raw CLI cannot:

1. **Handoff chaining**: When `follow=true` appears in a `handoff` tool call, the wrapper detects the `newThreadID` in the subsequent tool result, kills the current process, and immediately spawns `amp threads continue <T-new>`. The intermediate events (between handoff and end-turn) are suppressed to avoid duplicate output.

2. **Crash recovery**: Up to 5 restarts (`MAX_CRASH_RESTARTS`). Each transient crash emits `{"type":"error","error":{"message":"amp exited with code N, restarting (K/5)"}}` (which `harness_protocol.is_turn_done` treats as non-terminal). On the sixth crash it emits `"giving up"` which **is** terminal.

3. **Heartbeats**: Before each run attempt the wrapper emits `{"type":"system","subtype":"wrapper_heartbeat","phase":"<startup|crash_restart|handoff_continue|interrupt_continue>"}`. These are lifecycle signals for observability; `_normalize_amp_like_event` filters `system` events that carry no `subagent_id` and no `init` subtype.
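
The crash-recovery counting in item 2 can be illustrated with a small helper that builds the error event for the Nth crash. The helper name is hypothetical, and the exact wording of the final message is an assumption; the page only states that it contains `"giving up"`.

```python
from typing import Any

MAX_CRASH_RESTARTS = 5


def crash_notice(exit_code: int, crash_count: int) -> dict[str, Any]:
    """Build the error event emitted after the `crash_count`-th crash."""
    if crash_count <= MAX_CRASH_RESTARTS:
        # Non-terminal: the wrapper is about to restart amp.
        message = (
            f"amp exited with code {exit_code}, "
            f"restarting ({crash_count}/{MAX_CRASH_RESTARTS})"
        )
    else:
        # Terminal: restart budget exhausted (wording assumed).
        message = f"amp exited with code {exit_code}, giving up"
    return {"type": "error", "error": {"message": message}}
```

With a budget of 5, crashes one through five produce restart notices and the sixth produces the terminal event.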

**Attachment handling**: Amp receives content blocks directly in the user envelope. The `attachment_ref` → `curl` instruction conversion happens upstream in `messages_to_content_blocks`, so Amp never sees a binary payload — it sees a plain-text download instruction.

**Successful result suppression**: The wrapper suppresses `result` events that are **not** errors before forwarding them to the API. The comment in the source explains the design: the API synthesizes `turn.done` from stream EOF + accumulated text, so forwarding a `result` event would duplicate the final answer. Error results (subtype `error_during_execution` or `is_error=True`) are forwarded so the API can persist a terminal error state rather than waiting indefinitely for EOF.
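
Expressed as a predicate, the forwarding rule is roughly the following. This is a sketch with a hypothetical function name; the field names `subtype` and `is_error` come from the description above.

```python
from typing import Any


def should_forward(event: dict[str, Any]) -> bool:
    """Decide whether the wrapper forwards an event to the API."""
    if event.get("type") != "result":
        return True
    # Only error results are forwarded; successful ones are suppressed
    # because the API synthesizes turn.done itself.
    return (
        event.get("subtype") == "error_during_execution"
        or bool(event.get("is_error"))
    )
```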

Sources: [services/sandbox/amp-wrapper.py:236-319](), [services/sandbox/amp-wrapper.py:333-392](), [services/api/tests/test_amp_wrapper.py:48-101]()

**Normalizer** (`_normalize_amp_like_event`): Amp events mostly pass through unchanged. The important transformations:

- `user` events containing `tool_result` content blocks are re-emitted as canonical `{"type":"tool","content":[...]}` events, extracting `tool_use_id` either from the block itself or from `parent_tool_use_id`.
- `system` events with a `subagent_id` (tasks) are translated into `subagent` lifecycle events (`started`, `working`, `completed`, `failed`).
- `stream_event` wrappers (carrying Anthropic streaming API events like `content_block_start`, `content_block_delta`) are unwrapped and emitted as `assistant`/`reasoning` events.
- Transient restart `error` events (containing `"restarting ("` but not `"giving up"`) are **dropped** from the canonical stream (return `[]`).
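
The first transformation can be sketched as below. The function name is hypothetical, and emitting a single `tool` event with the resolved `tool_use_id` written onto each block is an assumption; the page only specifies the outer `{"type":"tool","content":[...]}` shape.

```python
from typing import Any


def reemit_tool_results(event: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn a `user` event carrying tool_result blocks into a `tool` event."""
    if event.get("type") != "user":
        return [event]
    content = (event.get("message") or {}).get("content")
    if not isinstance(content, list):
        return [event]
    fallback_id = event.get("parent_tool_use_id")
    results = [
        # Prefer the block's own ID, then fall back to the parent's.
        {**block, "tool_use_id": block.get("tool_use_id") or fallback_id}
        for block in content
        if isinstance(block, dict) and block.get("type") == "tool_result"
    ]
    if not results:
        return [event]
    return [{"type": "tool", "content": results}]
```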

Sources: [services/api/api/sandbox/normalize.py:237-437]()

---

### Claude Code

**Wrapper:** `services/sandbox/claude-app-wrapper.py`

Claude Code runs as:

```text
claude -p --input-format stream-json --output-format stream-json
       --verbose --include-partial-messages
       --dangerously-skip-permissions --permission-mode bypassPermissions
       [--append-system-prompt-file AGENTS.md]
       [--resume <session_id>]
```

The key distinction: the `--input-format stream-json` / `--output-format stream-json` flags mean Claude Code **natively speaks Anthropic content blocks**. The wrapper pipes Centaur's `{"type":"user","message":{...}}` envelopes **straight through** to the subprocess's stdin — no translation, no text extraction. Claude Code handles images, text blocks, and other content types at the model level.

Unlike the amp wrapper, there is **no crash-restart loop** and **no handoff chaining** — those are Amp-specific behaviors. The Claude Code wrapper is thinner: one subprocess, one thread reading stdout, one thread reading stdin. Interrupt handling works by SIGINT-ing the process group; if a turn was active, the wrapper emits a synthetic error result event so the API can transition state.

**Goal rewriting**: Because `claude -p` ignores slash commands, the wrapper intercepts `/goal X` and rewrites it to `"Set this thread's working goal: X\n\nAcknowledge briefly..."`. This gives Claude Code parity with Codex's `thread/goal/set` RPC.

**OTel integration**: The wrapper reads `LMNR_BASE_URL` + `CENTAUR_TRACE_ID` and configures `OTEL_*` environment variables before spawning `claude`, directing telemetry to the same Laminar backend the codex wrapper uses.

**Normalizer**: Claude Code shares `_normalize_amp_like_event` with Amp. The event shapes are identical (both are Anthropic-protocol CLIs), so no separate normalizer path exists.

Sources: [services/sandbox/claude-app-wrapper.py:1-19](), [services/sandbox/claude-app-wrapper.py:262-283](), [services/sandbox/claude-app-wrapper.py:73-87]()

---

### Codex

**Wrapper:** `services/sandbox/codex-app-wrapper.py`

Codex runs as `codex app-server --listen stdio://` — a long-lived JSON-RPC server. The wrapper speaks a two-channel protocol:

- **Requests**: `{"id": N, "method": "...", "params": {...}}` → synchronous reply via matching `id`.
- **Notifications**: `{"method": "...", "params": {...}}` (no `id`) → asynchronous events forwarded to Centaur.

The wrapper calls `initialize`/`initialized` at startup, then translates each Centaur user input into `turn/start` (or `turn/steer` if a turn is active). Codex outputs events under the `turn.*` and `item.*` namespaces.
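
The two channels differ only in whether a message carries an `id`, which a short sketch makes explicit. All helper names here are hypothetical, and `params` is passed through untouched because the real parameter shapes are not described on this page.

```python
import itertools
from typing import Any

_request_ids = itertools.count(1)


def make_request(method: str, params: dict[str, Any]) -> dict[str, Any]:
    """Build a request; the reply is matched by its `id`."""
    return {"id": next(_request_ids), "method": method, "params": params}


def is_notification(message: dict[str, Any]) -> bool:
    """Notifications carry a method but no `id`."""
    return "method" in message and "id" not in message


def method_for_user_input(turn_active: bool) -> str:
    """Steer an in-flight turn, otherwise start a new one."""
    return "turn/steer" if turn_active else "turn/start"
```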

**Text extraction**: Codex does not accept Anthropic content blocks. The `text_from_blocks` function in the wrapper converts the incoming block list to a plain string:

```python
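# services/sandbox/codex-app-wrapper.py:152-164
import json
from typing import Any


def text_from_blocks(blocks: list[dict[str, Any]]) -> str:
    """Flatten Anthropic-style content blocks into one plain-text string."""
    parts: list[str] = []  # accumulated text fragments
    for block in blocks:
        btype = block.get("type")
        if btype == "text":
            parts.append(str(block.get("text") or ""))
        elif btype == "image":
            # Images cannot be passed through, so they become a text advisory.
            parts.append("[User sent an image attachment; if needed, ask them to upload it as a file reference.]")
        else:
            # Any other block type is serialized as JSON text.
            parts.append(json.dumps(block, ensure_ascii=False))
    # Drop empty fragments and join the rest with newlines.
    return "\n".join(p for p in parts if p).strip()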
```

Images become a plain-text advisory; other block types are JSON-serialized. **This is the key divergence from Claude Code**: Codex cannot consume Anthropic image content blocks natively, so they are downgraded to text.

**Normalizer** (`_normalize_codex_event`): Codex events use dotted namespaces (`item.started`, `item.completed`, `turn.completed`). Most pass through unchanged. Key transformations:

- `thread.started` → `{"type":"system","subtype":"init","session_id":"<thread_id>"}`.
- `turn.completed` → a usage-metadata-only event (no text content; the API synthesizes `turn.done`).
- `turn.failed` → `{"type":"error","error":"..."}`.
- `item.completed` for `agent_message`/`agentMessage` items → **dropped** (to avoid duplicate output; the text was already streamed via `item.agentMessage.delta`).
- Tool calls (`mcp_tool_call`, `tool_call`, `function_call`, etc.) on `item.started` → canonical `assistant/tool_use`; on `item.completed` → canonical `tool/content`.
- `subagent` tool calls (where `tool_name == "subagent"`) are translated into `subagent` lifecycle events.
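
Three of these transformations are simple enough to sketch together. The function name is hypothetical, and reading the failure text from an `error` field that is either a string or a `{"message": ...}` dict is an assumption about the raw `turn.failed` shape.

```python
from typing import Any


def normalize_codex_lifecycle(event: dict[str, Any]) -> list[dict[str, Any]]:
    """Handle thread.started, turn.failed, and agent-message completion."""
    etype = event.get("type")
    if etype == "thread.started":
        return [{"type": "system", "subtype": "init",
                 "session_id": event.get("thread_id")}]
    if etype == "turn.failed":
        err = event.get("error")  # assumed location of the failure detail
        text = err.get("message") if isinstance(err, dict) else err
        return [{"type": "error", "error": str(text or "turn failed")}]
    if etype == "item.completed":
        item = event.get("item") or {}
        if item.get("type") in ("agent_message", "agentMessage"):
            # Dropped: the text was already streamed as deltas.
            return []
    return [event]
```

Returning a list lets one raw event map to zero, one, or several canonical events, which matches the `list[dict]` return type of `normalize_harness_event`.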

Sources: [services/sandbox/codex-app-wrapper.py:152-172](), [services/api/api/sandbox/normalize.py:613-665](), [services/api/api/sandbox/normalize.py:485-610]()

---

### pi-mono

There is no standalone pi-mono wrapper script in this repository. `pi-mono` is a recognized engine in both `normalize.py` and `harness_protocol.py`, and the event shapes those modules handle indicate a long-lived session protocol with its own message lifecycle.

**Event shapes**: pi-mono uses `session`, `agent_start`, `agent_end`, `message_start`, `message_update`, `message_end`, `tool_execution_start`, `tool_execution_update`, `tool_execution_end`.

**Text extraction**: Like Codex, pi-mono does not use Anthropic content blocks for input. `_normalize_pi_event` reconstructs tool-use and text events from pi-mono's own schema:

- `tool_execution_start` → `assistant/tool_use` (or `subagent/started` for `toolName == "subagent"`).
- `tool_execution_end` → `tool/result` (or `subagent/completed|failed`).
- `message_end` for assistant messages → the content blocks are normalized through `_normalize_pi_message_content`, which reads `block.type` in (`text`, `thinking`, `tool_call`/`toolcall`) and maps to canonical `assistant`, `reasoning`, or `assistant/tool_use` events.

Sources: [services/api/api/sandbox/normalize.py:698-840](), [services/api/api/sandbox/normalize.py:847-896]()

---

## The _VALID_STDOUT_EVENT_TYPES Allowlist

```python
# services/api/api/agent.py:43-86
_VALID_STDOUT_EVENT_TYPES = frozenset({
    "amp_raw_event", "assistant", "command_execution",
    "content_block_delta", "content_block_start", "content_block_stop",
    "error", "file_change",
    "message_delta", "message_start", "message_stop",
    "item.agentMessage.delta", "item.commandExecution.outputDelta",
    "item.completed", "item.fileChange.outputDelta",
    "item.fileChange.patchUpdated", "item.plan.delta",
    "item.reasoning.summaryPartAdded", "item.reasoning.summaryTextDelta",
    "item.reasoning.textDelta", "item.started", "item.updated",
    "reasoning", "result", "status", "subagent", "system",
    "thread.goal.cleared", "thread.goal.updated", "thread.started",
    "tool", "tool_result", "tool_use",
    "turn.done", "turn.completed", "turn.failed",
    "turn.plan.updated", "turn.started",
    "usage", "user",
})
```

This set is **not** a hard filter: unknown event types are not dropped. When `_stream_stdout` in `agent.py` encounters an event type absent from the set, it logs a `stdout_unknown_event_type` warning and continues processing normally.

The allowlist therefore acts as a **forward-compatibility boundary**. When any of the four backend CLIs ships a new event type, the warning tells the operator that normalization may be incomplete, but the raw event still flows through the `normalize_harness_event` → SSE path. New event types are surfaced rather than silently dropped, and the normalizer can be updated incrementally.
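
A minimal sketch of this warn-but-forward behavior follows. The logger name, function names, and the abbreviated `KNOWN_TYPES` set are stand-ins for illustration, not the code in `agent.py`.

```python
import logging
from typing import Any, Iterable

logger = logging.getLogger("harness.stdout")  # hypothetical logger name

# Abbreviated stand-in for _VALID_STDOUT_EVENT_TYPES.
KNOWN_TYPES = frozenset({"assistant", "error", "result", "system"})


def is_known_type(event: dict[str, Any]) -> bool:
    """Log a warning for unknown types; never reject the event."""
    etype = event.get("type")
    if etype not in KNOWN_TYPES:
        logger.warning("stdout_unknown_event_type type=%s", etype)
        return False
    return True


def forward_all(events: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Every event is forwarded, whether or not its type is known."""
    forwarded = []
    for event in events:
        is_known_type(event)     # observability only
        forwarded.append(event)  # unknown types still flow downstream
    return forwarded
```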

To add proper support for a new event type:
1. Add the type to `_VALID_STDOUT_EVENT_TYPES` to silence the warning.
2. Add handling in the appropriate `_normalize_*_event` function in `normalize.py`.
3. Add `is_turn_done`, `extract_result`, or `extract_thread_id` cases in `harness_protocol.py` if the new event carries lifecycle semantics.

Sources: [services/api/api/agent.py:43-86](), [services/api/api/agent.py:875-882]()

---

## Stable Tool Call IDs

When a backend does not supply a stable tool call ID (common in Codex where IDs may be missing or positional), the normalizer generates one deterministically:

```python
# services/api/api/sandbox/normalize.py:59-66
def _stable_tool_call_id(name: str, tool_input: Any, nonce: str = "") -> str:
    payload = {"input": tool_input or {}, "name": name or "tool", "nonce": nonce or ""}
    h = hashlib.sha1(_stable_sorted_json(payload).encode()).hexdigest()[:12]
    return f"tool-call-{h}"
```

The `nonce` is drawn from whatever positional or temporal identifier is available (`index`, `position`, `ordinal`, `event_seq`, `timestamp`, `created_at`). Because the ID depends only on the tool name, input, and nonce, `tool_result` events can be correlated with `tool_use` events even when the backend generates no IDs. The trade-off is that two calls to the same tool with the same input and the same nonce (for example, when no positional field is available) collide on the same ID.
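
The determinism and the collision case can both be seen in a self-contained version of the function. Here `_stable_sorted_json` is a stand-in written for this example (a `json.dumps` call with sorted keys); the repository's helper may differ in detail.

```python
import hashlib
import json
from typing import Any


def _stable_sorted_json(value: Any) -> str:
    # Stand-in: key order must not affect the hash.
    return json.dumps(value, sort_keys=True, separators=(",", ":"), default=str)


def stable_tool_call_id(name: str, tool_input: Any, nonce: str = "") -> str:
    payload = {"input": tool_input or {}, "name": name or "tool", "nonce": nonce or ""}
    digest = hashlib.sha1(_stable_sorted_json(payload).encode()).hexdigest()[:12]
    return f"tool-call-{digest}"


same_a = stable_tool_call_id("bash", {"cmd": "ls", "cwd": "/"}, nonce="0")
same_b = stable_tool_call_id("bash", {"cwd": "/", "cmd": "ls"}, nonce="0")
other = stable_tool_call_id("bash", {"cmd": "ls", "cwd": "/"}, nonce="1")
```

`same_a` and `same_b` are equal despite the different key order, while `other` differs because only the nonce changed. Two identical calls with no nonce available would share one ID.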

Sources: [services/api/api/sandbox/normalize.py:52-66](), [services/api/api/sandbox/normalize.py:463-482]()

---

## Summary

The `harness_protocol` + `normalize` tandem decouples Centaur's API from the idiosyncratic wire protocols of four different agent CLIs. **Amp** and **Claude Code** share a normalizer path because both are Anthropic-protocol CLIs; Claude Code passes content blocks directly while Amp adds crash recovery, handoff chaining, and result suppression. **Codex** and **pi-mono** each have their own normalizer path; Codex flattens content blocks to plain text at the wrapper boundary, while pi-mono has no wrapper script in this repository and is handled entirely through its event schema. The `_VALID_STDOUT_EVENT_TYPES` frozenset acts as a forward-compatibility boundary: a type listed in it is a known, expected event type, while an unrecognized type generates a warning rather than a crash or a dropped event, keeping the system forward-compatible as CLI backends evolve.

Sources: [services/api/api/sandbox/normalize.py:847-896](), [services/api/api/agent.py:875-882]()
