# Protocol, Codec & Client Transport

> How the three wire protocols (Connect, gRPC, gRPC-Web) are detected and demultiplexed; envelope framing (5-byte header); codec format selection (proto vs. JSON); compression (gzip, zstd); and the client-side Tower HTTP transport in connectrpc/src/client/. Covers where to look when adding a new codec or compression algorithm.

- Repository: anthropics/connect-rust
- GitHub: https://github.com/anthropics/connect-rust
- Human wiki: https://grok-wiki.com/public/wiki/anthropics-connect-rust-abe117693c52
- Complete Markdown: https://grok-wiki.com/public/wiki/anthropics-connect-rust-abe117693c52/llms-full.txt

## Source Files

- `connectrpc/src/protocol.rs`
- `connectrpc/src/envelope.rs`
- `connectrpc/src/codec.rs`
- `connectrpc/src/compression.rs`
- `connectrpc/src/client/mod.rs`
- `connectrpc/src/client/http2.rs`
- `connectrpc/src/grpc_status.rs`

---

This page explains the three wire layers that every ConnectRPC message passes through: protocol detection (which of the three wire protocols is in use), envelope framing (how streaming messages are delimited), and codec/compression (how a protobuf message becomes bytes on the wire). It then covers the two client-side Tower HTTP transports and explains where to extend the system with a new codec or compression algorithm.

Understanding these layers is essential before adding a new protocol feature, debugging a conformance failure, or wiring up a custom compressor. Most of the key types live in five source files: `protocol.rs`, `envelope.rs`, `codec.rs`, `compression.rs`, and `client/mod.rs`. `client/http2.rs` holds the HTTP/2-only transport, and `grpc_status.rs` holds the gRPC status encoding.

---

## Wire Protocols

The crate supports three protocols that share RPC semantics (services, methods, error codes) but differ on the wire in framing, header conventions, trailer delivery, and error representation.

| Protocol | Content-Type | HTTP version | Trailers | Error delivery |
|---|---|---|---|---|
| Connect unary | `application/proto`, `application/json` | HTTP/1.1+ | `trailer-`-prefixed response headers | HTTP status + JSON body |
| Connect streaming | `application/connect+proto`, `application/connect+json` | HTTP/1.1+ | Encoded in the final (end-stream) body frame | End-stream envelope |
| gRPC | `application/grpc` (proto), `application/grpc+proto`, `application/grpc+json` | HTTP/2 only | Trailing HTTP/2 HEADERS frame | `grpc-status` trailer |
| gRPC-Web | `application/grpc-web` (proto), `application/grpc-web+proto`, `application/grpc-web+json` | HTTP/1.1+ | Body frame with flag 0x80 | Inline trailer frame |
| gRPC-Web text | `application/grpc-web-text` (proto), `application/grpc-web-text+proto` | HTTP/1.1+ | Same as gRPC-Web, but the body is base64-encoded | Inline trailer frame |

Sources: [connectrpc/src/protocol.rs:44-61]()

### Protocol Detection

Incoming requests are identified solely by the `Content-Type` header. `Protocol::detect` reads the header and delegates to `detect_from_content_type`:

```rust
// connectrpc/src/protocol.rs:84-87
pub fn detect(headers: &HeaderMap) -> Option<RequestProtocol> {
    let content_type = headers.get(CONTENT_TYPE)?.to_str().ok()?;
    Self::detect_from_content_type(content_type)
}
```

The detection order matters because `"application/grpc"` is a prefix of `"application/grpc-web"`. The implementation checks the most specific prefix first:

1. `application/grpc-web-text` → gRPC-Web text mode (proto only; `+json` is rejected)
2. `application/grpc-web` → gRPC-Web
3. `application/grpc` → gRPC
4. `application/connect+` → Connect streaming
5. `application/proto` / `application/json` → Connect unary

Sources: [connectrpc/src/protocol.rs:92-168]()
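The ordering rule can be illustrated with a std-only sketch. The `Detected` struct, the helper names, and the decision to ignore `;` parameters are assumptions for illustration; the struct loosely mirrors `RequestProtocol` and the function is not the crate's `detect_from_content_type`.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Protocol { Connect, Grpc, GrpcWeb }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodecFormat { Proto, Json }

/// Hypothetical mirror of `RequestProtocol`: all fields decided in one pass.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Detected {
    protocol: Protocol,
    codec_format: CodecFormat,
    is_streaming: bool, // this sketch: true whenever the body is enveloped
    is_text_mode: bool,
}

fn detected(protocol: Protocol, codec_format: CodecFormat, is_streaming: bool, is_text_mode: bool) -> Detected {
    Detected { protocol, codec_format, is_streaming, is_text_mode }
}

/// gRPC-style suffix: a bare type or "+proto" means protobuf, "+json" means JSON.
fn grpc_suffix(rest: &str) -> Option<CodecFormat> {
    match rest {
        "" | "+proto" => Some(CodecFormat::Proto),
        "+json" => Some(CodecFormat::Json),
        _ => None,
    }
}

fn detect(content_type: &str) -> Option<Detected> {
    // Assumption: parameters such as "; charset=utf-8" are ignored.
    let ct = content_type.split(';').next()?.trim();
    // Most specific prefix first: "application/grpc" prefixes both gRPC-Web types.
    if let Some(rest) = ct.strip_prefix("application/grpc-web-text") {
        // Text mode carries base64 protobuf only; "+json" is rejected.
        return match rest {
            "" | "+proto" => Some(detected(Protocol::GrpcWeb, CodecFormat::Proto, true, true)),
            _ => None,
        };
    }
    if let Some(rest) = ct.strip_prefix("application/grpc-web") {
        return grpc_suffix(rest).map(|f| detected(Protocol::GrpcWeb, f, true, false));
    }
    if let Some(rest) = ct.strip_prefix("application/grpc") {
        return grpc_suffix(rest).map(|f| detected(Protocol::Grpc, f, true, false));
    }
    if let Some(rest) = ct.strip_prefix("application/connect+") {
        return match rest {
            "proto" => Some(detected(Protocol::Connect, CodecFormat::Proto, true, false)),
            "json" => Some(detected(Protocol::Connect, CodecFormat::Json, true, false)),
            _ => None,
        };
    }
    // Connect unary: the bare codec media types.
    match ct {
        "application/proto" => Some(detected(Protocol::Connect, CodecFormat::Proto, false, false)),
        "application/json" => Some(detected(Protocol::Connect, CodecFormat::Json, false, false)),
        _ => None,
    }
}
```

Reversing the first three checks would make `application/grpc-web+json` match the `application/grpc` branch, whose suffix `-web+json` is unrecognised, so a valid gRPC-Web request would be rejected.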

The result is a `RequestProtocol` struct carrying `protocol`, `codec_format`, `is_streaming`, and `is_text_mode`. All four fields are set in one pass, so callers never re-parse the content type.

Sources: [connectrpc/src/protocol.rs:64-78]()

### Protocol-Specific Header Names

Each protocol uses different header names for the same concept. The `Protocol` enum exposes accessor methods so upper layers never hardcode strings:

```rust
// connectrpc/src/protocol.rs:203-210
pub fn timeout_header(&self) -> &'static str {
    match self {
        Protocol::Connect => "connect-timeout-ms",
        Protocol::Grpc | Protocol::GrpcWeb => "grpc-timeout",
    }
}
```
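`timeout_header` only selects the header name; the two header families also format the value differently. Per the Connect and gRPC protocol specifications, `connect-timeout-ms` carries whole milliseconds, while `grpc-timeout` carries at most eight ASCII digits followed by a unit (`H`, `M`, `S`, `m`, `u`, `n`). The two functions below are a hypothetical sketch, not the crate's encoder. Rounding up so that a deadline is never shortened is a choice made by this sketch.

```rust
use std::time::Duration;

/// gRPC allows at most 8 ASCII digits before the unit character.
const GRPC_MAX_VALUE: u128 = 99_999_999;

/// Units from finest to coarsest: (nanoseconds per unit, unit suffix).
const GRPC_UNITS: [(u128, char); 6] = [
    (1, 'n'),
    (1_000, 'u'),
    (1_000_000, 'm'),
    (1_000_000_000, 'S'),
    (60_000_000_000, 'M'),
    (3_600_000_000_000, 'H'),
];

/// Hypothetical `grpc-timeout` encoder: finest unit that fits in 8 digits,
/// rounding up so the deadline is never shortened.
fn grpc_timeout_value(timeout: Duration) -> String {
    let nanos = timeout.as_nanos();
    for &(unit_nanos, suffix) in GRPC_UNITS.iter() {
        let value = (nanos + unit_nanos - 1) / unit_nanos; // ceiling division
        if value <= GRPC_MAX_VALUE {
            return format!("{}{}", value, suffix);
        }
    }
    // Longer than roughly 11,400 years: clamp to the largest expressible value.
    format!("{}H", GRPC_MAX_VALUE)
}

/// Hypothetical `connect-timeout-ms` encoder: whole milliseconds (truncated).
fn connect_timeout_value(timeout: Duration) -> String {
    timeout.as_millis().to_string()
}
```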

The `hdr` module holds pre-parsed `HeaderName` statics, so header names are not parsed again on every request. Profiling showed that the naive `header(&str, _)` approach cost about 0.7% of CPU on the echo hot path.

Sources: [connectrpc/src/protocol.rs:26-39]()

### Key Protocol Properties

| Property | Connect | gRPC | gRPC-Web |
|---|---|---|---|
| Requires HTTP/2 | No | Yes | No |
| Real HTTP status codes | Yes (unary) | No (always 200) | No |
| HTTP/2 HEADERS trailers | No | Yes | No |

Sources: [connectrpc/src/protocol.rs:234-256]()

---

## Envelope Framing

Connect streaming RPCs and all gRPC and gRPC-Web RPCs (unary included) use the same 5-byte envelope header. Connect unary RPCs do **not** use envelope framing; the body is the raw encoded message.

```text
Byte 0:    flags bit field (0x00 = plain data message,
           0x01 = compressed, 0x02 = end-stream (Connect),
           0x80 = trailers frame (gRPC-Web))
Bytes 1-4: payload length (big-endian uint32)
Bytes 5…:  payload
```

Sources: [connectrpc/src/envelope.rs:1-29]()

### Encoding and Decoding

`Envelope::encode` writes `[flag, len_u32_be, payload]` into a `BytesMut`. `Envelope::decode_with_limit` reads the 5-byte header and checks the declared length against `max_size` before waiting for the payload bytes. A malicious header claiming a huge length is therefore rejected without buffering anything. Otherwise, the function splits the payload off the buffer.

```rust
// connectrpc/src/envelope.rs:109-135
pub fn decode_with_limit(buf: &mut BytesMut, max_size: usize) -> Result<Option<Self>, ConnectError> {
    if buf.len() < HEADER_SIZE { return Ok(None); }
    let flags = buf[0];
    let length = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    if length > max_size {
        return Err(ConnectError::resource_exhausted(...));
    }
    if buf.len() < HEADER_SIZE + length { return Ok(None); } // need more data
    buf.advance(HEADER_SIZE);
    let data = buf.split_to(length).freeze();
    Ok(Some(Self { flags, data }))
}
```
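The same framing can be sketched without the `bytes` crate. This std-only version assumes `Vec<u8>` in place of `BytesMut`/`Bytes` and a small `DecodeError` enum in place of `ConnectError`. The function names echo the excerpt, but the code is illustrative rather than the crate's.

```rust
/// 1 flag byte + 4-byte big-endian length.
const HEADER_SIZE: usize = 5;
const FLAG_END_STREAM: u8 = 0x02;

#[derive(Debug, PartialEq)]
enum DecodeError {
    /// The header declared more bytes than the caller allows.
    TooLarge { declared: usize, max_size: usize },
}

/// Append one frame: flags, u32 big-endian length, then the payload.
fn encode(flags: u8, payload: &[u8], out: &mut Vec<u8>) {
    assert!(payload.len() <= u32::MAX as usize, "payload longer than u32::MAX");
    out.push(flags);
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
}

/// Ok(None) means "need more bytes"; on success the frame is removed from `buf`.
fn decode_with_limit(buf: &mut Vec<u8>, max_size: usize) -> Result<Option<(u8, Vec<u8>)>, DecodeError> {
    if buf.len() < HEADER_SIZE {
        return Ok(None);
    }
    let flags = buf[0];
    let length = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    // Reject on the header alone, before any payload is buffered.
    if length > max_size {
        return Err(DecodeError::TooLarge { declared: length, max_size });
    }
    if buf.len() < HEADER_SIZE + length {
        return Ok(None);
    }
    let payload = buf[HEADER_SIZE..HEADER_SIZE + length].to_vec();
    buf.drain(..HEADER_SIZE + length);
    Ok(Some((flags, payload)))
}
```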

### Streaming Codec Integration

`EnvelopeDecoder` implements `tokio_util::codec::Decoder` and plugs into `FramedRead`. It tracks a `done: bool` flag; once it sees a frame with `END_STREAM` (0x02), all subsequent calls return `Ok(None)`. Compressed frames are decompressed inline via the `CompressionRegistry`.

`EnvelopeEncoder` implements `tokio_util::codec::Encoder<Bytes>` and applies compression when the `CompressionPolicy` allows it for the given payload size. End-stream frames are never compressed.

Sources: [connectrpc/src/envelope.rs:139-298]()
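The `done` flag behaviour can be sketched with std only. `StreamDecoder` is a hypothetical stand-in that mimics the `Decoder` contract, where `None` means "no complete frame available". It omits the size limit and decompression that `EnvelopeDecoder` also performs.

```rust
const HEADER_SIZE: usize = 5;
const END_STREAM: u8 = 0x02;

/// Hypothetical std-only stand-in for a streaming envelope decoder.
#[derive(Default)]
struct StreamDecoder {
    done: bool,
}

impl StreamDecoder {
    fn decode(&mut self, buf: &mut Vec<u8>) -> Option<(u8, Vec<u8>)> {
        // After END_STREAM, anything left in the buffer is ignored.
        if self.done || buf.len() < HEADER_SIZE {
            return None;
        }
        let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
        if buf.len() < HEADER_SIZE + len {
            return None; // wait for more bytes
        }
        let flags = buf[0];
        let payload = buf[HEADER_SIZE..HEADER_SIZE + len].to_vec();
        buf.drain(..HEADER_SIZE + len);
        if flags & END_STREAM != 0 {
            self.done = true;
        }
        Some((flags, payload))
    }
}
```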

---

## Codec Format Selection

`CodecFormat` is a two-variant enum:

```rust
// connectrpc/src/codec.rs:104-111
pub enum CodecFormat {
    Proto,  // binary protobuf via buffa::Message
    Json,   // JSON via serde_json
}
```

### Encoding Functions

Two stateless codec types (`ProtoCodec`, `JsonCodec`) provide `encode`/`decode` methods. The proto path delegates to `buffa::Message::encode_to_bytes` and `decode_from_slice`. The JSON path uses `serde_json`.

```rust
// connectrpc/src/codec.rs:38-58
pub fn encode_proto<M: Message>(message: &M) -> Result<Bytes, ConnectError> {
    Ok(message.encode_to_bytes())
}
pub fn encode_json<M: Serialize>(message: &M) -> Result<Bytes, ConnectError> {
    serde_json::to_vec(message).map(Bytes::from)
        .map_err(|e| ConnectError::internal(...))
}
```

Sources: [connectrpc/src/codec.rs:38-101]()

### Format-to-Content-Type Mapping

`CodecFormat` also owns the content-type string constants and the reverse mapping from strings back to a format:

| Situation | Proto | JSON |
|---|---|---|
| Connect unary | `application/proto` | `application/json` |
| Connect streaming | `application/connect+proto` | `application/connect+json` |
| Connect GET `encoding=` query value | `proto` | `json` |

Sources: [connectrpc/src/codec.rs:14-173]()
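The following sketch shows how a two-variant enum can own this mapping. The method names `content_type` and `streaming_content_type` come from the extension checklist later on this page, but their exact signatures here are assumptions. `from_encoding_param` is a hypothetical name for the query-parameter parse.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodecFormat {
    Proto,
    Json,
}

impl CodecFormat {
    /// Content type for Connect unary requests and responses.
    fn content_type(self) -> &'static str {
        match self {
            CodecFormat::Proto => "application/proto",
            CodecFormat::Json => "application/json",
        }
    }

    /// Content type for Connect streaming RPCs.
    fn streaming_content_type(self) -> &'static str {
        match self {
            CodecFormat::Proto => "application/connect+proto",
            CodecFormat::Json => "application/connect+json",
        }
    }

    /// Hypothetical reverse parse of the Connect GET `encoding=` value.
    fn from_encoding_param(value: &str) -> Option<Self> {
        match value {
            "proto" => Some(CodecFormat::Proto),
            "json" => Some(CodecFormat::Json),
            _ => None,
        }
    }
}
```

The forward `match` expressions in this sketch have no wildcard arm, so adding a third variant makes the compiler flag every method that needs a new arm.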

---

## Compression

### Architecture

Compression is pluggable. `CompressionProvider` is a trait that every algorithm implements:

```rust
// connectrpc/src/compression.rs:94-116
pub trait CompressionProvider: Send + Sync + 'static {
    fn name(&self) -> &'static str;
    fn compress(&self, data: &[u8]) -> Result<Bytes, ConnectError>;
    fn decompressor<'a>(&self, data: &'a [u8])
        -> Result<Box<dyn std::io::Read + 'a>, ConnectError>;
    // decompress_with_limit has a default impl using Read::take
}
```

`CompressionRegistry` maps encoding names to `Arc<dyn CompressionProvider>` and caches the sorted `Accept-Encoding` header string at registration time (not per-request).

Sources: [connectrpc/src/compression.rs:169-260]()
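Caching the advertised encodings at registration time can be illustrated with a std-only sketch. `Provider`, `Named`, and `Registry` are hypothetical types. Alphabetical ordering via `BTreeMap` and the comma separator are assumptions, because the page does not say how the real registry sorts the list.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Minimal stand-in for a provider trait; only `name` matters here.
trait Provider: Send + Sync + 'static {
    fn name(&self) -> &'static str;
}

struct Named(&'static str);

impl Provider for Named {
    fn name(&self) -> &'static str {
        self.0
    }
}

/// Hypothetical registry: the header value is rebuilt once per registration,
/// so request handling only borrows the cached string.
#[derive(Default)]
struct Registry {
    providers: BTreeMap<&'static str, Arc<dyn Provider>>,
    accept_encoding: String,
}

impl Registry {
    fn register(&mut self, provider: Arc<dyn Provider>) {
        self.providers.insert(provider.name(), provider);
        // BTreeMap iterates keys in sorted order, giving a stable value.
        self.accept_encoding = self.providers.keys().copied().collect::<Vec<_>>().join(",");
    }

    fn get(&self, name: &str) -> Option<&Arc<dyn Provider>> {
        self.providers.get(name)
    }

    fn accept_encoding(&self) -> &str {
        &self.accept_encoding
    }
}
```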

### Built-in Providers

| Provider | Feature flag | Crate | Default level | Notes |
|---|---|---|---|---|
| `GzipProvider` | `gzip` (default) | `flate2` with `zlib-rs` backend | 1 (fastest) | Pools `Compress`/`Decompress` objects (up to 64 per provider) to avoid ~200 KB allocation per request |
| `ZstdProvider` | `zstd` (default) | `zstd` | 3 | Pools `Compressor`; decompressor uses streaming `zstd::Decoder` to handle any compression ratio |

Sources: [connectrpc/src/compression.rs:581-1062]()

`CompressionRegistry::default()` registers every provider whose Cargo feature is enabled. When the `streaming` feature is enabled, providers additionally implement `StreamingCompressionProvider`, whose `compress_stream`/`decompress_stream` methods return `BoxedAsyncRead` (via `async-compression`).

### Compression Policy

`CompressionPolicy` controls when compression fires:

```rust
// connectrpc/src/compression.rs:475-527
pub struct CompressionPolicy { enabled: bool, min_size: usize }
// Default: enabled=true, min_size=1024 bytes
pub fn should_compress(&self, message_size: usize) -> bool {
    self.enabled && message_size >= self.min_size
}
```

The 1 KiB default matches gRPC-Java. Per-call overrides (`Some(true)` / `Some(false)`) are applied via `with_override`.

Sources: [connectrpc/src/compression.rs:474-543]()
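Here is a self-contained version of the policy, extended with an override. The `with_override` semantics in this sketch (`Some(b)` forces `enabled = b` and keeps `min_size`) are an assumption. The source only states that per-call `Some(true)` / `Some(false)` overrides exist.

```rust
/// Sketch of a size-threshold compression policy with a per-call override.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CompressionPolicy {
    enabled: bool,
    min_size: usize,
}

impl Default for CompressionPolicy {
    fn default() -> Self {
        // Defaults stated on this page: enabled, 1 KiB threshold.
        CompressionPolicy { enabled: true, min_size: 1024 }
    }
}

impl CompressionPolicy {
    fn should_compress(&self, message_size: usize) -> bool {
        self.enabled && message_size >= self.min_size
    }

    /// Assumed semantics: `None` keeps the policy, `Some(b)` sets `enabled = b`.
    fn with_override(self, per_call: Option<bool>) -> Self {
        match per_call {
            Some(enabled) => CompressionPolicy { enabled, ..self },
            None => self,
        }
    }
}
```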

### Encoding Negotiation

`CompressionRegistry::negotiate_encoding` implements the Connect spec rule: use the client's `Accept-Encoding` preference list (most preferred first, no quality values); if the client omits it, the server may assume the client accepts the same encoding used in the request.

Sources: [connectrpc/src/compression.rs:263-301]()
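The negotiation rule can be sketched as a pure function. `negotiate` is a hypothetical name. Returning `None` (respond uncompressed) when nothing in the client's list is supported is an assumption of this sketch.

```rust
/// Hypothetical helper: choose the response encoding.
/// `accept`: the client's comma-separated list, most preferred first, no q-values.
/// `request_encoding`: the encoding the request body used, if any.
fn negotiate<'a>(
    accept: Option<&'a str>,
    request_encoding: Option<&'a str>,
    supported: &[&str],
) -> Option<&'a str> {
    match accept {
        // First entry the server also supports wins.
        Some(list) => list
            .split(',')
            .map(str::trim)
            .find(|name| supported.contains(name)),
        // No list: assume the client accepts its own request encoding.
        None => request_encoding.filter(|name| supported.contains(name)),
    }
}
```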

### Security

All decompression goes through `decompress_with_limit`, never through an unbounded `decompress`. The default trait implementation reads through `Read::take(max_size + 1)`, so the limit is enforced by construction and custom providers get compression-bomb protection for free. Zero-length bodies short-circuit before the decoder is called (zstd rejects empty input as an incomplete frame).

Sources: [connectrpc/src/compression.rs:127-146, 330-344]()
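The structural guarantee comes from placing the limit in the trait's default method. This std-only sketch uses a trimmed trait that keeps only `decompressor` and the default `decompress_with_limit`. It also includes a toy `Passthrough` provider whose "compressed" form is the raw bytes. Apart from those two method names, every name here is hypothetical.

```rust
use std::io::Read;

#[derive(Debug, PartialEq)]
enum DecompressError {
    Io(String),
    TooLarge { max_size: usize },
}

/// Trimmed-down mirror of a provider trait (illustrative only).
trait Provider {
    fn decompressor<'a>(&self, data: &'a [u8]) -> Result<Box<dyn Read + 'a>, DecompressError>;

    /// Default method: every implementor inherits the bound.
    fn decompress_with_limit(&self, data: &[u8], max_size: usize) -> Result<Vec<u8>, DecompressError> {
        // Empty body: skip the decoder entirely (zstd would reject it).
        if data.is_empty() {
            return Ok(Vec::new());
        }
        let mut out = Vec::new();
        // Read at most max_size + 1 bytes: one extra byte proves the output is
        // too large without inflating the rest of a compression bomb.
        self.decompressor(data)?
            .take((max_size as u64).saturating_add(1))
            .read_to_end(&mut out)
            .map_err(|e| DecompressError::Io(e.to_string()))?;
        if out.len() > max_size {
            return Err(DecompressError::TooLarge { max_size });
        }
        Ok(out)
    }
}

/// Toy provider: "decompression" just reads the input bytes back.
struct Passthrough;

impl Provider for Passthrough {
    fn decompressor<'a>(&self, data: &'a [u8]) -> Result<Box<dyn Read + 'a>, DecompressError> {
        Ok(Box::new(data))
    }
}
```

`Passthrough` never mentions a limit, yet it is still bounded, which is the point of putting `take` in the default method.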

---

## Client-Side Transport

Generated service clients are generic over `T: ClientTransport`, a trait defined in `client/mod.rs`. It declares two associated types and a single required method, `send`:

```rust
// connectrpc/src/client/mod.rs:160-171
pub trait ClientTransport: Clone + Send + Sync + 'static {
    type ResponseBody: Body<Data = Bytes> + Send + 'static;
    type Error: std::error::Error + Send + Sync + 'static;
    fn send(&self, request: Request<ClientBody>) -> BoxFuture<'static, Result<...>>;
}
```

`ServiceTransport<S>` adapts any `tower::Service` to `ClientTransport` using `ServiceExt::oneshot`, which correctly calls `poll_ready` before `call`.

Sources: [connectrpc/src/client/mod.rs:156-228]()

### Two Concrete Transports

```text
┌──────────────────────────────────────────────────────┐
│  Generated FooServiceClient<T: ClientTransport>      │
└──────────┬──────────────────────────────┬────────────┘
           │                              │
    ┌──────▼───────┐           ┌──────────▼────────────┐
    │  HttpClient  │           │ SharedHttp2Connection │
    │ (hyper_util) │           │ (raw h2, honest       │
    │ HTTP/1.1+h2  │           │  poll_ready)          │
    │ ALPN / pool  │           │ Reconnect wrapper     │
    └──────────────┘           └───────────────────────┘
```

#### `HttpClient` (HTTP/1.1 + HTTP/2, ALPN)

Wraps `hyper_util::client::legacy::Client`. Supports `http://` and `https://` URIs. On TLS connections, ALPN negotiates `h2` or `http/1.1`. `poll_ready` always returns `Ready(Ok(()))` because the client manages pooling and queuing internally.

- **Limitation:** For HTTP/2, the pool holds exactly one shared connection. All concurrent requests contend on h2's `Mutex<Inner>`, creating a throughput ceiling around 30–40k req/s.
- **Use when:** Connect over HTTP/1.1; unknown or mixed protocol; ALPN-based TLS.

Sources: [connectrpc/src/client/mod.rs:240-557]()

Constructors:

| Method | URI scheme | HTTP version |
|---|---|---|
| `HttpClient::plaintext()` | `http://` | HTTP/1.1 + HTTP/2 |
| `HttpClient::plaintext_http2_only()` | `http://` | HTTP/2 only |
| `HttpClient::with_tls(rustls_config)` | `https://` | HTTP/2 + HTTP/1.1 (ALPN) |

#### `Http2Connection` / `SharedHttp2Connection` (HTTP/2 only)

A single raw h2 connection with a `Reconnect` state machine. `poll_ready` reflects actual connection state (`Idle` / `Connecting` / `Connected`). This makes it composable with `tower::balance::p2c::Balance` and `tower::load::PendingRequests`.

```rust
// connectrpc/src/client/http2.rs:204-208
pub struct Http2Connection {
    inner: Reconnect<MakeSendRequest>,
}
```

`SharedHttp2Connection` wraps it in `Arc` so multiple callers can clone a handle cheaply. For high-throughput gRPC, create N connections and wrap them in a p2c balancer. Each connection's h2 mutex then sees roughly 1/N of the contention.

Sources: [connectrpc/src/client/http2.rs:167-226]()
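The balancing technique behind `tower::balance::p2c::Balance` is "power of two choices": sample two backends at random and send the request to the one with less load. The sketch below implements that technique over a vector of in-flight counters and uses a tiny xorshift64 RNG so that it needs only std. It is not tower's implementation, and the `P2c` type is hypothetical.

```rust
/// Power-of-two-choices over N connections, tracking in-flight requests.
struct P2c {
    pending: Vec<usize>, // in-flight requests per connection
    rng_state: u64,
}

impl P2c {
    fn new(connections: usize, seed: u64) -> Self {
        assert!(connections > 0, "need at least one connection");
        // xorshift state must be non-zero.
        P2c { pending: vec![0; connections], rng_state: seed.max(1) }
    }

    /// xorshift64 step (shift triple 13, 7, 17).
    fn next_random(&mut self) -> u64 {
        let mut x = self.rng_state;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.rng_state = x;
        x
    }

    /// Of two candidates, prefer the one with fewer in-flight requests.
    fn choose_between(&self, a: usize, b: usize) -> usize {
        if self.pending[a] <= self.pending[b] { a } else { b }
    }

    /// Sample two connections at random and dispatch to the less loaded one.
    fn pick(&mut self) -> usize {
        let n = self.pending.len() as u64;
        let a = (self.next_random() % n) as usize;
        let b = (self.next_random() % n) as usize;
        let chosen = self.choose_between(a, b);
        self.pending[chosen] += 1;
        chosen
    }

    /// Call when the request dispatched to `idx` has finished.
    fn complete(&mut self, idx: usize) {
        self.pending[idx] -= 1;
    }
}
```

Comparing only two random candidates keeps each pick O(1) while still steering traffic away from a connection whose h2 mutex is busy.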

Constructors: `connect_plaintext`, `lazy_plaintext`, `connect_tls`, `lazy_tls`, `lazy_with_connector`, `lazy_unix` (Unix domain sockets, non-Windows).

### `ClientConfig` — Per-Client Settings

`ClientConfig` bundles the protocol, codec format, compression registry, and policy into one struct passed alongside the transport:

```rust
// connectrpc/src/client/mod.rs:577-605
pub struct ClientConfig {
    base_uri: Uri,
    protocol: Protocol,         // Connect / Grpc / GrpcWeb
    codec_format: CodecFormat,  // Proto / Json
    compression: CompressionRegistry,
    request_compression: Option<String>,  // e.g. "gzip"
    compression_policy: CompressionPolicy,
    default_timeout: Option<Duration>,
    default_max_message_size: Option<usize>,
    default_headers: http::HeaderMap,
}
```

Sources: [connectrpc/src/client/mod.rs:559-750]()

### Tower Middleware Composition

Both transports are `tower::Service`s. Wrap them with `ServiceBuilder` before passing to `ServiceTransport`:

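```rust
use std::time::Duration;
// Assumption: `TimeoutLayer` is tower's (`tower` with the "timeout" feature).
use tower::{timeout::TimeoutLayer, ServiceBuilder};

// Inside an async fn returning Result; `uri`, `config` (a `ClientConfig`),
// `Http2Connection`, and the generated `GreetServiceClient` are assumed in scope.
let conn = Http2Connection::connect_plaintext(uri).await?.shared(1024);
// Layers wrap the connection; each request passes through them first.
let stacked = ServiceBuilder::new()
    .layer(TimeoutLayer::new(Duration::from_secs(30)))
    .service(conn);
// ServiceTransport adapts the stacked tower::Service to ClientTransport.
let client = GreetServiceClient::new(
    connectrpc::client::ServiceTransport::new(stacked),
    config,
);
```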

Sources: [connectrpc/src/client/mod.rs:82-100]()

---

## gRPC Status Encoding

For gRPC and gRPC-Web, the status code and message travel in the `grpc-status` and `grpc-message` trailers. Rich error details travel in the `grpc-status-details-bin` trailer as an encoded `google.rpc.Status` protobuf message. The crate encodes that message by hand with `buffa`'s low-level `Tag`/`WireType`/`encode_varint` primitives instead of a generated `.proto` type, so it does not depend on generated code.

Sources: [connectrpc/src/grpc_status.rs:1-43]()
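The wire bytes being hand-encoded can be reproduced with std only. This sketch covers the `google.rpc.Status` fields `code` (field 1, varint) and `message` (field 2, length-delimited) and omits `details` (field 3, repeated `Any`). The helper names are hypothetical and do not correspond to `buffa`'s API. As with any `-bin` gRPC metadata, the resulting bytes are base64-encoded before they are placed in the trailer.

```rust
/// Append `v` as a protobuf base-128 varint (low 7-bit group first).
fn encode_varint(mut v: u64, out: &mut Vec<u8>) {
    while v >= 0x80 {
        out.push((v as u8 & 0x7F) | 0x80); // continuation bit set
        v >>= 7;
    }
    out.push(v as u8);
}

/// Field key = (field_number << 3) | wire_type.
fn encode_key(field: u32, wire_type: u8, out: &mut Vec<u8>) {
    encode_varint(((field as u64) << 3) | wire_type as u64, out);
}

/// Encode `google.rpc.Status { int32 code = 1; string message = 2; }`.
/// proto3 omits fields holding their default value (0 / empty string).
fn encode_status(code: i32, message: &str) -> Vec<u8> {
    let mut out = Vec::new();
    if code != 0 {
        encode_key(1, 0, &mut out); // wire type 0: varint
        // Negative int32 values are sign-extended to 64 bits on the wire.
        encode_varint(code as i64 as u64, &mut out);
    }
    if !message.is_empty() {
        encode_key(2, 2, &mut out); // wire type 2: length-delimited
        encode_varint(message.len() as u64, &mut out);
        out.extend_from_slice(message.as_bytes());
    }
    out
}
```

For example, `encode_status(5, "nope")` (code 5 is `NOT_FOUND`) yields `08 05 12 04 6e 6f 70 65`.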

---

## Adding a New Codec or Compression Algorithm

### New codec format

1. Add a variant to `CodecFormat` in `connectrpc/src/codec.rs:104-111`.
2. Extend `CodecFormat::from_content_type`, `content_type()`, and `streaming_content_type()` with the new MIME type.
3. Extend `Protocol::detect_from_content_type` and `Protocol::response_content_type` in `connectrpc/src/protocol.rs` for each applicable protocol.
4. Implement the encode/decode functions and wire them into the server handler and generated client call sites.

### New compression algorithm

1. Implement `CompressionProvider` (three required methods: `name`, `compress`, `decompressor`). The default `decompress_with_limit` via `Read::take` is safe — override only for performance.
2. Optionally implement `StreamingCompressionProvider` (two methods: `compress_stream`, `decompress_stream`) when the `streaming` feature is relevant.
3. Add a Cargo feature flag gating the provider on the appropriate optional dependency.
4. Register in `CompressionRegistry::default()` behind the feature flag, following the existing `GzipProvider` / `ZstdProvider` pattern in `connectrpc/src/compression.rs:546-575`.

No changes to the protocol or envelope layers are needed — `CompressionRegistry` is injected into `EnvelopeDecoder` and `EnvelopeEncoder` at construction time, and negotiation is fully data-driven by the registered provider names.

Sources: [connectrpc/src/compression.rs:94-167, 193-240, 546-575]()

---

## Summary

The request path through the wire stack is: `Content-Type` → `Protocol::detect` (identifies protocol, codec, and streaming flag) → `EnvelopeDecoder` (for enveloped protocols: strips the 5-byte header and decompresses if needed) → codec decode (proto or JSON) → handler. The response path mirrors it: codec encode → `EnvelopeEncoder` (for enveloped protocols: compresses if the policy allows and writes the 5-byte header) → protocol-specific trailer delivery. On the client side, a `ClientConfig` (protocol, codec, and compression settings) is paired with a `ClientTransport`. The transport is either `HttpClient` for HTTP/1.1 and HTTP/2 with ALPN, or `SharedHttp2Connection` for gRPC with honest Tower readiness semantics. Because every decompression is bounded by a size limit, the codebase is structurally resistant to compression-bomb attacks.

Sources: [connectrpc/src/compression.rs:127-146]()
