# Interceptors & Spec Metadata

> RPC-level interceptors as a typed alternative to Tower middleware: Interceptor trait, Next/NextStream continuations, unary vs. streaming intercept surfaces, registration order, and zero-cost opt-out. Also covers Spec (per-method static metadata) and StreamType, which interceptors use to label spans and gate behaviour without re-parsing URLs.

- Repository: anthropics/connect-rust
- GitHub: https://github.com/anthropics/connect-rust
- Human wiki: https://grok-wiki.com/public/wiki/anthropics-connect-rust-abe117693c52
- Complete Markdown: https://grok-wiki.com/public/wiki/anthropics-connect-rust-abe117693c52/llms-full.txt

## Source Files

- `connectrpc/src/interceptor.rs`
- `connectrpc/src/spec.rs`
- `connectrpc/src/payload.rs`
- `connectrpc/src/service.rs`
- `connectrpc/src/error.rs`
- `connectrpc/src/response.rs`
- `connectrpc/src/dispatcher.rs`

---

Interceptors are the RPC-level cross-cutting concern layer in `connect-rust`. They wrap a single RPC *after* envelope decoding, decompression, and header parsing, and *before* the handler runs — giving you typed access to request and response bodies, headers, extensions, and the per-method `Spec`, without dropping to raw Tower middleware. `Spec` is the complementary type: a `Copy`, `'static` struct emitted by code generation that names the method, its stream shape, and its idempotency contract. Interceptors read it to label spans, gate behaviour, or decide retry eligibility without re-parsing the request URL.

This page covers the `Interceptor` trait, its two intercept surfaces (unary and streaming), the `Next`/`NextStream` continuations, chain ordering and short-circuiting, the `Payload` lazy-decode contract that keeps interceptors cheap, registration via `ConnectRpcService::with_interceptor`, and the `Spec`/`StreamType`/`IdempotencyLevel` static-metadata types.

---

## The `Interceptor` Trait

```rust
// connectrpc/src/interceptor.rs:116-189
#[async_trait::async_trait]
pub trait Interceptor: Send + Sync + 'static {
    async fn intercept_unary(
        &self,
        req: UnaryRequest,
        next: Next<'_>,
    ) -> Result<UnaryResponse, ConnectError> {
        next.run(req).await  // default: passthrough
    }

    async fn intercept_streaming(
        &self,
        req: StreamRequest,
        inbound: PayloadStream,
        next: NextStream<'_>,
    ) -> Result<StreamResponse, ConnectError> {
        next.run(req, inbound).await  // default: passthrough
    }
}
```

Both methods have a **passthrough default**. An interceptor that only cares about streaming RPCs (or only about unary) can implement just the relevant method and remain forwards-compatible as the framework grows.

The trait is annotated with `#[async_trait]`, so implementations must annotate their `impl` blocks with it as well. The crate re-exports the macro as `connectrpc::async_trait`, so implementors do not need a direct `async-trait` dependency.

Sources: [connectrpc/src/interceptor.rs:56-66, 115-189]()

---

## Two Intercept Surfaces

### Unary: `intercept_unary`

Receives a `UnaryRequest` (which bundles `ctx: RequestContext` and `payload: Payload`) and a `Next<'_>` continuation. Returns `Result<UnaryResponse, ConnectError>`.

- Mutate `req.ctx.headers` or `req.ctx.extensions` before calling `next.run(req)` to propagate changes to the handler.
- Call `req.payload.set_message(m)` to replace the request body.
- Read the response on the way out and call `resp.with_header(...)` or inspect `resp.body`.

```rust
// connectrpc/src/interceptor.rs:84-113 (doc example)
#[connectrpc::async_trait]
impl Interceptor for LoggingInterceptor {
    async fn intercept_unary(
        &self,
        req: UnaryRequest,
        next: Next<'_>,
    ) -> Result<UnaryResponse, ConnectError> {
        let path = req.ctx.path()
            .expect("dispatch sets path before interceptors run")
            .to_owned();
        tracing::info!(%path, "rpc start");
        let resp = next.run(req).await;
        tracing::info!(%path, ok = resp.is_ok(), "rpc end");
        resp
    }
}
```

`UnaryResponse` is a type alias for `Response<Payload>`, so the same lazy-decode contract applies on the way out.

Sources: [connectrpc/src/interceptor.rs:295-341, 340]()

### Streaming: `intercept_streaming`

Called **once at stream establishment**, before any messages flow. It receives a `StreamRequest` (ctx only, no payload), a `PayloadStream` for the inbound direction, and a `NextStream<'_>`. Returns `Result<StreamResponse, ConnectError>`.

All three streaming shapes — server-streaming, client-streaming, and bidi — route through this single method. The difference is cardinality:

| Shape | Inbound items | Outbound items |
|---|---|---|
| Server-streaming | 1 | N |
| Client-streaming | N | 1 |
| Bidi-streaming | N | N |

To branch on cardinality, read `req.ctx.spec().map(|s| s.stream_type)`.
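A minimal sketch of that branching follows. It assumes only the four `StreamType` variants listed in the `StreamType` section of this page. `Cardinality` and `cardinality_of` are hypothetical names for illustration, not connectrpc API, and the local enum stands in for the crate's own type. A missing spec (for example, a manual route) yields `None`, so the caller can fall back to shape-agnostic behaviour.

```rust
/// Local stand-in for connectrpc's `StreamType` (illustration only).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamType {
    Unary,
    ClientStream,
    ServerStream,
    BidiStream,
}

/// Hypothetical helper type: how many messages flow in one direction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Cardinality {
    One,
    Many,
}

/// Returns (inbound, outbound) cardinality, or `None` when no `Spec` is
/// attached, mirroring the table above.
pub fn cardinality_of(stream_type: Option<StreamType>) -> Option<(Cardinality, Cardinality)> {
    use Cardinality::{Many, One};
    stream_type.map(|st| match st {
        StreamType::Unary => (One, One),
        StreamType::ClientStream => (Many, One),
        StreamType::ServerStream => (One, Many),
        StreamType::BidiStream => (Many, Many),
    })
}
```

Inside `intercept_streaming`, the argument would be `req.ctx.spec().map(|s| s.stream_type)`.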

To observe or mutate the inbound stream, wrap `inbound` with a `Stream` adapter before passing it to `next.run`. To observe or mutate the outbound stream, wrap `resp.body` after calling `next.run`:

```rust
// connectrpc/src/interceptor.rs:1609-1626 (from test, adapted)
// `.map` on the inbound stream comes from `futures::StreamExt`.
async fn intercept_streaming(
    &self,
    req: StreamRequest,
    inbound: PayloadStream,
    next: NextStream<'_>,
) -> Result<StreamResponse, ConnectError> {
    // Rewrite each successfully decoded inbound message; `Err` items pass through unchanged.
    let wrapped = Box::pin(inbound.map(|item| {
        item.map(|mut payload| { payload.set_message(/* replacement message */); payload })
    }));
    next.run(req, wrapped).await
}
```

Cross-stream coordination (outbound depends on inbound state) requires shared state between the two adapters, typically an `Arc<Mutex<..>>` captured by both closures.
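The shared-state pattern can be sketched without an async runtime. In this analogue, std iterators stand in for `Stream`s and `String` items stand in for `Payload`s. It is not connectrpc code: `count_inbound` and `stamp_outbound` are hypothetical helpers. The inbound adapter bumps a counter in an `Arc<Mutex<usize>>`, and the outbound adapter reads the same counter as each response item is produced. With real streams, the same shape applies using `StreamExt::inspect` and `StreamExt::map` from the `futures` crate.

```rust
use std::sync::{Arc, Mutex};

/// Inbound adapter: passes items through unchanged while counting them.
pub fn count_inbound<I>(inbound: I, seen: Arc<Mutex<usize>>) -> impl Iterator<Item = String>
where
    I: Iterator<Item = String>,
{
    inbound.inspect(move |_| {
        // Hold the lock only long enough to bump the shared counter.
        *seen.lock().unwrap() += 1;
    })
}

/// Outbound adapter: tags each response with the inbound count observed
/// at the moment that response item is produced.
pub fn stamp_outbound<O>(outbound: O, seen: Arc<Mutex<usize>>) -> impl Iterator<Item = String>
where
    O: Iterator<Item = String>,
{
    outbound.map(move |msg| {
        let n = *seen.lock().unwrap();
        format!("{msg} (after {n} inbound)")
    })
}
```

In a client-streaming shape, the handler drains the inbound side before replying, so the single outbound message sees the final count. In a bidi stream, the value read depends on how far the inbound side has progressed when each response is emitted.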

Sources: [connectrpc/src/interceptor.rs:135-188, 449-499]()

---

## `Next` and `NextStream` Continuations

`Next<'a>` and `NextStream<'a>` are the handles an interceptor calls to advance the chain. Both are **consume-once**. `run` takes the handle by value, so a continuation can be advanced at most once. Dropping it without calling `run` short-circuits the rest of the chain, so neither inner interceptors nor the handler run.

```rust
// connectrpc/src/interceptor.rs:240-275
pub struct Next<'a> {
    rest: &'a [Arc<dyn Interceptor>],
    terminal: &'a (dyn UnaryTerminal + 'a),
}

impl<'a> Next<'a> {
    pub async fn run(self, req: UnaryRequest) -> Result<UnaryResponse, ConnectError> {
        match self.rest.split_first() {
            Some((head, tail)) => head.intercept_unary(req, Next { rest: tail, terminal: self.terminal }).await,
            None => self.terminal.call(req).await,
        }
    }
}
```

The terminal is the dispatch path itself (`DispatchTerminal`), which hands the `Payload` — not raw bytes — to the dispatcher so a handler can call `take_message()` and reuse an interceptor's cached decode.

Sources: [connectrpc/src/interceptor.rs:235-283, 501-555, 926-944]()

---

## Chain Ordering

```text
request ──▶ interceptor[0] ──▶ interceptor[1] ──▶ handler
                 │                  │                 │
response ◀───────┴──────────◀───────┴────────◀───────┘
```

The **first interceptor registered is the outermost**: first to run on the way in, last on the way out. This matches `connect-go`'s `WithInterceptors` ordering. The test suite verifies it explicitly:

```rust
// connectrpc/src/interceptor.rs:1014-1039
// chain: [Tagger("a"), Tagger("b"), Tagger("c")]
// way in:  trace == ["a", "b", "c"]
// way out: x-trace headers == ["c-out", "b-out", "a-out"]
```
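The following synchronous model, with no async runtime, makes the `split_first` recursion and the resulting onion order concrete. `Intercept`, `Req`, `Resp`, `Tagger`, and `run_chain` are hypothetical stand-ins: the real trait is async and works on `UnaryRequest`/`UnaryResponse`. `Next::run` follows the same shape as the excerpt in the `Next` section. With a chain of `Tagger("a")`, `Tagger("b")`, `Tagger("c")`, the handler should see `["a", "b", "c"]` and the response should carry `["c-out", "b-out", "a-out"]`.

```rust
use std::sync::Arc;

/// Stand-in for `UnaryRequest`: records which interceptors ran on the way in.
#[derive(Debug, Default)]
pub struct Req {
    pub trace: Vec<String>,
}

/// Stand-in for `UnaryResponse`: what the handler saw, plus values appended on the way out.
#[derive(Debug, Default)]
pub struct Resp {
    pub seen_by_handler: Vec<String>,
    pub x_trace: Vec<String>,
}

/// Synchronous stand-in for the unary surface of `Interceptor`.
pub trait Intercept: Send + Sync {
    fn intercept(&self, req: Req, next: Next<'_>) -> Resp;
}

/// Consume-once continuation over the remaining chain and the terminal.
pub struct Next<'a> {
    rest: &'a [Arc<dyn Intercept>],
    terminal: &'a dyn Fn(Req) -> Resp,
}

impl<'a> Next<'a> {
    /// Hand the head interceptor a `Next` over the tail; once the slice
    /// is empty, call the terminal (the "handler").
    pub fn run(self, req: Req) -> Resp {
        match self.rest.split_first() {
            Some((head, tail)) => head.intercept(req, Next { rest: tail, terminal: self.terminal }),
            None => (self.terminal)(req),
        }
    }
}

/// Hypothetical interceptor: tags the request going in and the response coming out.
pub struct Tagger(pub &'static str);

impl Intercept for Tagger {
    fn intercept(&self, mut req: Req, next: Next<'_>) -> Resp {
        req.trace.push(self.0.to_string());
        let mut resp = next.run(req);
        resp.x_trace.push(format!("{}-out", self.0));
        resp
    }
}

/// Entry point: start at index 0, the outermost (first registered) interceptor.
pub fn run_chain(chain: &[Arc<dyn Intercept>], req: Req, terminal: &dyn Fn(Req) -> Resp) -> Resp {
    Next { rest: chain, terminal }.run(req)
}
```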

Sources: [connectrpc/src/interceptor.rs:29-35, 1013-1040]()

---

## Short-Circuiting

An interceptor that returns before calling `next.run` terminates the chain immediately. This applies whether it returns an `Err` or an `Ok` response it builds itself. Inner interceptors and the handler do not run.

Errors carry **response headers** via `ConnectError::with_headers`. These are preserved through the chain and reach the dispatch path's protocol-aware error renderers (`error_response`, `grpc_error_response`), which put them on the wire. This is the mechanism for auth interceptors to attach diagnostic headers (e.g. `x-deny-policy`) to a denial response.

```rust
// connectrpc/src/interceptor.rs:1054-1058
Err(ConnectError::permission_denied("nope")
    .with_headers(headers))
```
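The sketch below models the short-circuit path with a simplified error type. `RpcError`, `auth_intercept`, the `Bearer ok` token check, and the `missing-token` policy value are all hypothetical. The continuation is modeled as an `FnOnce` so it is plain that the handler is never invoked on denial. The diagnostic header travels inside the error value, the way `ConnectError::with_headers` carries it.

```rust
use std::collections::HashMap;

/// Simplified stand-in for `ConnectError`: a code, a message, and the
/// response headers attached to the error.
#[derive(Debug, PartialEq)]
pub struct RpcError {
    pub code: &'static str,
    pub message: String,
    pub headers: HashMap<String, String>,
}

/// Hypothetical auth interceptor. `next` stands for the rest of the chain
/// plus the handler; it is only called when the token check passes.
pub fn auth_intercept<F>(headers: &HashMap<String, String>, next: F) -> Result<String, RpcError>
where
    F: FnOnce() -> Result<String, RpcError>,
{
    let authorized = headers.get("authorization").map(String::as_str) == Some("Bearer ok");
    if !authorized {
        // Short-circuit: return the error without ever calling `next`.
        let mut diag = HashMap::new();
        diag.insert("x-deny-policy".to_string(), "missing-token".to_string());
        return Err(RpcError {
            code: "permission_denied",
            message: "nope".to_string(),
            headers: diag,
        });
    }
    next()
}
```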

Sources: [connectrpc/src/interceptor.rs:1042-1078, 1083-1153]()

---

## Zero-Cost Opt-Out

When no interceptors are registered, the dispatch path makes a single `is_empty` check and delegates directly to the dispatcher — no `UnaryRequest` is constructed, no `Next` is built, no chain machinery runs:

```rust
// connectrpc/src/interceptor.rs:903-915
pub(crate) async fn call_unary_intercepted<D: crate::Dispatcher>(
    dispatcher: &D,
    interceptors: &[Arc<dyn Interceptor>],
    ...,
) -> Result<EncodedResponse, ConnectError> {
    if interceptors.is_empty() {
        return dispatcher.call_unary(path, ctx, Payload::new(body, format), format).await;
    }
    // ... chain machinery only when interceptors are present
}
```

The same pattern applies to all three streaming variants. A test pins the zero-allocation property by pointer-equality on the echoed body bytes.

Sources: [connectrpc/src/interceptor.rs:895-924, 648-673]()

---

## `Payload`: Lazy Decode

`Payload` is the body type interceptors see on both the request and the response side. It holds the wire bytes (`Bytes`, which is reference-counted) and decodes to a typed message on first access. An interceptor that never inspects the message body pays only for constructing the struct. There is no decode and no new allocation, since sharing the body costs only a `Bytes` refcount increment.

| Method | Cost | Notes |
|---|---|---|
| `payload.bytes()` | Free | Raw wire bytes, always available |
| `payload.message::<M>()` | Decode once, then cache | Works for Proto and JSON wires |
| `payload.view::<V>()` | Zero-copy Proto view | Proto wires only; re-encodes if replaced |
| `payload.set_message(m)` | Box allocation | Replacement takes priority for all reads |
| `payload.take_message::<M>()` | Move from cache or fresh decode | Consuming; handler uses this to avoid re-decode |
| `payload.encoded()` | `Bytes` refcount clone or re-encode | What the dispatch path sends on the wire |

`Payload` is intentionally not `Clone`: a clone would either drop the decode cache (surprising) or duplicate it (defeating laziness). Pass by reference or move through the call chain.

The decode cache is a `OnceLock<Box<dyn AnyMessage>>`, so it is thread-safe. If two threads race on `message::<M>()`, both decode, but only one `set` succeeds and the other thread discards its result.
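The lazy-decode contract can be sketched with std types only. `LazyPayload`, `Greeting`, and the UTF-8 "decode" are hypothetical stand-ins. The cache uses `OnceLock<Box<dyn Any + Send + Sync>>` in place of the crate's `dyn AnyMessage`, and for simplicity a replacement is kept in a separate field. There is one deliberate difference from the behaviour described above. `OnceLock::get_or_init` runs a single initializer and makes racing callers wait, rather than letting both decode and one `set` win. The `DECODES` counter makes the decode-once behaviour observable.

```rust
use std::any::Any;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Counts "decodes" so the decode-once behaviour can be observed.
pub static DECODES: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical message type.
#[derive(Clone, Debug, PartialEq)]
pub struct Greeting(pub String);

/// Hypothetical decoder: treats the wire bytes as UTF-8 text.
fn decode(bytes: &[u8]) -> Greeting {
    DECODES.fetch_add(1, Ordering::SeqCst);
    Greeting(String::from_utf8_lossy(bytes).into_owned())
}

/// Minimal model of the lazy-decode contract (not connectrpc's `Payload`).
/// Deliberately not `Clone`, matching the design note above.
pub struct LazyPayload {
    bytes: Arc<[u8]>,
    cache: OnceLock<Box<dyn Any + Send + Sync>>,
    replacement: Option<Greeting>,
}

impl LazyPayload {
    pub fn new(bytes: Arc<[u8]>) -> Self {
        Self { bytes, cache: OnceLock::new(), replacement: None }
    }

    /// Raw wire bytes; never triggers a decode.
    pub fn bytes(&self) -> &[u8] {
        &self.bytes
    }

    /// A replacement wins; otherwise decode on the first call and reuse the cache.
    pub fn message(&self) -> &Greeting {
        if let Some(m) = &self.replacement {
            return m;
        }
        self.cache
            .get_or_init(|| Box::new(decode(&self.bytes)) as Box<dyn Any + Send + Sync>)
            .downcast_ref::<Greeting>()
            .expect("cache only ever holds a Greeting")
    }

    /// Replace the body; later reads see this value instead of the wire bytes.
    pub fn set_message(&mut self, m: Greeting) {
        self.replacement = Some(m);
    }

    /// Consume the payload, moving the cached decode out if one exists.
    pub fn take_message(mut self) -> Greeting {
        if let Some(m) = self.replacement.take() {
            return m;
        }
        match self.cache.take() {
            Some(boxed) => *boxed
                .downcast::<Greeting>()
                .expect("cache only ever holds a Greeting"),
            None => decode(&self.bytes),
        }
    }
}
```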

Sources: [connectrpc/src/payload.rs:1-20, 99-363]()

---

## Registering Interceptors

Use `ConnectRpcService::with_interceptor`:

```rust
// connectrpc/src/service.rs:1219-1220
pub fn with_interceptor(self, interceptor: impl Interceptor) -> Self {
    self.with_interceptor_arc(Arc::new(interceptor))
}
```

Internally the chain is stored as `Arc<[Arc<dyn Interceptor>]>`. `with_interceptor_arc` lets you share a single interceptor instance, together with its state (a connection pool, a token cache), across multiple `ConnectRpcService`s without duplicating it.

Calls compose: each successive `with_interceptor` appends to the end of the chain, so the last one registered is the innermost.
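The following sketch covers the builder semantics. A hypothetical `ServiceBuilder` and a name-only `Named` trait stand in for `ConnectRpcService` and `Interceptor`. It shows two of the properties described above. First, registration order is preserved, with index 0 outermost. Second, an `Arc` passed to `with_interceptor_arc` is shared rather than copied, which `Arc::ptr_eq` and `Arc::strong_count` make checkable.

```rust
use std::sync::Arc;

/// Stand-in for `Interceptor`; a name is enough to observe ordering.
pub trait Named: Send + Sync {
    fn name(&self) -> &str;
}

/// Hypothetical interceptor identified by a static tag.
pub struct Tag(pub &'static str);

impl Named for Tag {
    fn name(&self) -> &str {
        self.0
    }
}

/// Hypothetical builder modeled on `with_interceptor` / `with_interceptor_arc`.
#[derive(Default)]
pub struct ServiceBuilder {
    chain: Vec<Arc<dyn Named>>,
}

impl ServiceBuilder {
    /// Wrap a fresh instance in an `Arc` and register it.
    pub fn with_interceptor(self, interceptor: impl Named + 'static) -> Self {
        self.with_interceptor_arc(Arc::new(interceptor))
    }

    /// Register an already-shared instance; appending keeps index 0 outermost.
    pub fn with_interceptor_arc(mut self, interceptor: Arc<dyn Named>) -> Self {
        self.chain.push(interceptor);
        self
    }

    /// Freeze into the immutable, cheaply clonable form used per request.
    pub fn build(self) -> Arc<[Arc<dyn Named>]> {
        self.chain.into()
    }
}
```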

Sources: [connectrpc/src/service.rs:1200-1240]()

---

## Closure Shortcuts

For lightweight interceptors that don't need a named type, two factory functions produce `impl Interceptor` from closures:

```rust
// connectrpc/src/interceptor.rs:206-233
let timing = unary_interceptor(|req, next| Box::pin(async move {
    let started = std::time::Instant::now();
    let resp = next.run(req).await;
    tracing::debug!(elapsed = ?started.elapsed(), "rpc");
    resp
}));

// connectrpc/src/interceptor.rs:463-499
let logging = streaming_interceptor(|req, inbound, next| Box::pin(async move {
    tracing::info!(path = ?req.ctx.path(), "stream open");
    next.run(req, inbound).await
}));
```

The `Box::pin` boilerplate is unavoidable (the trait method returns a boxed future through `async_trait`), but the closure body is identical to what an `impl Interceptor` block would contain.
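The adapter behind a closure factory can be sketched synchronously, which removes the `Box::pin` step entirely. `Intercept`, `FnInterceptor`, and this `unary_interceptor` are hypothetical stand-ins, and `next` is a plain `&dyn Fn` rather than `Next<'_>`. The point is only that the factory wraps the closure in a struct whose trait method forwards to it.

```rust
/// Stand-in for the interceptor trait, with `next` modeled as a function reference.
pub trait Intercept: Send + Sync {
    fn intercept(&self, req: String, next: &dyn Fn(String) -> String) -> String;
}

/// Hypothetical adapter: a closure becomes a trait implementation.
pub struct FnInterceptor<F>(F);

impl<F> Intercept for FnInterceptor<F>
where
    F: Fn(String, &dyn Fn(String) -> String) -> String + Send + Sync,
{
    fn intercept(&self, req: String, next: &dyn Fn(String) -> String) -> String {
        // Forward to the wrapped closure unchanged.
        (self.0)(req, next)
    }
}

/// Hypothetical factory in the spirit of `unary_interceptor`.
pub fn unary_interceptor<F>(f: F) -> FnInterceptor<F>
where
    F: Fn(String, &dyn Fn(String) -> String) -> String + Send + Sync,
{
    FnInterceptor(f)
}
```

A closure such as `unary_interceptor(|req, next| next(req.to_uppercase()))` then behaves like a named interceptor type with the same body.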

Sources: [connectrpc/src/interceptor.rs:191-233, 449-499]()

---

## Unit-Testing Interceptors

Two public helpers let you drive a chain in unit tests without a TCP listener or Tower service:

```rust
// Unary
let resp = connectrpc::interceptor::run_chain(
    &chain,
    my_req,
    |req| async move { Ok(/* handler stub */) },
).await?;

// Streaming
let resp = connectrpc::interceptor::run_chain_streaming(
    &chain,
    my_stream_req,
    my_inbound_stream,
    |req, inbound| async move { Ok(/* handler stub */) },
).await?;
```

Both helpers accept a closure as the terminal, so you can assert what the handler would see (headers mutated, body replaced) without a real dispatcher.
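In the same spirit as `run_chain`, a terminal closure can record what the handler would have received, so a test can assert on mutated headers without a dispatcher. This synchronous sketch uses hypothetical types (`Req`, `Outcome`, `Intercept`, `TenantHeader`, and a local `run_chain`). The real helpers are async and take the crate's request types.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Simplified request: only headers matter for this test.
#[derive(Clone, Debug, Default)]
pub struct Req {
    pub headers: BTreeMap<String, String>,
}

pub type Outcome = Result<String, String>;

/// Stand-in interceptor trait; `next` runs the rest of the chain.
pub trait Intercept: Send + Sync {
    fn intercept(&self, req: Req, next: &dyn Fn(Req) -> Outcome) -> Outcome;
}

/// Hypothetical interceptor that stamps a tenant header on the way in.
pub struct TenantHeader(pub &'static str);

impl Intercept for TenantHeader {
    fn intercept(&self, mut req: Req, next: &dyn Fn(Req) -> Outcome) -> Outcome {
        req.headers.insert("x-tenant".to_string(), self.0.to_string());
        next(req)
    }
}

/// Drive the chain with a closure as the terminal (synchronous, so no executor).
pub fn run_chain(chain: &[Arc<dyn Intercept>], req: Req, terminal: &dyn Fn(Req) -> Outcome) -> Outcome {
    match chain.split_first() {
        Some((head, tail)) => head.intercept(req, &|r: Req| run_chain(tail, r, terminal)),
        None => terminal(req),
    }
}
```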

Sources: [connectrpc/src/interceptor.rs:849-893, 571-624]()

---

## `Spec`: Per-Method Static Metadata

`Spec` is a `Copy`, `'static` struct emitted by code generation as a `pub const` per method. It is threaded through to handlers and interceptors via `RequestContext::spec()`.

```rust
// connectrpc/src/spec.rs:125-153
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Spec {
    pub procedure: &'static str,   // "/package.Service/Method"
    pub stream_type: StreamType,
    pub origin: SpecOrigin,        // Server or Client
    pub idempotency_level: IdempotencyLevel,
}
```

`Spec` is `#[non_exhaustive]` so future fields can be added without a breaking change. Construct it via `Spec::server` or `Spec::client` (both `const fn`) and chain `with_idempotency_level` in `const` position, so the whole `Spec` is built at compile time with no runtime construction.

```rust
// connectrpc/src/spec.rs:284-292 (test, shows const construction)
const SPEC: Spec = Spec::server("/pkg.Greet/Say", StreamType::Unary)
    .with_idempotency_level(IdempotencyLevel::NoSideEffects);
```

### Accessors

| Method | Returns | Example |
|---|---|---|
| `spec.procedure` | `&'static str` | `"/pkg.Greet/Say"` |
| `spec.service()` | `&'static str` | `"pkg.Greet"` |
| `spec.method()` | `&'static str` | `"Say"` |
| `spec.stream_type` | `StreamType` | `StreamType::Unary` |
| `spec.origin` | `SpecOrigin` | `SpecOrigin::Server` |
| `spec.idempotency_level` | `IdempotencyLevel` | `IdempotencyLevel::NoSideEffects` |

In debug builds, `Spec::server`/`Spec::client` assert that `procedure` starts with `/` and contains a service/method separator, turning malformed paths into compile-time panics for `const` specs.
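The following is a std-only model of the const-construction pattern and the accessor table, under two stated assumptions. First, the struct omits `origin` and `#[non_exhaustive]`. Second, `is_well_formed` only approximates the debug check (a leading `/`, non-empty service and method segments) and is not the crate's exact rule. Because `SAY` is a `const` item, a malformed procedure string there should surface as a compile-time error in builds with debug assertions enabled.

```rust
/// Reduced stand-ins for the crate's enums.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StreamType {
    Unary,
    ClientStream,
    ServerStream,
    BidiStream,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum IdempotencyLevel {
    Unknown,
    NoSideEffects,
    Idempotent,
}

/// Simplified `Spec`: no `origin` field, not `#[non_exhaustive]`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Spec {
    pub procedure: &'static str,
    pub stream_type: StreamType,
    pub idempotency_level: IdempotencyLevel,
}

/// Approximate shape check, usable in const context: a leading '/', then a
/// second '/' with non-empty text on both sides ("/pkg.Service/Method").
pub const fn is_well_formed(p: &str) -> bool {
    let b = p.as_bytes();
    if b.is_empty() || b[0] != b'/' {
        return false;
    }
    let mut i = 1;
    while i < b.len() {
        if b[i] == b'/' {
            return i > 1 && i + 1 < b.len();
        }
        i += 1;
    }
    false
}

impl Spec {
    pub const fn server(procedure: &'static str, stream_type: StreamType) -> Self {
        // Evaluated at compile time for `const` items; active only with debug assertions.
        debug_assert!(is_well_formed(procedure), "procedure must look like /pkg.Service/Method");
        Spec { procedure, stream_type, idempotency_level: IdempotencyLevel::Unknown }
    }

    pub const fn with_idempotency_level(mut self, level: IdempotencyLevel) -> Self {
        self.idempotency_level = level;
        self
    }

    /// "/pkg.Greet/Say" -> "pkg.Greet"
    pub fn service(&self) -> &'static str {
        self.split().0
    }

    /// "/pkg.Greet/Say" -> "Say"
    pub fn method(&self) -> &'static str {
        self.split().1
    }

    /// Skip the leading '/', then split at the next '/'.
    fn split(&self) -> (&'static str, &'static str) {
        let p: &'static str = self.procedure;
        p.get(1..)
            .and_then(|rest| rest.split_once('/'))
            .unwrap_or((p, ""))
    }
}

pub const SAY: Spec = Spec::server("/pkg.Greet/Say", StreamType::Unary)
    .with_idempotency_level(IdempotencyLevel::NoSideEffects);
```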

Sources: [connectrpc/src/spec.rs:1-265]()

---

## `StreamType`

`StreamType` is the interceptor-facing equivalent of `MethodKind` (the routing-table type). It uses `connect-go` naming so cross-runtime interceptor logic ports cleanly.

```rust
// connectrpc/src/spec.rs:30-39
pub enum StreamType {
    Unary,        // 1 request, 1 response
    ClientStream, // N requests, 1 response
    ServerStream, // 1 request, N responses
    BidiStream,   // N requests, N responses
}
```

`From` conversions exist in both directions (`impl From<MethodKind> for StreamType` and `impl From<StreamType> for MethodKind`). Prefer `StreamType` in code that consumes a `Spec`; use `MethodKind` when registering routes.
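A sketch of the two `From` impls as a lossless one-to-one mapping follows. The `StreamType` variants match the enum above. The `MethodKind` variant names are assumed for illustration, since this page does not list them.

```rust
/// Interceptor-facing shape (variants as listed above).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamType {
    Unary,
    ClientStream,
    ServerStream,
    BidiStream,
}

/// Routing-table shape. These variant names are assumed for illustration.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MethodKind {
    Unary,
    ClientStreaming,
    ServerStreaming,
    BidiStreaming,
}

impl From<MethodKind> for StreamType {
    fn from(kind: MethodKind) -> Self {
        match kind {
            MethodKind::Unary => StreamType::Unary,
            MethodKind::ClientStreaming => StreamType::ClientStream,
            MethodKind::ServerStreaming => StreamType::ServerStream,
            MethodKind::BidiStreaming => StreamType::BidiStream,
        }
    }
}

impl From<StreamType> for MethodKind {
    fn from(st: StreamType) -> Self {
        match st {
            StreamType::Unary => MethodKind::Unary,
            StreamType::ClientStream => MethodKind::ClientStreaming,
            StreamType::ServerStream => MethodKind::ServerStreaming,
            StreamType::BidiStream => MethodKind::BidiStreaming,
        }
    }
}
```

Both `match`es are exhaustive and have no wildcard arm. Adding a variant to either enum without updating the other therefore fails to compile, which is one way to keep the two types in lockstep.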

Sources: [connectrpc/src/spec.rs:19-61]()

---

## `IdempotencyLevel`

Derived from the proto method option `idempotency_level`. Interceptors use it to make retry or GET-eligibility decisions without re-examining the proto schema:

```rust
// connectrpc/src/spec.rs:71-81
pub enum IdempotencyLevel {
    Unknown,        // proto default — no guarantee
    NoSideEffects,  // read-only, safe to retry or GET
    Idempotent,     // may have side effects, but repeatable
}
```

`NoSideEffects` is the only value that makes a call GET-eligible under the Connect protocol. `Idempotent` is safe to retry but not GET-eligible — this distinction is captured by `MethodDescriptor::idempotent` (a derived boolean) versus `Spec::idempotency_level` (the full three-valued enum).
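The rules above reduce to two predicates. `get_eligible` and `retry_safe` are hypothetical helper names. Treating `Idempotent` as retry-safe is a policy an interceptor might adopt, not something the crate is shown to enforce.

```rust
/// Local stand-in for the three-valued enum above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IdempotencyLevel {
    Unknown,
    NoSideEffects,
    Idempotent,
}

/// GET eligibility under the Connect protocol: only side-effect-free methods.
pub fn get_eligible(level: IdempotencyLevel) -> bool {
    matches!(level, IdempotencyLevel::NoSideEffects)
}

/// Hypothetical retry policy: retry anything declared repeatable.
pub fn retry_safe(level: IdempotencyLevel) -> bool {
    matches!(level, IdempotencyLevel::NoSideEffects | IdempotencyLevel::Idempotent)
}
```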

Sources: [connectrpc/src/spec.rs:63-81, 148-152]()

---

## `SpecOrigin`

Records whether a `Spec` constant was emitted by a generated server dispatcher (`FooServiceServer<T>`) or a generated client (`FooServiceClient<T>`). An interceptor registered on both sides can read this to open the right span kind or inject trace-context headers only on the client side:

```rust
// connectrpc/src/spec.rs:101-107
pub enum SpecOrigin { Server, Client }
```
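The following sketch shows origin-gated behaviour for an interceptor registered on both sides. `span_kind`, `maybe_inject_trace`, and the `x-trace-id` header name are hypothetical. A real tracing setup would use its tracing library's propagation format instead. The origin is passed as an `Option` because `RequestContext::spec()` returns `Option<Spec>`.

```rust
use std::collections::HashMap;

/// Local stand-in for `SpecOrigin`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SpecOrigin {
    Server,
    Client,
}

/// Hypothetical span-kind label; `None` covers routes registered without a `Spec`.
pub fn span_kind(origin: Option<SpecOrigin>) -> &'static str {
    match origin {
        Some(SpecOrigin::Client) => "client",
        Some(SpecOrigin::Server) => "server",
        None => "unspecified",
    }
}

/// Inject a (hypothetical) trace header only when running on the client side.
pub fn maybe_inject_trace(origin: Option<SpecOrigin>, headers: &mut HashMap<String, String>, trace_id: &str) {
    if origin == Some(SpecOrigin::Client) {
        headers.insert("x-trace-id".to_string(), trace_id.to_string());
    }
}
```

In an interceptor, `origin` would come from `req.ctx.spec().map(|s| s.origin)`.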

Sources: [connectrpc/src/spec.rs:83-107]()

---

## Where `Spec` Appears at Runtime

`RequestContext::spec()` returns `Option<Spec>`:

- `Some(..)` — for generated `FooServiceServer<T>` dispatchers and for `Router` routes registered through the generated `register()` (which chains `Router::with_spec` per route).
- `None` — for manual `route_*` registrations without a `Router::with_spec` call.

`ctx.path()` is always present when the dispatch path constructs the context, including dynamic `Router` routes that have no `Spec`. Code that must label every request (auth interceptors, span builders, rate limiters) should key on `path()` and consult `spec()` for richer metadata when it is available.
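The following sketch illustrates the path-first rule. The label always starts from `path()` and gains detail only when a `Spec` is present. `span_label` and the bracketed suffix format are hypothetical, and the local `Spec`/`StreamType` are reduced stand-ins for the crate's types. `path` is taken as an `Option<&str>`, as the `.expect` in the logging example suggests.

```rust
/// Reduced stand-ins for the crate's types.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamType {
    Unary,
    ClientStream,
    ServerStream,
    BidiStream,
}

#[derive(Clone, Copy, Debug)]
pub struct Spec {
    pub procedure: &'static str,
    pub stream_type: StreamType,
}

/// Hypothetical span-label builder: `path` is the primary key, and `spec`
/// only enriches the label when the route carries one.
pub fn span_label(path: Option<&str>, spec: Option<Spec>) -> String {
    let base = path.unwrap_or("<unknown>");
    match spec {
        Some(s) => format!("{base} [{:?}]", s.stream_type),
        None => base.to_string(),
    }
}
```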

Sources: [connectrpc/src/response.rs:190-224](), [connectrpc/src/dispatcher.rs:55]()

---

## Design Note: Stream-Shaped, Not Connection-Shaped

The streaming interceptor receives and returns Rust `Stream`s rather than a stateful `conn.Send()`/`conn.Receive()` pair (as in `connect-go`). This matches the Rust ecosystem model (`tower`, `tonic`, `axum` all work with `Stream`-shaped bodies) and avoids the channel-intermediary and pump-task overhead that a connection-wrapper would require. The expressive power is equivalent: wrapping the inbound or outbound stream with an adapter gives full per-message observe/mutate capability.

Sources: [connectrpc/src/interceptor.rs:17-26]()

---

## Summary

Interceptors in `connect-rust` provide typed, post-decode hooks for every RPC and cost nothing when none are registered. The `Interceptor` trait's two surfaces, `intercept_unary` and `intercept_streaming`, cover all four message-flow shapes, and the three streaming shapes share a single `Stream`-based API. `Spec` supplies the per-method static facts (`procedure`, `stream_type`, `idempotency_level`, `origin`) that interceptors need to label spans or gate behavior without re-parsing the request URL. Together they form the recommended cross-cutting concern layer between the protocol decoder and the handler. `Payload`'s lazy-decode design keeps interceptors that never touch message bodies effectively free.

Sources: [connectrpc/src/interceptor.rs:1-41]()
