# Handlers, Router & ConnectRpcService

> The server-side dispatch path: implementing the generated FooService handler trait, registering methods on Router, wrapping the router in ConnectRpcService (the Tower service), and mounting it with Axum or raw Hyper. Covers unary and streaming handler shapes.

- Repository: anthropics/connect-rust
- GitHub: https://github.com/anthropics/connect-rust
- Human wiki: https://grok-wiki.com/public/wiki/anthropics-connect-rust-abe117693c52
- Complete Markdown: https://grok-wiki.com/public/wiki/anthropics-connect-rust-abe117693c52/llms-full.txt

## Source Files

- `connectrpc/src/handler.rs`
- `connectrpc/src/router.rs`
- `connectrpc/src/service.rs`
- `connectrpc/src/dispatcher.rs`
- `connectrpc/src/axum.rs`
- `connectrpc/src/response.rs`
- `examples/eliza/src`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [connectrpc/src/handler.rs](connectrpc/src/handler.rs)
- [connectrpc/src/router.rs](connectrpc/src/router.rs)
- [connectrpc/src/service.rs](connectrpc/src/service.rs)
- [connectrpc/src/dispatcher.rs](connectrpc/src/dispatcher.rs)
- [connectrpc/src/axum.rs](connectrpc/src/axum.rs)
- [connectrpc/src/response.rs](connectrpc/src/response.rs)
- [examples/eliza/src/main.rs](examples/eliza/src/main.rs)
- [examples/eliza/src/generated/connect/connectrpc.eliza.v1.eliza.__connect.rs](examples/eliza/src/generated/connect/connectrpc.eliza.v1.eliza.__connect.rs)
</details>

# Handlers, Router & ConnectRpcService

This page covers the server-side dispatch path: from implementing a generated `FooService` trait to mounting the router as a Tower service inside an Axum or Hyper application. It walks through the three conceptual layers — **handler traits**, **Router registration**, and **ConnectRpcService** (the Tower service) — and shows where the generated codegen glue and the hand-written path differ.

Understanding this path matters because the architecture makes a deliberate split: the *type-aware* layer (handler traits, request decode, response encode) is handled at compile time, while the *dispatch* layer (method-path lookup, protocol negotiation, envelope framing, interceptors, limits) happens uniformly inside `ConnectRpcService`. The two paths interact through the `Dispatcher` trait.

---

## Architecture Overview

```text
HTTP request
    │
    ▼
┌─────────────────────────────┐
│    ConnectRpcService<D>     │  Tower Service — protocol decode, limits,
│   (service.rs:1098)         │  compression, interceptors, deadline, tracing
└──────────┬──────────────────┘
           │ Dispatcher::lookup(path)  →  MethodDescriptor (kind, idempotent, Spec)
           │ Dispatcher::call_*(...)
           ▼
┌──────────────────────────────────────┐
│          D: Dispatcher               │
│                                      │
│  ┌──────────────┐  ┌──────────────┐  │
│  │    Router    │  │ FooServiceServer│ │
│  │  (HashMap)   │  │  (monomorphic)│  │
│  └──────┬───────┘  └──────┬───────┘  │
└─────────┼─────────────────┼──────────┘
          │                 │
          ▼ ErasedHandler   ▼ direct match
     Handler<Req, Res>   FooService trait impl
     (handler.rs)        (user code / Arc<T>)
```

Sources: [connectrpc/src/dispatcher.rs:1-19](), [connectrpc/src/service.rs:1098-1109]()

---

## Handler Traits

### The public surface

The library defines four handler *kinds*, mirroring the four RPC streaming shapes:

| Kind | Trait | Signature |
|---|---|---|
| Unary | `Handler<Req, Res>` | `fn call(&self, ctx, Req) -> BoxFuture<ServiceResult<Body>>` |
| Server streaming | `StreamingHandler<Req, Res>` | `fn call(&self, ctx, Req) -> BoxFuture<ServiceResult<ServiceStream<Item>>>` |
| Client streaming | `ClientStreamingHandler<Req, Res>` | `fn call(&self, ctx, ServiceStream<Req>) -> BoxFuture<ServiceResult<Body>>` |
| Bidi streaming | `BidiStreamingHandler<Req, Res>` | `fn call(&self, ctx, ServiceStream<Req>) -> BoxFuture<ServiceResult<ServiceStream<Item>>>` |

Each trait has a parallel *view* variant (`ViewHandler`, `ViewStreamingHandler`, `ViewClientStreamingHandler`, `ViewBidiStreamingHandler`) that receives an `OwnedView<ReqView>` instead of an owned `Req`. Views give zero-copy `&str` / `&[u8]` access into the decoded buffer without an extra allocation.

Sources: [connectrpc/src/handler.rs:154-170](), [connectrpc/src/handler.rs:276-297](), [connectrpc/src/handler.rs:415-429](), [connectrpc/src/handler.rs:537-554]()

### Function-wrapper helpers

You rarely implement these traits by hand. Four `*_handler_fn` helpers wrap closures:

```rust
// Unary: closure takes (ctx, owned Req)
let h = handler_fn(|ctx: RequestContext, req: SayRequest| async move {
    Response::ok(SayResponse { sentence: "hello".into(), ..Default::default() })
});

// Server-streaming: closure returns a stream
let h = streaming_handler_fn(|ctx, req: IntroduceRequest| async move {
    Response::stream_ok(futures::stream::iter([
        Ok(IntroduceResponse { sentence: "Hi!".into(), ..Default::default() })
    ]))
});

// View (zero-copy): closure takes OwnedView<ReqView>
let h = view_handler_fn(|ctx, req: OwnedView<SayRequestView<'static>>, format| async move {
    let reply = lookup_reply(req.sentence); // req.sentence is &str, no allocation
    Response::ok(SayResponse { sentence: reply, ..Default::default() })?.encode::<SayResponse>(format)
});
```

Sources: [connectrpc/src/handler.rs:172-210](), [connectrpc/src/handler.rs:331-361](), [connectrpc/src/handler.rs:748-755]()

### Response metadata

Handlers return `ServiceResult<Body>` where `Body: Encodable<Res>`. The `Response<B>` wrapper carries both the body and optional response-side metadata via a fluent builder:

```rust
Response::ok(body)                         // body only (most handlers)
Response::ok(body)
    .with_header("x-trace-id", id)         // add a response header
    .with_trailer("grpc-status-bin", bin)  // add a trailer
```

This separates read-only `RequestContext` (passed *in*) from `Response<B>` (returned *out*), avoiding the earlier design where a mutable `Context` was threaded through both directions.

Sources: [connectrpc/src/response.rs:282-310]()

---

## Router

`Router` is the dynamic dispatch path. It maps `"service_name/method_name"` string keys to type-erased handlers stored in a `HashMap`.

### Registration methods

```rust
let router = Router::new()
    // Unary
    .route("pkg.v1.MyService", "DoThing", handler)
    // Idempotent unary — enables Connect GET
    .route_idempotent("pkg.v1.MyService", "GetThing", handler)
    // Server streaming
    .route_server_stream("pkg.v1.MyService", "Watch", streaming_handler)
    // Client streaming
    .route_client_stream("pkg.v1.MyService", "Upload", client_stream_handler)
    // Bidi streaming
    .route_bidi_stream("pkg.v1.MyService", "Chat", bidi_handler)
    // Zero-copy view variants exist for all four shapes
    .route_view("pkg.v1.MyService", "FastRead", view_handler);
```

Each `route_*` call wraps the handler in a type-erasing `*HandlerWrapper` (e.g. `UnaryHandlerWrapper`, `ServerStreamingHandlerWrapper`) and stores it behind `Arc<dyn ErasedHandler>`. This is the *dynamic path*: method lookup at call time goes through a `HashMap` and a trait-object vtable.

Sources: [connectrpc/src/router.rs:114-119](), [connectrpc/src/router.rs:142-209](), [connectrpc/src/router.rs:227-301]()

### Attaching a Spec

`Router::with_spec(spec)` attaches static method metadata to the last-registered route. The generated `FooServiceExt::register` always chains this call so that `RequestContext::spec()` is populated exactly as it would be in a monomorphic `FooServiceServer<T>`:

```rust
const SAY_SPEC: Spec = Spec::server("/eliza.v1.Eliza/Say", StreamType::Unary);
let router = Router::new()
    .route_view_idempotent("eliza.v1.Eliza", "Say", handler)
    .with_spec(SAY_SPEC);   // attaches the Spec so handlers can read ctx.spec()
```

In debug builds, `with_spec` panics if the route is not registered or the idempotency flag disagrees with the `route` vs `route_idempotent` choice; in release builds it silently drops a mismatched spec.

Sources: [connectrpc/src/router.rs:433-483]()

### Merging routers

Multiple `Router` instances can be combined:

```rust
let all = merge_routers([service_a_router, service_b_router]);
```

`merge_routers` extends one `HashMap` with another. Per-route `Spec`s are preserved across the merge.

Sources: [connectrpc/src/router.rs:569-576]()

---

## Generated Service Traits and `FooServiceExt::register`

`connectrpc-codegen` generates two items per service:

1. **`FooService` trait** — the async trait you implement. Methods take `OwnedView<FooRequestView<'static>>` by default (zero-copy) and return `ServiceResult<impl Encodable<Out> + Send + use<Self>>`.
2. **`FooServiceExt` blanket impl** — its single method `register(self: Arc<Self>, router: Router) -> Router` chains all the `route_view*` + `with_spec` calls for you.

```rust
// Generated trait (simplified from the Eliza example)
pub trait ElizaService: Send + Sync + 'static {
    fn say<'a>(&'a self, ctx: RequestContext, request: OwnedSayRequestView)
        -> impl Future<Output = ServiceResult<impl Encodable<SayResponse> + Send + use<'a, Self>>> + Send;

    fn converse(&self, ctx: RequestContext, requests: ServiceStream<OwnedConverseRequestView>)
        -> impl Future<Output = ServiceResult<ServiceStream<impl Encodable<ConverseResponse> + Send + use<Self>>>> + Send;

    fn introduce(&self, ctx: RequestContext, request: OwnedIntroduceRequestView)
        -> impl Future<Output = ServiceResult<ServiceStream<impl Encodable<IntroduceResponse> + Send + use<Self>>>> + Send;
}

// Generated blanket impl on Arc<T: ElizaService>
impl<S: ElizaService> ElizaServiceExt for S {
    fn register(self: Arc<Self>, router: Router) -> Router {
        router
            .route_view_idempotent(ELIZA_SERVICE_SERVICE_NAME, "Say", { /* closure captures Arc<S> */ })
            .with_spec(ELIZA_SERVICE_SAY_SPEC)
            .route_view_bidi_stream(...)
            .with_spec(...)
            // ... one pair per method
    }
}
```

The `use<Self>` precise-capturing clause is significant for streaming methods: it tells the borrow checker the opaque stream does *not* capture `&'a self`, so items must be `'static`. To stream view-encoded data, yield [`PreEncoded`](connectrpc/src/handler.rs:0) (pre-encoded bytes) instead.

Sources: [examples/eliza/src/generated/connect/connectrpc.eliza.v1.eliza.__connect.rs:155-260]()

### Implementing the trait

```rust
struct ElizaServer { stream_delay: Duration }

impl ElizaService for ElizaServer {
    async fn say(&self, _ctx: RequestContext, request: OwnedSayRequestView)
        -> ServiceResult<SayResponse>
    {
        let (reply, _) = eliza::reply(request.sentence); // request.sentence is &str
        Response::ok(SayResponse { sentence: reply, ..Default::default() })
    }

    async fn introduce(&self, _ctx: RequestContext, request: OwnedIntroduceRequestView)
        -> ServiceResult<ServiceStream<IntroduceResponse>>
    {
        let intros = eliza::get_intro_responses(request.name);
        Response::stream_ok(futures::stream::iter(intros.into_iter().map(|s| {
            Ok(IntroduceResponse { sentence: s, ..Default::default() })
        })))
    }

    async fn converse(&self, _ctx: RequestContext, requests: ServiceStream<OwnedConverseRequestView>)
        -> ServiceResult<ServiceStream<ConverseResponse>>
    {
        let stream = futures::stream::unfold(requests, |mut reqs| async move {
            let req = reqs.next().await??; // Option<Result<..>>
            let (reply, end) = eliza::reply(req.sentence);
            Some((Ok(ConverseResponse { sentence: reply, ..Default::default() }), reqs))
        });
        Response::stream_ok(stream)
    }
}
```

Sources: [examples/eliza/src/main.rs:66-159]()

### Wiring it up

```rust
let service = Arc::new(ElizaServer::new(args.stream_delay));
let router = service.register(ConnectRouter::new());  // chains all route_view* calls
```

Sources: [examples/eliza/src/main.rs:207-209]()

---

## Dispatcher Trait

`ConnectRpcService` is generic over `D: Dispatcher`, not `Router` directly. This allows both the dynamic `Router` and codegen-emitted monomorphic dispatchers to plug in.

The trait has a two-step call contract:

1. **`lookup(path)`** — returns a `MethodDescriptor` (kind, idempotent flag, `Option<Spec>`) or `None` for a 404. The service uses the kind to choose the correct body-processing path (buffer unary body vs. stream bidi body).
2. **`call_unary` / `call_server_streaming` / `call_client_streaming` / `call_bidi_streaming`** — invoked after body setup. Returns a typed future.

The `Dispatcher` contract says that if the path is not found or the kind does not match, the `call_*` methods return an `Unimplemented` error future — they never panic.

```rust
pub trait Dispatcher: Send + Sync + 'static {
    fn lookup(&self, path: &str) -> Option<MethodDescriptor>;
    fn call_unary(&self, path, ctx, request: Payload, format) -> UnaryResult;
    fn call_server_streaming(&self, path, ctx, request: Bytes, format) -> StreamingResult;
    fn call_client_streaming(&self, path, ctx, requests: RequestStream, format) -> UnaryResult;
    fn call_bidi_streaming(&self, path, ctx, requests: RequestStream, format) -> StreamingResult;
}
```

The `Chain` type (from `dispatcher.rs`) lets you compose multiple `Dispatcher` impls so a single `ConnectRpcService` can serve several unrelated services.

Sources: [connectrpc/src/dispatcher.rs:130-209](), [connectrpc/src/dispatcher.rs:238-245]()

---

## ConnectRpcService

`ConnectRpcService<D>` is the Tower service that ties everything together. It is generic over `D: Dispatcher` (defaulting to `Router`).

### Fields and construction

```rust
pub struct ConnectRpcService<D = Router> {
    dispatcher: Arc<D>,
    limits: Limits,                        // body / message size caps
    compression: Arc<CompressionRegistry>, // available codecs
    compression_policy: CompressionPolicy,
    deadline_policy: DeadlinePolicy,
    interceptors: Arc<[Arc<dyn Interceptor>]>,
}
```

Construct it directly or through the `Router` convenience method:

```rust
// Directly from a dispatcher
let svc = ConnectRpcService::new(router);

// Through Router (axum feature)
let svc = router.into_axum_service();  // ConnectRpcService::new(self)
```

Sources: [connectrpc/src/service.rs:1098-1148](), [connectrpc/src/service.rs:2950-2951]()

### Configuration builder methods

| Method | Purpose |
|---|---|
| `.with_limits(Limits { .. })` | Override 4 MB body / message defaults |
| `.with_compression(registry)` | Register additional codecs (gzip, zstd, etc.) |
| `.with_compression_policy(policy)` | Control threshold for auto-compression |
| `.with_deadline_policy(policy)` | Clamp/default client-asserted timeouts |
| `.with_interceptor(impl Interceptor)` | Append to the unary + streaming interceptor chain |
| `.with_interceptor_arc(Arc<dyn Interceptor>)` | Same, but for shared interceptors |

Default limits are 4 MB on-wire and 4 MB post-decompression (matching tonic and grpc-go defaults). Interceptors are outermost-first: the first registered runs first on the way in and last on the way out, matching connect-go's `WithInterceptors`.

Sources: [connectrpc/src/service.rs:1150-1240]()

### The Tower::Service impl

`ConnectRpcService<D>` implements `tower::Service<Request<B>>` with `Response = Response<ConnectRpcBody>` and `Error = Infallible`. The `call()` method:

1. Extracts per-request context (limits, compression, deadline policy, interceptors) from `self` via cheap `Arc` clones.
2. Conditionally creates a `tracing::debug_span!("connectrpc_request")` only when the subscriber is actually watching that level (avoids `~0.8% CPU` from dead span wrappers in production).
3. Calls `handle_request(...)` which performs: protocol detection → header parse → `dispatcher.lookup(path)` → body collection/streaming → interceptor chain → `dispatcher.call_*` → response encoding → envelope framing.

Sources: [connectrpc/src/service.rs:1321-1385]()

---

## Mounting with Axum

### Plaintext (no TLS)

The `axum` feature adds two convenience methods on `Router`:

```rust
// Option A: use the ConnectRpcService directly as axum's fallback
let app = axum::Router::new()
    .route("/health", axum::routing::get(|| async { "OK" }))
    .fallback_service(connect_router.into_axum_service());

axum::serve(listener, app).await?;

// Option B: get an axum::Router with a catch-all route pre-installed
let axum_router = connect_router.into_axum_router();
// axum_router.into_make_service()...
```

Sources: [connectrpc/src/service.rs:2950-2976](), [examples/eliza/src/main.rs:238-251]()

### TLS / mTLS (`connectrpc::axum::serve_tls`)

`axum::serve` owns only a plain `TcpListener` — it cannot inject peer TLS identity into requests. `connectrpc::axum::serve_tls` fills this gap: it runs the `tokio-rustls` accept loop, captures `PeerAddr` and `PeerCerts` once per connection, stamps them into request extensions, and then forwards to the axum service. Handler code reads the same `ctx.peer_addr()` / `ctx.peer_certs()` accessors regardless of whether the server is the standalone `Server` or an axum app.

```rust
let app = axum::Router::new()
    .route("/health", axum::routing::get(|| async { "OK" }))
    .fallback_service(connect_router.into_axum_service());

connectrpc::axum::serve_tls(listener, app, tls_config)
    .with_graceful_shutdown(async { shutdown_rx.await.ok(); })
    .await?;
```

Key differences from `axum::serve`:
- Accepts a concrete `axum::Router`, not the generic make-service forms.
- `PeerAddr` replaces `ConnectInfo<SocketAddr>`.
- `PeerCerts` is only present when the `ServerConfig` requests client auth and the peer provides a verified chain.
- Bounded TLS handshake timeout (default `DEFAULT_TLS_HANDSHAKE_TIMEOUT`) to prevent slowloris exhaustion.
- Does not install `CatchPanicLayer` — add it yourself if you want panicking handlers to surface as Connect errors.

Sources: [connectrpc/src/axum.rs:1-56](), [connectrpc/src/axum.rs:120-132]()

---

## Mounting with the Standalone Server

The `Server` type owns the accept loop (plaintext or TLS) and delegates to a `ConnectRpcService` internally. For plaintext:

```rust
Server::bind(addr).await?.serve(router).await?;
```

For TLS, pass a `rustls::ServerConfig`:

```rust
Server::bind(addr).await?
    .with_tls(tls_config)
    .serve(router)
    .await?;
```

`Server` automatically wraps the service in `tower_http::catch_panic::CatchPanicLayer` (unlike `axum::serve` and `connectrpc::axum::serve_tls`), so panicking handlers become Connect `Internal` errors rather than dropped connections.

Sources: [examples/eliza/src/main.rs:325-333]()

---

## Dispatch Path Summary

```
HTTP POST /connectrpc.eliza.v1.ElizaService/Say
    │
    ├── ConnectRpcService::call()
    │       extract ctx, limits, interceptors
    │       detect protocol (Connect / gRPC / gRPC-Web)
    │       parse headers → RequestMetadata
    │       dispatcher.lookup("connectrpc.eliza.v1.ElizaService/Say")
    │           → MethodDescriptor { kind: Unary, idempotent: true, spec: Some(SAY_SPEC) }
    │       collect body (bounded by limits.max_request_body_size)
    │       decompress if Content-Encoding set
    │       wrap bytes in Payload (lazy decode cache)
    │
    ├── interceptor chain (outermost first)
    │
    ├── dispatcher.call_unary(path, ctx, payload, format)
    │       Router: HashMap lookup → UnaryViewHandlerWrapper::call_erased()
    │           decode_request_view::<SayRequestView>(bytes, format)
    │           svc.say(ctx, OwnedView<SayRequestView>).await
    │           Response<SayResponse>.encode::<SayResponse>(format) → EncodedResponse
    │
    └── encode HTTP response (Connect / gRPC / gRPC-Web framing)
            return Response<ConnectRpcBody>
```

The two-step `lookup` + `call_*` split means protocol negotiation and body setup finish before any handler-specific code runs, and the `Dispatcher` contract (`None` or `Unimplemented` on miss) keeps the service error-free regardless of routing outcome.

Sources: [connectrpc/src/dispatcher.rs:1-19](), [connectrpc/src/handler.rs:86-143](), [connectrpc/src/router.rs:500-566]()
