# REAPI Server, CAS & Data Flow

> The HTTP/2 gRPC server that implements the REAPI surface (Execution, CAS, ByteStream, ActionCache, Capabilities), how blobs are stored and addressed in the content-addressable store, and the end-to-end data flow from Bazel client through to action output collection.

- Repository: hermeticbuild/actiond
- GitHub: https://github.com/hermeticbuild/actiond
- Human wiki: https://grok-wiki.com/public/wiki/hermeticbuild-actiond-796c0ee40e63
- Complete Markdown: https://grok-wiki.com/public/wiki/hermeticbuild-actiond-796c0ee40e63/llms-full.txt

## Source Files

- `src/grpc_http2_server.zig`
- `src/reapi.zig`
- `src/reapi_dispatch.zig`
- `src/execution_service.zig`
- `src/cache_service.zig`
- `src/bytestream_service.zig`
- `src/action_cache_service.zig`
- `src/capabilities_service.zig`
- `src/cas.zig`
- `src/body_sink.zig`
- `src/action_cache.zig`

---

This page describes how actiond implements the [Remote Execution API (REAPI)](https://github.com/bazelbuild/remote-apis): the native HTTP/2 gRPC server that listens for Bazel client connections, the content-addressable store (CAS) that persists blobs and directory trees, and the end-to-end data path from an incoming `Execute` request through action execution and result collection.

The entire stack is written in Zig without generated protobuf stubs or a third-party gRPC runtime. HTTP/2 framing, HPACK header compression, protobuf encoding and decoding, CAS storage, and action dispatch are all implemented natively.

---

## HTTP/2 gRPC Server

### Connection lifecycle

The server entry points in `grpc_http2_server` (`serve`, and `serveDispatcher` shown below) bind a TCP address and enter an accept loop. Each accepted connection gets its own detached OS thread (`std.Thread.spawn` followed by `thread.detach()`). That thread validates the HTTP/2 client connection preface, exchanges `SETTINGS` frames, and then processes frames in a read loop.

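```zig
// src/grpc_http2_server.zig (excerpt; parameters and error handling elided)
pub fn serveDispatcher(...) !void {
    // A write to a peer that has hung up should fail with an error, not kill the process.
    ignoreSigpipe();
    const address = try std.Io.net.IpAddress.parseLiteral(config.listen);
    var listener = try address.listen(io, .{ .reuse_address = true });
    while (true) {
        // Block until the next client connects.
        const stream = listener.accept(io) catch ...;
        // One OS thread per connection; the thread owns `stream` from here on.
        const thread = std.Thread.spawn(.{}, connectionThread, .{ io, allocator, dispatcher, stream }) ...;
        // Detached: the accept loop never joins connection threads.
        thread.detach();
    }
}
```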

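The server advertises the following settings to each peer:

| Setting | Value |
|---|---|
| `SETTINGS_ENABLE_PUSH` | 0 (server push disabled) |
| `SETTINGS_MAX_CONCURRENT_STREAMS` | 128 |
| `SETTINGS_INITIAL_WINDOW_SIZE` | 1 GiB (inbound window per stream) |
| `SETTINGS_MAX_FRAME_SIZE` | 1 MiB |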
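For illustration, the four settings above fit into one SETTINGS frame on stream 0. The sketch below builds that frame byte by byte using the identifiers from RFC 9113, section 6.5.2. The names `encodeServerSettings` and `server_settings` are hypothetical; this is not code from `grpc_http2_server.zig`.

```zig
const std = @import("std");

// SETTINGS identifiers defined in RFC 9113, section 6.5.2.
const enable_push: u16 = 0x2;
const max_concurrent_streams: u16 = 0x3;
const initial_window_size: u16 = 0x4;
const max_frame_size: u16 = 0x5;

const frame_type_settings: u8 = 0x4;
const Setting = struct { id: u16, value: u32 };

// Hypothetical table: the values mirror the prose above, not actiond's source.
const server_settings = [_]Setting{
    .{ .id = enable_push, .value = 0 },
    .{ .id = max_concurrent_streams, .value = 128 },
    .{ .id = initial_window_size, .value = 1 << 30 }, // 1 GiB
    .{ .id = max_frame_size, .value = 1 << 20 }, // 1 MiB
};

const frame_len = 9 + server_settings.len * 6;

/// Returns a complete SETTINGS frame: a 9-byte header followed by 6 bytes per setting.
pub fn encodeServerSettings() [frame_len]u8 {
    var frame: [frame_len]u8 = undefined;
    // Header: 24-bit payload length, frame type, flags (0 = not an ACK), 31-bit stream id (0).
    std.mem.writeInt(u24, frame[0..3], server_settings.len * 6, .big);
    frame[3] = frame_type_settings;
    frame[4] = 0;
    std.mem.writeInt(u32, frame[5..9], 0, .big);
    // Payload: each setting is a 16-bit identifier followed by a 32-bit value.
    for (server_settings, 0..) |s, i| {
        const off = 9 + i * 6;
        std.mem.writeInt(u16, frame[off..][0..2], s.id, .big);
        std.mem.writeInt(u32, frame[off + 2 ..][0..4], s.value, .big);
    }
    return frame;
}
```

One detail worth keeping in mind: `SETTINGS_INITIAL_WINDOW_SIZE` only sets the per-stream windows. The connection-level window always starts at 65,535 bytes and can be raised only with a `WINDOW_UPDATE` frame (RFC 9113, section 6.9.2).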

Sources: [src/grpc_http2_server.zig:388-420]()

### Frame processing and stream state

Each connection maintains a list of `StreamState` objects keyed by HTTP/2 stream ID. HEADERS frames are decoded with an HPACK decoder (`http2_hpack.Decoder`), and the `:path` pseudo-header becomes the gRPC method name stored on the stream. DATA frames accumulate the gRPC request body. When END_STREAM arrives, the stream is removed from the list and a `ResponseTask` is spawned on a new thread, so several in-flight streams on the same connection can be answered concurrently.

```text
Connection read loop
├── SETTINGS   → ack + apply peer settings
├── PING       → ack
├── WINDOW_UPDATE → update flow-control windows
├── HEADERS    → HPACK decode, extract :path, detect END_STREAM
├── CONTINUATION → append header block fragment
├── DATA       → feed body; if client-streaming, pass directly to WriteGrpcStream
├── RST_STREAM → discard stream
└── GOAWAY     → close connection
```

A `ResponseTracker` (atomic counter) limits the connection to `default_max_concurrent_streams` (128) active response threads at once, spinning until a slot is free.
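The bounded, spinning slot count described above can be sketched with a compare-and-swap loop over `std.atomic.Value`. `SlotLimiter` and its methods are hypothetical; the real `ResponseTracker` may use different fields and memory orderings.

```zig
const std = @import("std");

/// Hypothetical analogue of ResponseTracker: caps the number of in-flight response threads.
pub const SlotLimiter = struct {
    active: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
    limit: u32,

    /// Claims one slot if fewer than `limit` are taken; never blocks.
    pub fn tryAcquire(self: *SlotLimiter) bool {
        var current = self.active.load(.acquire);
        while (current < self.limit) {
            // cmpxchgWeak returns null on success, otherwise the value it observed.
            current = self.active.cmpxchgWeak(current, current + 1, .acq_rel, .acquire) orelse return true;
        }
        return false;
    }

    /// Spins until a slot frees up.
    pub fn acquire(self: *SlotLimiter) void {
        while (!self.tryAcquire()) std.atomic.spinLoopHint();
    }

    /// Called by a response thread when it finishes.
    pub fn release(self: *SlotLimiter) void {
        _ = self.active.fetchSub(1, .release);
    }
};
```

Because `tryAcquire` only increments while the count is below the limit, concurrent callers cannot push it past 128. Spinning keeps the code small at the cost of CPU time while a connection is saturated; a semaphore or condition variable would trade that for more code.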

Sources: [src/grpc_http2_server.zig:454-620]()

### Method routing and streaming kinds

`methodKind` classifies each gRPC method path into one of three patterns:

| Pattern | Methods |
|---|---|
| `unary` | `GetCapabilities`, `FindMissingBlobs`, `BatchUpdateBlobs`, `BatchReadBlobs`, `GetActionResult`, `UpdateActionResult` |
| `server_streaming` | `ByteStream/Read`, `CAS/GetTree`, `Execution/Execute` |
| `client_streaming` | `ByteStream/Write` |

For `server_streaming` methods the server sends the response HEADERS (`:status 200`) immediately and then pushes each record through a `body_sink.Writer` as soon as it is produced. For `client_streaming` methods (`ByteStream/Write`) the server creates a `ByteStreamWriteClientStream` as soon as the method is known and pipes each incoming DATA frame to `WriteGrpcStream.append`, so the upload is never buffered in memory in full.
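A minimal sketch of the classification in the table above, comparing full gRPC paths with `std.mem.eql`. `classifyMethod` is a hypothetical name, and returning `null` for unlisted paths (including actiond's internal `DeleteBlobs`) is an assumption of the sketch, not documented behaviour of `methodKind`.

```zig
const std = @import("std");

pub const MethodKind = enum { unary, server_streaming, client_streaming };

const reapi_prefix = "/build.bazel.remote.execution.v2.";

// Full gRPC paths for the rows of the table above (REAPI v2 and google.bytestream).
const server_streaming_paths = [_][]const u8{
    "/google.bytestream.ByteStream/Read",
    reapi_prefix ++ "ContentAddressableStorage/GetTree",
    reapi_prefix ++ "Execution/Execute",
};
const client_streaming_paths = [_][]const u8{"/google.bytestream.ByteStream/Write"};
const unary_paths = [_][]const u8{
    reapi_prefix ++ "Capabilities/GetCapabilities",
    reapi_prefix ++ "ContentAddressableStorage/FindMissingBlobs",
    reapi_prefix ++ "ContentAddressableStorage/BatchUpdateBlobs",
    reapi_prefix ++ "ContentAddressableStorage/BatchReadBlobs",
    reapi_prefix ++ "ActionCache/GetActionResult",
    reapi_prefix ++ "ActionCache/UpdateActionResult",
};

fn contains(paths: []const []const u8, path: []const u8) bool {
    for (paths) |p| {
        if (std.mem.eql(u8, p, path)) return true;
    }
    return false;
}

/// Hypothetical classifier; null lets a caller answer unknown paths with UNIMPLEMENTED.
pub fn classifyMethod(path: []const u8) ?MethodKind {
    if (contains(&server_streaming_paths, path)) return .server_streaming;
    if (contains(&client_streaming_paths, path)) return .client_streaming;
    if (contains(&unary_paths, path)) return .unary;
    return null;
}
```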

Sources: [src/grpc_http2_server.zig:712-734](), [src/grpc_http2_server.zig:150-180]()

### Flow control

`FlowControl` tracks per-stream and connection-level send windows using a spin-locked list. `reserveData` blocks the sender thread (spin/sleep) until the peer's window allows the next chunk. When DATA frames arrive the server sends matching `WINDOW_UPDATE` frames for both the connection and the stream to keep the peer's send window full.
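The arithmetic behind `reserveData` and the `WINDOW_UPDATE` replies can be sketched as two pure helpers. `sendable` and `windowUpdate` are hypothetical names, and the sketch assumes windows are tracked as signed 64-bit integers, which the source does not confirm.

```zig
const std = @import("std");

/// How many bytes of a pending DATA payload may be sent right now (0 means "wait").
/// Windows are signed because a SETTINGS change can drive a stream window negative
/// (RFC 9113, section 6.9.2).
pub fn sendable(want: usize, max_frame: u32, conn_window: i64, stream_window: i64) usize {
    const window = @min(conn_window, stream_window);
    if (window <= 0) return 0;
    return @min(want, @as(usize, max_frame), @as(usize, @intCast(window)));
}

/// Encodes a WINDOW_UPDATE frame (type 0x8) granting `increment` bytes on
/// `stream_id` (0 = the whole connection).
pub fn windowUpdate(stream_id: u31, increment: u31) [13]u8 {
    var frame: [13]u8 = undefined;
    std.mem.writeInt(u24, frame[0..3], 4, .big); // payload is one 32-bit word
    frame[3] = 0x8;
    frame[4] = 0; // WINDOW_UPDATE defines no flags
    std.mem.writeInt(u32, frame[5..9], stream_id, .big); // reserved bit stays 0
    std.mem.writeInt(u32, frame[9..13], increment, .big);
    return frame;
}
```

A sender would call `sendable` in a loop, waiting whenever it returns 0 and otherwise subtracting the sent length from both the connection and the stream window before writing the DATA frame.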

Sources: [src/grpc_http2_server.zig:237-310]()

---

## REAPI Dispatch

`reapi_dispatch.Server` is the single dispatch object passed through the gRPC layer. It holds references to the CAS store, the optional action cache store, an optional work-root directory for execution, and `ExecuteOptions`.

```zig
pub const Server = struct {
    store: cas.Store,
    action_cache_store: ?action_cache.Store = null,
    cleanup_store: ?cas.Store = null,
    cleanup_visible_store: ?cas.Store = null,
    cleanup_staged_index: ?*staged_cas_index.Index = null,
    work_root: ?std.Io.Dir = null,
    execution_options: action_executor.ExecuteOptions = .{},
};
```

`handleUnary`, `handleServerStreaming`, `handleServerStreamingResponse`, and `handleClientStreaming` each decode the gRPC-framed protobuf payload, dispatch to the appropriate service function, and encode the response back into a gRPC record.

### Method constants

Method path strings are declared as constants in `reapi_dispatch.zig` and used for string comparison at dispatch time. There are no reflection or codegen steps.

```zig
pub const cas_find_missing_blobs = "/build.bazel.remote.execution.v2.ContentAddressableStorage/FindMissingBlobs";
pub const execution_execute       = "/build.bazel.remote.execution.v2.Execution/Execute";
pub const bytestream_read         = "/google.bytestream.ByteStream/Read";
pub const bytestream_write        = "/google.bytestream.ByteStream/Write";
// ... etc.
pub const internal_cas_delete_blobs = "/actiond.internal.v1.CAS/DeleteBlobs";
```

The `internal_cas_delete_blobs` method is an actiond-specific extension not present in the upstream REAPI specification. It is used by the cleanup subsystem to evict staged blobs.

Sources: [src/reapi_dispatch.zig:24-42]()

---

## Content-Addressable Store (CAS)

### Digest

`cas.Digest` is a pair of a 32-byte SHA-256 hash and a `u64` byte count. It is the canonical identity for every blob. The empty digest (`Digest.fromBytes("")`) is treated as always present.

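```zig
pub const Digest = struct {
    hash: [32]u8, // raw SHA-256 bytes, not hex
    size_bytes: u64,
    // ... (other members elided)

    // SHA-256 of `bytes`, paired with `bytes.len`.
    pub fn fromBytes(bytes: []const u8) Digest { ... }
    // Writes the 64-character hex hash into `hash_out` for the wire-level reapi.Digest.
    pub fn toReapi(self: Digest, hash_out: *[64]u8) reapi.Digest { ... }
    // Converts a wire-level reapi.Digest back into a cas.Digest (can fail).
    pub fn fromReapi(value: reapi.Digest) !Digest { ... }
};
```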
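A runnable sketch of the two operations the digest relies on: hashing with `std.crypto.hash.sha2.Sha256` and rendering the 64-character hex form that `toReapi` writes into its `hash_out` buffer. This `Digest` is a trimmed mirror for illustration, and its `hex` helper is hypothetical.

```zig
const std = @import("std");
const Sha256 = std.crypto.hash.sha2.Sha256;

/// Hypothetical mirror of cas.Digest: raw SHA-256 bytes plus the blob length.
pub const Digest = struct {
    hash: [32]u8,
    size_bytes: u64,

    pub fn fromBytes(bytes: []const u8) Digest {
        var hash: [32]u8 = undefined;
        Sha256.hash(bytes, &hash, .{});
        return .{ .hash = hash, .size_bytes = bytes.len };
    }

    /// Lowercase hex, the form REAPI carries in Digest.hash on the wire.
    pub fn hex(self: Digest) [64]u8 {
        const digits = "0123456789abcdef";
        var out: [64]u8 = undefined;
        for (self.hash, 0..) |b, i| {
            out[2 * i] = digits[b >> 4]; // high nibble
            out[2 * i + 1] = digits[b & 0x0f]; // low nibble
        }
        return out;
    }
};
```

The hash of `"hello"` is the one used in the on-disk layout example below, and the empty blob always hashes to `e3b0c442...b855`.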

Sources: [src/cas.zig:55-100]()

### On-disk layout

Blobs are stored under a sharded two-level directory tree so that no single directory accumulates too many entries:

```
blobs/sha256/
  2c/
    2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
  ...
trees/sha256/
  2c/
    2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824/
      (materialized directory tree)
```

The first two hex characters of the hash form the shard directory. Blob files are written with mode `0o444` (read-only to prevent accidental modification). Tree entries are directories containing the fully materialized file hierarchy.
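A sketch of the path computation, assuming a helper that receives the digest already hex-encoded. `shardedPath` is hypothetical; the `root` values `blobs` and `trees` come from the layout above.

```zig
const std = @import("std");

/// Hypothetical helper: builds "<root>/sha256/<first two hex chars>/<full hex>" into `buf`.
pub fn shardedPath(buf: []u8, root: []const u8, hex: *const [64]u8) ![]u8 {
    const shard: []const u8 = hex[0..2];
    const full: []const u8 = hex;
    return std.fmt.bufPrint(buf, "{s}/sha256/{s}/{s}", .{ root, shard, full });
}
```

Two hex characters give 256 shard directories, so a store holding one million blobs averages roughly 3,900 files per shard.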

Sources: [src/cas.zig:13-26](), [src/cas.zig:test "CAS subpaths use two-character digest prefix sharding"]()

### BlobWriter and atomic publish

`BlobWriter` is the write path for all incoming blobs:

1. On Linux, attempts to open an anonymous temporary file with `O_TMPFILE` inside `blobs/sha256/`. On other platforms or on failure, falls back to a named temp file (`blobs/sha256/.tmp-<id>`).
2. Data is streamed in via `writeAll`, updating a SHA-256 hasher and byte counter incrementally.
3. `finish` computes the final digest, validates it against an optional expected value, creates the shard directory if needed, and atomically publishes the file:
   - **Linux anonymous path**: `linkat` with `AT_EMPTY_PATH` on the open file descriptor (falling back to linking the `/proc/self/fd/...` path).
   - **Named temp**: `renamePreserve` (a cross-device-safe rename that preserves the source on `PathAlreadyExists`).

If the target path already exists, the freshly written copy is discarded: a blob with the same digest is already published, so deduplication follows directly from content addressing.
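Steps 2 and 3 (hashing while writing, then validating in `finish`) can be sketched without any file I/O. `HashingWriter` is a hypothetical stand-in: the real `BlobWriter` also owns the temporary file and performs the `linkat`/rename publish, both omitted here.

```zig
const std = @import("std");
const Sha256 = std.crypto.hash.sha2.Sha256;

/// Hypothetical in-memory stand-in for BlobWriter's hashing half.
pub const HashingWriter = struct {
    hasher: Sha256,
    written: u64,

    pub fn init() HashingWriter {
        return .{ .hasher = Sha256.init(.{}), .written = 0 };
    }

    pub fn writeAll(self: *HashingWriter, bytes: []const u8) void {
        // A real writer would also append `bytes` to the temp file here.
        self.hasher.update(bytes);
        self.written += bytes.len;
    }

    /// Finalizes the hash and rejects the blob if it does not match the expected digest.
    pub fn finish(self: *HashingWriter, expected_hash: ?[32]u8, expected_size: ?u64) error{DigestMismatch}![32]u8 {
        var hash: [32]u8 = undefined;
        self.hasher.final(&hash);
        if (expected_size) |size| {
            if (size != self.written) return error.DigestMismatch;
        }
        if (expected_hash) |want| {
            if (!std.mem.eql(u8, &want, &hash)) return error.DigestMismatch;
        }
        return hash;
    }
};
```

Feeding `"hel"` and `"lo"` in two calls yields the same digest as hashing `"hello"` at once, which is what lets an upload be hashed chunk by chunk as it arrives.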

Sources: [src/cas.zig:334-430]()

### putFile and putFilePromote

`putFile` copies an external file into the CAS by opening it and feeding its bytes through a `BlobWriter`. `putFilePromote` (Linux only) tries a faster path: hash the file in place, `fchmod` it to `0o444`, then `renamePreserve` it into the CAS, which moves the file instead of copying it. If the rename crosses a filesystem boundary (`CrossDevice`) or fails with a permission error, it falls back to copying. Detailed counters are exposed via `PutFileStats` for observability.
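The try-fast-then-copy policy of `putFilePromote` can be sketched with injected operations. `PromoteError`, `Stats`, and `putWithFallback` are hypothetical; they only mirror the fallback conditions named above and the idea of `PutFileStats` counters.

```zig
const std = @import("std");

/// Hypothetical error set for the in-place "promote" attempt.
pub const PromoteError = error{ CrossDevice, AccessDenied, PermissionDenied, NoSpaceLeft };

/// Hypothetical counters in the spirit of PutFileStats.
pub const Stats = struct { promoted: u64 = 0, copied: u64 = 0 };

/// Tries the fast rename path first and falls back to copying only for
/// cross-device and permission errors; every other error propagates.
pub fn putWithFallback(
    stats: *Stats,
    promote: *const fn () PromoteError!void,
    copy: *const fn () anyerror!void,
) anyerror!void {
    promote() catch |err| switch (err) {
        error.CrossDevice, error.AccessDenied, error.PermissionDenied => {
            try copy();
            stats.copied += 1;
            return;
        },
        else => return err,
    };
    stats.promoted += 1;
}
```

Errors outside the fallback set (here `NoSpaceLeft`) propagate instead of triggering a copy, since a copy would most likely fail the same way.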

Sources: [src/cas.zig:222-295]()

### materializeTree

`materializeTree` reconstructs a full directory hierarchy from a root `reapi.Directory` digest:

1. Reads the root `Directory` protobuf from the CAS.
2. For each `FileNode`: attempts `hardLink` from the blob path; copies if hard-linking fails.
3. For each `DirectoryNode`: recurses.
4. On Linux the entire tree is built in a temporary directory first, then `rename`d into place atomically. On cross-device mounts it builds directly at the final path.

Sources: [src/cas.zig:296-332]()

---

## Service Implementations

### Capabilities (`capabilities_service.zig`)

Returns a static `ServerCapabilities` message. No state is consulted:

| Field | Value |
|---|---|
| Digest functions | SHA-256 |
| Action cache update | enabled |
| Max batch size | 4 MiB |
| Execution enabled | true |
| API versions | 2.0 – 2.3 |

Sources: [src/capabilities_service.zig:3-16]()

### CAS Batch Operations (`cache_service.zig`)

- **`FindMissingBlobs`**: iterates the request digest list, calls `store.has`, returns digests for which the answer is false.
- **`BatchUpdateBlobs`**: for each item, validates the data against the declared digest via `cas.Digest.fromBytes`, then calls `store.putKnownBytes`.
- **`BatchReadBlobs`**: reads each blob with `store.readAlloc`; per-item status codes are returned in the response.
- **`deleteBlobs` / `deleteBlobsWhenVisible`**: internal cleanup helpers that call `store.deleteBlob` conditionally.

Sources: [src/cache_service.zig:1-80]()

### ByteStream Read (`bytestream_service.zig`)

`writeReadGrpcRecords` is the preferred server-streaming path. It opens the blob file with `store.openBlob`, seeks to `read_offset` if one is given, and reads in 1 MiB chunks (`max_read_response_data_bytes`). Each chunk is wrapped in a `grpc_record` (5-byte length prefix + protobuf `ReadResponse`) and written immediately to the `body_sink.Writer`, which on the server is the live HTTP/2 DATA writer. Large blobs are therefore never loaded into memory in full.
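A sketch of the record framing for one chunk. `ReadResponse` carries its payload in `bytes data = 10` (per `google/bytestream/bytestream.proto`), so each record is the 5-byte gRPC prefix, the tag byte `0x52`, a varint length, and the chunk. `encodeReadResponseRecord` and `putVarint` are hypothetical helpers, not actiond's `grpc_record` API.

```zig
const std = @import("std");

/// Hypothetical helper: frames one ReadResponse { bytes data = 10; } as an
/// uncompressed gRPC record in `out` and returns the used prefix.
pub fn encodeReadResponseRecord(out: []u8, data: []const u8) error{NoSpaceLeft}![]u8 {
    var varint: [10]u8 = undefined;
    const varint_len = putVarint(&varint, data.len);
    const msg_len = 1 + varint_len + data.len; // tag byte + length varint + payload
    if (out.len < 5 + msg_len) return error.NoSpaceLeft;
    out[0] = 0; // compressed flag: 0 = uncompressed
    std.mem.writeInt(u32, out[1..5], @intCast(msg_len), .big);
    out[5] = (10 << 3) | 2; // field 10, wire type 2 (length-delimited) = 0x52
    @memcpy(out[6..][0..varint_len], varint[0..varint_len]);
    @memcpy(out[6 + varint_len ..][0..data.len], data);
    return out[0 .. 5 + msg_len];
}

/// Protobuf base-128 varint: 7 bits per byte, low group first, high bit = "more follows".
fn putVarint(buf: *[10]u8, value: u64) usize {
    var v = value;
    var i: usize = 0;
    while (v >= 0x80) : (i += 1) {
        buf[i] = @as(u8, @truncate(v)) | 0x80;
        v >>= 7;
    }
    buf[i] = @intCast(v);
    return i + 1;
}
```

For a full 1 MiB chunk the length varint takes 3 bytes, so each record is 1 MiB plus 9 bytes of framing.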

Sources: [src/bytestream_service.zig:68-120]()

### ByteStream Write (`bytestream_service.zig`)

`WriteGrpcStream` implements the streaming upload path:

1. Incoming bytes (potentially split mid-record) are buffered in `pending`.
2. Complete gRPC records are parsed; the first record's `resource_name` is used to extract and validate the target digest via `bytestream.parseBlobResource`.
3. A `cas.BlobWriter` is opened lazily on the first record.
4. Each `WriteRequest.data` chunk is passed to `BlobWriter.writeAll` (hashing and writing simultaneously).
5. When a record with `finish_write = true` arrives, `WriteGrpcStream.finish` calls `BlobWriter.finish`, which validates the final hash and atomically publishes the file.

Timing is logged for large or slow uploads: 128 or more records, at least 1 MiB, or at least 64 KiB when the upload was also slow.
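Steps 1 and 2 depend on finding record boundaries in `pending` when a record straddles two DATA frames. A sketch, where `nextRecord` and `Record` are hypothetical names and rejecting compressed records (non-zero flag byte) is an assumption:

```zig
const std = @import("std");

pub const Record = struct { message: []const u8, consumed: usize };

/// Hypothetical helper: extracts one complete gRPC record from the front of
/// `pending`, or returns null when more bytes are needed.
pub fn nextRecord(pending: []const u8) error{CompressedUnsupported}!?Record {
    if (pending.len < 5) return null; // prefix itself is still incomplete
    if (pending[0] != 0) return error.CompressedUnsupported;
    const len: usize = std.mem.readInt(u32, pending[1..5], .big);
    if (pending.len - 5 < len) return null; // message body still incomplete
    return .{ .message = pending[5 .. 5 + len], .consumed = 5 + len };
}
```

A caller would append each DATA payload to `pending`, call `nextRecord` until it returns `null`, and discard `consumed` bytes after handling each message.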

Sources: [src/bytestream_service.zig:178-280]()

### ActionCache (`action_cache_service.zig`, `action_cache.zig`)

`action_cache.Store` maps action digest → serialized `reapi.ActionResult`. Keys are stored as files at `ac/sha256/XX/FULL_HASH` (same sharding scheme as the CAS). `updateActionResult` serializes the result via `reapi.encodeAlloc` and writes it atomically. `getActionResult` reads and deserializes it.

Sources: [src/action_cache.zig:7-60](), [src/action_cache_service.zig:12-35]()

### Execution (`execution_service.zig`)

The `execute` function orchestrates the full remote execution flow:

1. **Cache lookup**: if `skip_cache_lookup` is not set and an `action_cache_store` is configured, call `store.get` with the action digest. On a hit, return a `CompletedOperation` with `cached_result: true` immediately, without executing anything.
2. **do_not_cache check**: reads the `Action` proto from the CAS to check the `do_not_cache` flag before running.
3. **Work directory**: creates a unique subdirectory under `work_root` using `exec/<hash>-<size>-<monotonic_id>`.
4. **Execute**: calls `action_executor.executeActionWithOptions`, which materializes inputs, runs the command, and collects outputs.
5. **Store result**: if `!do_not_cache`, the `ActionResult` is persisted to the `action_cache_store`.
6. **Return**: always returns an `Operation` with `done = true`; the server never hands back a pending operation for the client to wait on. The `ExecuteResponse` is packed into a `google.protobuf.Any` inside the `Operation.response` field.

The work directory is deleted after execution regardless of outcome.
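A sketch of the step 3 naming scheme. A process-wide counter keeps names unique even when the same action digest is executed twice at once, which the digest alone could not guarantee. `execDirName` and `next_exec_id` are hypothetical.

```zig
const std = @import("std");

/// Process-wide counter; every call gets a distinct id.
var next_exec_id = std.atomic.Value(u64).init(0);

/// Hypothetical helper formatting "exec/<hash>-<size>-<id>" relative to the work root.
pub fn execDirName(buf: []u8, hash_hex: []const u8, size_bytes: u64) ![]u8 {
    const id = next_exec_id.fetchAdd(1, .monotonic);
    return std.fmt.bufPrint(buf, "exec/{s}-{d}-{d}", .{ hash_hex, size_bytes, id });
}
```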

Sources: [src/execution_service.zig:30-80]()

---

## Protobuf Types (`reapi.zig`)

All REAPI protobuf messages are hand-coded Zig structs with `encode`/`decode` methods that call into `protobuf_wire.zig` directly. There is no generated code. Key types include:

| Type | Purpose |
|---|---|
| `Digest` | `{hash: []const u8, size_bytes: i64}` — wire representation |
| `Action` | `command_digest`, `input_root_digest`, `do_not_cache`, `platform` |
| `Command` | `arguments`, `environment_variables`, `output_paths`, `platform` |
| `Directory` | `files: []FileNode`, `directories: []DirectoryNode` (must be canonically sorted) |
| `ActionResult` | `output_files`, `output_directories`, `exit_code`, `stdout_digest`, `stderr_digest`, `execution_metadata` |
| `ExecuteResponse` | `result`, `cached_result`, `status` |
| `Operation` | `name`, `done`, `response: ?Any` |

`Directory.validateCanonical` is called on both encode and decode to enforce the REAPI requirement that entries are lexicographically sorted with no duplicates.
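The REAPI specification requires the names in each of a `Directory`'s lists to be sorted by UTF-8 bytes (so uppercase ASCII sorts before lowercase) and to be unique. A sketch of that check for a single list; `isCanonical` is hypothetical, and it does not detect a name that appears in both the file and directory lists.

```zig
const std = @import("std");

/// Hypothetical check for one name list: strictly increasing byte-wise order,
/// which rules out both misordering and duplicates.
pub fn isCanonical(names: []const []const u8) bool {
    if (names.len < 2) return true;
    for (names[0 .. names.len - 1], names[1..]) |prev, next| {
        if (std.mem.order(u8, prev, next) != .lt) return false;
    }
    return true;
}
```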

Sources: [src/reapi.zig:1-15](), [src/reapi.zig:398-440]()

---

## End-to-End Data Flow

The sequence below shows the full path for a Bazel `Execute` RPC when the action is not cached.

```mermaid
sequenceDiagram
    participant Bazel as Bazel Client
    participant H2 as grpc_http2_server
    participant Dispatch as reapi_dispatch.Server
    participant Exec as execution_service
    participant AC as action_cache.Store
    participant CAS as cas.Store

    Bazel->>H2: HTTP/2 HEADERS (:path=/Execution/Execute)
    Bazel->>H2: HTTP/2 DATA (gRPC ExecuteRequest) + END_STREAM
    H2->>H2: Spawn ResponseTask thread
    H2->>Dispatch: handleServerStreamingResponse(method, body, Http2BodyWriter)
    Dispatch->>Exec: execute(request, blob_store, ac_store, work_root)
    Exec->>AC: get(action_digest)
    AC-->>Exec: FileNotFound (cache miss)
    Exec->>CAS: readAlloc(action_digest) → Action proto
    Exec->>CAS: materializeTree(input_root_digest) → work_dir/
    Exec->>Exec: action_executor.executeActionWithOptions()
    Exec->>CAS: putFilePromote(output file) → Digest
    Exec->>AC: put(action_digest, ActionResult)
    Exec-->>Dispatch: CompletedOperation{done:true}
    Dispatch->>H2: encode Operation as gRPC record
    H2->>Bazel: HTTP/2 DATA (gRPC Operation)
    H2->>Bazel: HTTP/2 HEADERS (grpc-status: 0) + END_STREAM
```

For a `ByteStream/Write` upload the flow is:

```mermaid
sequenceDiagram
    participant Bazel as Bazel Client
    participant H2 as grpc_http2_server
    participant WS as WriteGrpcStream
    participant CAS as cas.Store / BlobWriter

    Bazel->>H2: HTTP/2 HEADERS (:path=/ByteStream/Write)
    loop DATA chunks
        Bazel->>H2: HTTP/2 DATA (gRPC WriteRequest chunk)
        H2->>WS: append(bytes)
        WS->>CAS: BlobWriter.writeAll(data) [hash incrementally]
    end
    Bazel->>H2: HTTP/2 DATA + END_STREAM (finish_write=true)
    H2->>WS: finish()
    WS->>CAS: BlobWriter.finish() → rename/linkat into blobs/sha256/XX/HASH
    WS-->>H2: WriteResponse{committed_size}
    H2->>Bazel: HTTP/2 DATA (WriteResponse) + trailers
```

---

## Body Sink: Streaming Abstraction

`body_sink.Writer` is a type-erased interface (an opaque context pointer plus function pointers) that decouples service code from the HTTP/2 transport:

```zig
pub const Writer = struct {
    ctx: *anyopaque,
    write_all: *const fn (*anyopaque, std.Io, std.mem.Allocator, []const u8) anyerror!void,
    ...
};
```

- `ArrayListWriter`: accumulates bytes into a `std.ArrayListUnmanaged(u8)` (used in tests and buffered paths).
- `Http2BodyWriter` (defined in `grpc_http2_server.zig`): calls `SharedHttp2Writer.sendData`, which respects HTTP/2 flow control and writes DATA frames directly to the TCP connection.

The `handleServerStreamingResponse` dispatch path in `reapi_dispatch.Server` passes the `Http2BodyWriter` through to services like `bytestream_service.writeReadGrpcRecords` and `tree_service.writeGetTreeGrpcRecords`, enabling incremental streaming without buffering entire responses.
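A reduced sketch of the pattern: a context pointer plus one function pointer, a fixed-buffer sink standing in for `ArrayListWriter`, and service code that only sees the interface. All names here are hypothetical, and the `std.Io` and allocator parameters of the real `write_all` are omitted.

```zig
const std = @import("std");

/// Hypothetical reduction of body_sink.Writer.
pub const Writer = struct {
    ctx: *anyopaque,
    write_all: *const fn (ctx: *anyopaque, bytes: []const u8) anyerror!void,

    pub fn writeAll(self: Writer, bytes: []const u8) anyerror!void {
        return self.write_all(self.ctx, bytes);
    }
};

/// Fixed-capacity sink, standing in for ArrayListWriter in tests.
pub const FixedSink = struct {
    buf: [256]u8 = undefined,
    len: usize = 0,

    fn writeAllErased(ctx: *anyopaque, bytes: []const u8) anyerror!void {
        // Recover the concrete type from the opaque context pointer.
        const self: *FixedSink = @ptrCast(@alignCast(ctx));
        if (self.len + bytes.len > self.buf.len) return error.NoSpaceLeft;
        @memcpy(self.buf[self.len..][0..bytes.len], bytes);
        self.len += bytes.len;
    }

    pub fn writer(self: *FixedSink) Writer {
        return .{ .ctx = self, .write_all = &writeAllErased };
    }

    pub fn written(self: *const FixedSink) []const u8 {
        return self.buf[0..self.len];
    }
};

/// Service-side code sees only Writer, so it runs unchanged over HTTP/2 or a buffer.
pub fn emitRecords(w: Writer, records: []const []const u8) !void {
    for (records) |r| try w.writeAll(r);
}
```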

Sources: [src/body_sink.zig:1-40](), [src/grpc_http2_server.zig:541-570]()

---

## Summary

actiond's REAPI server is a self-contained HTTP/2 gRPC implementation in Zig. The server spawns one thread per TCP connection and one thread per in-flight gRPC stream, handling up to 128 simultaneous streams per connection. The CAS uses SHA-256 content addressing with a two-character shard prefix layout; blobs are published atomically via rename or `linkat`, which provides deduplication and crash safety. Service dispatch in `reapi_dispatch.Server` compares the method path against string constants, and each service module interacts directly with `cas.Store` or `action_cache.Store` without an intermediate abstraction. The `Execute` path is synchronous and always returns a completed `Operation`: cached hits return immediately, while live executions block until the action finishes and its result is stored.

Sources: [src/grpc_http2_server.zig:388-420](), [src/reapi_dispatch.zig:45-140](), [src/cas.zig:115-170](), [src/execution_service.zig:30-80]()
