# Real-time document sync

> Concept: Sync-service worker routes, Durable Object sessions, permission tokens, Bebop messages, snapshot lifecycle, and client reconnect behavior.

- Repository: macro-inc/macro
- GitHub: https://github.com/macro-inc/macro
- Human docs: https://grok-wiki.com/public/docs/macro-inc-macro-bb988e1a448e
- Complete Markdown: https://grok-wiki.com/public/docs/macro-inc-macro-bb988e1a448e/llms-full.txt

## Source Files

- `rust/sync-service/src/cf_worker.rs`
- `rust/sync-service/src/durable_object.rs`
- `rust/sync-service/src/websocket.rs`
- `rust/sync-service/src/generated/schema.rs`
- `js/app/packages/service-clients/service-sync/client.ts`
- `js/app/packages/service-clients/service-sync/source.ts`
- `js/app/packages/block-md/definition.ts`

---


The sync service is a Cloudflare Worker backed by one `DocumentSyncSession` Durable Object per `document_id`. It exposes HTTP document endpoints, upgrades `/document/{document_id}/connect` to a WebSocket session, serializes realtime messages with Bebop, and stores Loro snapshots plus pending operations for reconnect and recovery.

## Runtime shape

```mermaid
flowchart LR
  subgraph Client
    MD["block-md load()"]
    Source["createSyncServiceSource()"]
    Rest["syncServiceClient"]
  end

  subgraph Worker["sync-service Worker"]
    Router["cf_worker::router"]
    Schema["/schema"]
    Copy["/document/{id}/copy"]
  end

  subgraph DO["DocumentSyncSession Durable Object"]
    WS["/connect WebSocket"]
    API["metadata/raw/snapshot/active_peers/initialize"]
    State["DocumentState(LoroDoc)"]
    Awareness["EphemeralStore"]
    Alarm["alarm()"]
  end

  subgraph Storage
    SQLKV["Durable Object SQL snapshot store"]
    KVFallback["SNAPSHOT_STORE_KV fallback"]
    DOKV["Durable Object KV pending ops"]
    D1["USER_PEER_MAPPING D1"]
  end

  MD --> Source
  Source --> WS
  Rest --> Router
  Router --> Schema
  Router --> Copy
  Router --> DO
  WS --> State
  WS --> Awareness
  API --> State
  State --> SQLKV
  SQLKV --> KVFallback
  DOKV --> State
  WS --> D1
  Alarm --> SQLKV
  Alarm --> DOKV
```

## Worker routes

The top-level Worker routes health/schema traffic directly and forwards document traffic to the Durable Object namespace `DOCUMENT_SYNC_SESSION` using `id_from_name(document_id)`.

| Route | Handler | Authentication | Response |
|---|---|---|---|
| `/` | Worker | None | `Hello Sync Service!` |
| `/health` | Worker | None | `healthy` |
| `/schema` | Worker | None | Raw `bebop/schema.bop` |
| `/document/{document_id}/copy` | Worker orchestration | Header token via forwarded Durable Object calls | Copies a snapshot into another document |
| `/document/{document_id}/{*rest}` | Durable Object pass-through | Depends on Durable Object route | Durable Object response or `408` on RPC timeout |

`pass_to_durable_object` wraps Durable Object fetches with the service default timeout and returns status `408` if the RPC times out.
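The route table above can be summarized as a small path classifier. This is a minimal TypeScript sketch for illustration, not the Rust router in `cf_worker.rs`; `classifyRoute` and the `RouteTarget` labels are hypothetical names, and the handling of unmatched paths is an assumption.

```typescript
// Hypothetical labels for where a request path is handled.
type RouteTarget = 'worker' | 'copy' | 'durable-object' | 'unmatched';

// Classify a URL path according to the Worker route table.
function classifyRoute(path: string): RouteTarget {
  if (path === '/' || path === '/health' || path === '/schema') return 'worker';

  const segments = path.split('/').filter((s) => s.length > 0);
  // Expected shape: document / {document_id} / ...rest
  if (segments.length < 3 || segments[0] !== 'document') return 'unmatched';

  // `copy` is orchestrated by the Worker because it spans two Durable Objects.
  if (segments.length === 3 && segments[2] === 'copy') return 'copy';

  // Everything else under /document/{document_id}/ is passed through.
  return 'durable-object';
}
```

For example, `classifyRoute('/document/abc/connect')` yields `'durable-object'`, while `classifyRoute('/document/abc/copy')` yields `'copy'`.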

## Durable Object routes

`DocumentSyncSession` owns the document session state. CORS validation runs before route handling. Allowed origins include local app ports, Macro production/dev/staging origins, Capacitor localhost, Apollo testing, and non-empty `https://{subdomain}.preview.macro.com` preview origins.
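The preview-origin rule can be expressed as a single pattern check. The sketch below covers only the `https://{subdomain}.preview.macro.com` case; `isPreviewOrigin` is a hypothetical helper, and treating the subdomain as a single DNS label (letters, digits, hyphens) is an assumption rather than something the source confirms.

```typescript
// Matches https://{subdomain}.preview.macro.com with a non-empty subdomain.
// Assumption: the subdomain is one DNS label made of letters, digits, or hyphens.
const PREVIEW_ORIGIN_PATTERN = /^https:\/\/[a-z0-9-]+\.preview\.macro\.com$/i;

function isPreviewOrigin(candidate: string): boolean {
  return PREVIEW_ORIGIN_PATTERN.test(candidate);
}
```

Anchoring the pattern at both ends keeps lookalike origins such as `https://x.preview.macro.com.example.org` from matching.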

| Route | Purpose | Auth requirement |
|---|---|---|
| `/document/{document_id}/connect?token=...` | WebSocket upgrade and initial sync | JWT in query params |
| `/document/{document_id}/exists` | Existence check | None |
| `/document/{document_id}/wakeup` | Warm in-memory state and keep worker alive | None |
| `/document/{document_id}/peer/{peer_id}` | Resolve registered peer to user | None |
| `/document/{document_id}/metadata` | Return `{ id, peers, version_id }` | Bearer JWT or internal admin |
| `/document/{document_id}/raw` | Return Loro deep JSON | Bearer JWT or internal admin |
| `/document/{document_id}/snapshot` | Return binary Loro snapshot | Bearer JWT or internal admin |
| `/document/{document_id}/active_peers` | Return active peer IDs as strings | Bearer JWT or internal admin |
| `/document/{document_id}/initialize` | Store initial binary snapshot | Bearer JWT with at least `edit` |
| `/document/{document_id}/debug_dump_operations` | Return pending operations | Admin |
| `/document/{document_id}/debug_do_kv_get/{key}` | Inspect Durable Object KV key | Admin |
| `/document/{document_id}/debug_do_kv_list/{prefix}` | Inspect Durable Object KV prefix | Admin |

<Note>
Route matching is path-based. The implementation handles CORS `OPTIONS` separately, but most Durable Object handlers do not enforce a specific HTTP method after the path is matched.
</Note>

## Permission tokens

The service reads credentials from three surfaces, using two decoder modes:

| Surface | Token source | Decoder mode |
|---|---|---|
| WebSocket connect | `token` query param | `TokenFrom::QueryParams` |
| HTTP document APIs | `Authorization: Bearer {jwt}` | `TokenFrom::Headers` |
| Internal/admin APIs | `x-internal-auth-key` | `TokenFrom::Headers` admin shortcut |

JWTs are HS256 tokens signed with `DOCUMENT_PERMISSIONS_SECRET` and contain:

```ts
type AuthToken = {
  user_id?: string;
  document_id: string;
  access_level: 'view' | 'comment' | 'edit' | 'owner' | 'admin';
};
```

Document-scoped requests must match `document_id` unless the token is internal admin. `initialize` requires `edit` or stronger. Debug routes require `admin`.

For WebSocket document updates, `PeerUpdate` is ignored when `Wsm::can_edit()` is false. The current runtime write gate treats `comment`, `edit`, `owner`, and `admin` as editable because `AccessLevel::can_edit()` checks for `AccessLevel::Comment` or stronger; `view` users cannot push document updates. Awareness updates are accepted for connected users independently of edit access.
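The two thresholds described above (`comment` or stronger for WebSocket updates, `edit` or stronger for `initialize`) can be sketched as comparisons over an ordered list. This is an illustrative TypeScript model, not the Rust `AccessLevel` implementation; the ordering is assumed from the order of the `access_level` union, and `atLeast` and `canInitialize` are hypothetical names.

```typescript
type AccessLevel = 'view' | 'comment' | 'edit' | 'owner' | 'admin';

// Assumed ordering: each level is stronger than the one before it.
const LEVEL_ORDER: AccessLevel[] = ['view', 'comment', 'edit', 'owner', 'admin'];

function atLeast(level: AccessLevel, required: AccessLevel): boolean {
  return LEVEL_ORDER.indexOf(level) >= LEVEL_ORDER.indexOf(required);
}

// WebSocket write gate: `comment` or stronger may push PeerUpdate.
const canEdit = (level: AccessLevel): boolean => atLeast(level, 'comment');

// HTTP `initialize` gate: `edit` or stronger.
const canInitialize = (level: AccessLevel): boolean => atLeast(level, 'edit');
```

Note the asymmetry this makes visible: a `comment` user passes the WebSocket write gate but not the `initialize` gate.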

## WebSocket session lifecycle

1. `connect_handler` decodes the query token.
2. The Durable Object stores the `document_id` if not already set.
3. A Cloudflare `WebSocketPair` is created.
4. The server socket is accepted with a generated WebSocket tag.
5. `WebSocketMetadata` is stored in Durable Object storage and memory:
   - `user_id`
   - `access_level`
   - `peer_ids`
6. The current `DocumentState` is loaded or reused.
7. The server sends `RemoteInitialSync` with:
   - a shallow Loro snapshot
   - encoded awareness state
8. The client receives the paired WebSocket response.

String `"ping"` messages receive `"pong"`. Binary messages are decoded as `FromPeer` Bebop messages.
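The frame handling rule above can be sketched as a small classifier. This is a TypeScript illustration only; `classifyFrame` and the `FrameKind` shape are hypothetical, and how the service treats string frames other than `"ping"` is not stated in the source, so the `ignored` branch is an assumption.

```typescript
type FrameKind =
  | { kind: 'pong-reply'; text: 'pong' }
  | { kind: 'bebop'; bytes: Uint8Array }
  | { kind: 'ignored' };

// Decide how an incoming WebSocket frame is handled.
function classifyFrame(frame: string | Uint8Array): FrameKind {
  if (typeof frame === 'string') {
    // Only the literal "ping" has documented behavior; other strings are assumed ignored.
    return frame === 'ping' ? { kind: 'pong-reply', text: 'pong' } : { kind: 'ignored' };
  }
  // Binary frames are handed to the Bebop `FromPeer` decoder (not shown here).
  return { kind: 'bebop', bytes: frame };
}
```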

## Bebop message protocol

The schema lives in `rust/sync-service/bebop/schema.bop` and is generated into Rust and TypeScript clients.

### Client to service: `FromPeer`

| Message | Fields | Behavior |
|---|---|---|
| `PeerUpdate` | `update: byte[]` | Applies update to `DocumentState`, records pending op, broadcasts `RemoteUpdate` to other sockets, sends `RemoteUpdateAck` to sender |
| `PeerAwareness` | `awareness: byte[]` | Applies awareness update and broadcasts `RemoteAwareness` to other sockets |
| `PeerRequestSince` | `frontiers: byte[]` | Decodes Loro frontiers and returns `RemoteUpdateSince` |
| `PeerRequestSnapshot` | none | Returns `RemoteSnapshot` with a shallow snapshot |
| `PeerRegisterId` | `peerid: uint64` | Associates a Loro peer ID with the socket metadata and, when `user_id` exists, D1 |

### Service to client: `FromRemote`

| Message | Fields | Emitted when |
|---|---|---|
| `RemoteInitialSync` | `snapshot`, `awareness` | Immediately after WebSocket connect |
| `RemoteUpdate` | `update` | Another peer pushes an accepted update |
| `RemoteAwareness` | `awareness` | Another peer updates or clears awareness |
| `RemoteSnapshot` | `snapshot` | Alarm broadcast or explicit snapshot request |
| `RemoteUpdateAck` | `update` | Sender’s update was accepted and processed |
| `RemoteUpdateSince` | `update`, `frontiers` | Response to `PeerRequestSince` |

Messages larger than 1 MB trigger a logged warning before deserialization; the size check alone does not reject them.
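To make the request and response pairing concrete, the sketch below models a subset of `FromPeer` as a TypeScript discriminated union and returns which `FromRemote` messages would be sent and to whom. It is a simplified illustration, not the generated Bebop bindings; `Outgoing` and `planReplies` are hypothetical names.

```typescript
// Subset of FromPeer; field names follow the schema tables.
type FromPeer =
  | { type: 'PeerUpdate'; update: Uint8Array }
  | { type: 'PeerAwareness'; awareness: Uint8Array }
  | { type: 'PeerRequestSnapshot' };

// Hypothetical description of a message the service would emit.
type Outgoing = { to: 'sender' | 'others'; message: string };

function planReplies(msg: FromPeer, canEdit: boolean): Outgoing[] {
  switch (msg.type) {
    case 'PeerUpdate':
      // Updates from sockets without edit access are ignored.
      if (!canEdit) return [];
      return [
        { to: 'others', message: 'RemoteUpdate' },
        { to: 'sender', message: 'RemoteUpdateAck' },
      ];
    case 'PeerAwareness':
      // Awareness is accepted regardless of edit access.
      return [{ to: 'others', message: 'RemoteAwareness' }];
    case 'PeerRequestSnapshot':
      return [{ to: 'sender', message: 'RemoteSnapshot' }];
  }
}
```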

## Snapshot and operation lifecycle

`DocumentState` wraps a Loro document. It imports snapshots with the `from_service` tag, imports client updates with the `from_client` tag, and tracks whether a snapshot should be saved by comparing `last_update` and `last_export`.

### Storage layers

| Data | Storage |
|---|---|
| Current snapshot | Default feature: Durable Object SQL via `DurableSQLStorage`, with `SNAPSHOT_STORE_KV` fallback for reads |
| Pending operations | Durable Object KV keys under `o/` |
| All recent operations | Durable Object KV keys under `a/` |
| Last saved version vector | Durable Object KV key `LAST_VERSION_VECTOR` |
| User-peer mappings | D1 binding `USER_PEER_MAPPING`, table `peer_user_map` |

The default Cargo feature set enables `do-sqlite-snapshot-storage`, so snapshots are written to Durable Object SQL. The combined storage backend reads SQL first and falls back to KV when SQL has no snapshot.
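The read path of the combined backend amounts to a primary lookup with a fallback on a miss. The TypeScript sketch below illustrates that pattern with in-memory stand-ins; `SnapshotReader`, `combinedReader`, and `fromMap` are hypothetical, and real storage calls would be asynchronous.

```typescript
// Hypothetical read-only view of a snapshot store (synchronous for brevity).
interface SnapshotReader {
  read(documentId: string): Uint8Array | undefined;
}

// Read from the primary store (SQL) first; fall back to the secondary (KV) on a miss.
function combinedReader(primary: SnapshotReader, fallback: SnapshotReader): SnapshotReader {
  return {
    read: (documentId) => primary.read(documentId) ?? fallback.read(documentId),
  };
}

// In-memory stand-in used only for illustration.
const fromMap = (entries: Map<string, Uint8Array>): SnapshotReader => ({
  read: (documentId) => entries.get(documentId),
});
```

With this shape, a document that exists only in the KV store is still readable, while a document present in both stores resolves to the SQL copy.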

### Save loop

After each binary WebSocket message, the Durable Object schedules an alarm roughly 5 seconds in the future if no later alarm is already scheduled.

When the alarm fires:

1. The Durable Object loads `DocumentState`.
2. If `state.should_save()` is true:
   - exports a full Loro snapshot
   - stores the snapshot
   - stores the Loro operation version vector
   - clears applied pending operation keys
   - marks the state exported
3. If sockets are still connected:
   - schedules the next alarm
   - broadcasts `RemoteSnapshot` with a shallow snapshot
4. If no sockets remain, it logs that the Durable Object reached zero connections.
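The alarm steps above can be sketched as a pure decision function. This TypeScript model is illustrative only: `alarmActions` and the action labels are hypothetical, and the direction of the `last_update` / `last_export` comparison (save when an update is newer than the last export) is an assumption.

```typescript
type AlarmAction =
  | 'save-snapshot'
  | 'schedule-next-alarm'
  | 'broadcast-remote-snapshot'
  | 'log-zero-connections';

// Assumption: a save is needed when an update arrived after the last export.
function shouldSave(lastUpdate: number, lastExport: number): boolean {
  return lastUpdate > lastExport;
}

// Decide what the alarm handler does, given timestamps and the open socket count.
function alarmActions(lastUpdate: number, lastExport: number, sockets: number): AlarmAction[] {
  const actions: AlarmAction[] = [];
  if (shouldSave(lastUpdate, lastExport)) actions.push('save-snapshot');
  if (sockets > 0) {
    actions.push('schedule-next-alarm', 'broadcast-remote-snapshot');
  } else {
    actions.push('log-zero-connections');
  }
  return actions;
}
```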

`wakeup` calls warm the session storage and document state when the document exists, then keep the Durable Object alive with a JavaScript timeout that uses the default 60-second TTL. The TypeScript client’s `safeWakeup` debounces wakeups by 200 ms and suppresses repeat wakeups for 55 seconds per document.
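The per-document suppression window can be sketched with an explicit clock, which keeps the logic easy to test. This is not the actual `safeWakeup` implementation; `WakeupGate` is a hypothetical class, the 200 ms debounce is left out, and allowing a wakeup at exactly 55 seconds is an assumption.

```typescript
// Repeat wakeups for the same document are skipped inside this window.
const SUPPRESS_MS = 55000;

class WakeupGate {
  private lastSent = new Map<string, number>();

  // Returns true when a wakeup request should be sent at time `now` (milliseconds).
  shouldSend(documentId: string, now: number): boolean {
    const previous = this.lastSent.get(documentId);
    if (previous !== undefined && now - previous < SUPPRESS_MS) return false;
    this.lastSent.set(documentId, now);
    return true;
  }
}
```

A 55-second suppression window sits just under the 60-second keepalive TTL, so a steadily active client would refresh the keepalive before it lapses.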

## Initialization and copy

`initialize` accepts a Bebop `InitializeFromSnapshotRequest`:

```ts
type InitializeFromSnapshotRequest = {
  snapshot: Uint8Array;
};
```

It fails if snapshot storage already contains a snapshot for the document. Otherwise it stores the snapshot, records the `DOCUMENT_ID` in Durable Object storage, and prepares `SessionStorage`.

`copy` is handled at the Worker layer because it touches two Durable Object instances:

1. POST `/document/{source_id}/snapshot` with optional `version_id`.
2. Wrap the returned binary snapshot in `InitializeFromSnapshotRequest`.
3. POST `/document/{target_id}/initialize`.

The JSON copy request is:

```json
{
  "target_document_id": "new-document-id",
  "version_id": {
    "peer": "123",
    "counter": 10
  }
}
```

`version_id` is optional. When present, the snapshot handler exports state at the requested Loro frontier.
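A client building this body might look like the following sketch. The field names come from the JSON example above; `buildCopyBody` is a hypothetical helper and is not part of `syncServiceClient`.

```typescript
type VersionId = { peer: string; counter: number };
type CopyRequest = { target_document_id: string; version_id?: VersionId };

// Build the JSON body for the /document/{source_id}/copy route.
function buildCopyBody(targetDocumentId: string, versionId?: VersionId): string {
  const body: CopyRequest = { target_document_id: targetDocumentId };
  // Omit `version_id` entirely when no specific version is requested.
  if (versionId !== undefined) body.version_id = versionId;
  return JSON.stringify(body);
}
```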

## TypeScript client behavior

`syncServiceClient` exposes HTTP helpers against `SYNC_SERVICE_HOSTS.worker`:

| Method | Route |
|---|---|
| `wakeup({ documentId })` | `GET /document/{id}/wakeup` |
| `safeWakeup(id)` | Debounced `GET /document/{id}/wakeup` |
| `exists({ documentId })` | `HEAD /document/{id}/exists` |
| `initializeFromSnapshot({ documentId, snapshot })` | `POST /document/{id}/initialize` |
| `getDocumentMetadata({ documentId })` | `GET /document/{id}/metadata` |
| `getSnapshot({ documentId })` | `GET /document/{id}/snapshot` |
| `getRaw({ documentId })` | `GET /document/{id}/raw` |

Tauri requests add an explicit `Origin` header of `https://dev.macro.com` in development and `https://macro.com` otherwise.

`createSyncServiceSource(documentId, token)` builds the WebSocket URL:

```text
{SYNC_SERVICE_HOSTS.ws}/document/{documentId}/connect?token={token}
```
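A minimal sketch of building that URL follows. `buildConnectUrl` is a hypothetical helper; percent-encoding the document ID and token is an assumption, since the source shows only the URL template.

```typescript
// Assumption: path and query values are percent-encoded before interpolation.
function buildConnectUrl(wsHost: string, documentId: string, token: string): string {
  const id = encodeURIComponent(documentId);
  const encodedToken = encodeURIComponent(token);
  return `${wsHost}/document/${id}/connect?token=${encodedToken}`;
}
```

JWTs contain only base64url characters and dots, so encoding leaves a typical token unchanged.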

Connection settings:

| Setting | Value |
|---|---:|
| Reconnect backoff | Constant 500 ms |
| Max retries | 20 |
| Initial sync timeout | 10 seconds |
| Update ACK timeout | 3 seconds |
| Snapshot request timeout | 10 seconds |
| Updates-since timeout | 10 seconds |
| Heartbeat interval | 10 seconds |
| Heartbeat timeout | 5 seconds |
| Max missed heartbeats | 2 |

The first connection uses the provided token. Reconnects request a fresh permission token from `storageServiceClient.permissionsTokens.createPermissionToken({ document_id })`; if token refresh fails, the client falls back to the last known WebSocket URL.
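The retry policy and the token-refresh fallback can be sketched as two pure functions. Both names are hypothetical, and treating the 20th retry as the last permitted one is an assumption about how the limit is counted.

```typescript
const RECONNECT_DELAY_MS = 500;
const MAX_RETRIES = 20;

// Delay before retry number `attempt` (1-based), or null once retries are exhausted.
// The backoff is constant, so the attempt number only affects the cutoff.
function reconnectDelay(attempt: number): number | null {
  return attempt <= MAX_RETRIES ? RECONNECT_DELAY_MS : null;
}

// Pick the URL for a reconnect: a URL with a fresh token when the refresh succeeded,
// otherwise the last known WebSocket URL.
function reconnectUrl(freshUrl: string | null, lastKnownUrl: string): string {
  return freshUrl ?? lastKnownUrl;
}
```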

On the first connection, heartbeat is intentionally started only after `RemoteInitialSync` arrives. On reconnect, the client starts heartbeat first, then waits for another `RemoteInitialSync` and emits a `reconnect` sync event containing the new snapshot and awareness. If the reconnect sync times out, the client logs the failure and lets heartbeat monitoring close and retry the socket.
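The missed-heartbeat rule from the settings table can be sketched as a small counter. `HeartbeatTracker` is a hypothetical class, not the client's implementation; closing once the missed count reaches the maximum of 2 is an assumption about how the limit is applied.

```typescript
const MAX_MISSED_HEARTBEATS = 2;

class HeartbeatTracker {
  private missed = 0;

  // Call when a heartbeat reply arrives within the timeout.
  onReply(): void {
    this.missed = 0;
  }

  // Call when a heartbeat times out. Returns true when the socket should be closed.
  // Assumption: the socket closes once the missed count reaches the maximum.
  onTimeout(): boolean {
    this.missed += 1;
    return this.missed >= MAX_MISSED_HEARTBEATS;
  }
}
```

Because a successful reply resets the counter, only consecutive misses lead to a close.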

## Markdown block integration

The Markdown block only loads realtime collaboration when the source is `sync-service`.

Load flow:

1. Fetch document metadata, document location, and permission token in parallel.
2. If the location is a pending presigned URL, wait for `syncServiceContent` readiness.
3. Reject the load when the final location is not `syncServiceContent`.
4. Create the sync-service source with the permission token.
5. Create a Loro manager using `MARKDOWN_LORO_SCHEMA`.
6. Initialize local Loro state from `initialSync.snapshot`.
7. Return the block data with `syncSource`, `loroManager`, metadata, and user access level.

Markdown initialization and lifecycle persistence are backend-owned; the frontend does not repair an object-storage-backed Markdown document into sync-service content during block load.

## Operational configuration

Cloudflare bindings in `wrangler.toml`:

| Binding | Purpose |
|---|---|
| `DOCUMENT_SYNC_SESSION` | Durable Object namespace |
| `USER_PEER_MAPPING` | D1 database for peer/user mappings |
| `DOCUMENT_SNAPSHOT_BUCKET` | R2 bucket when the R2 snapshot feature is enabled |
| `SNAPSHOT_STORE_KV` | KV snapshot fallback |
| `DOCUMENT_VERSIONING_KV` | Configured KV namespace |
| `INTERNAL_API_SECRET_KEY` | Variable naming the secret binding for internal API auth |
| `DOCUMENT_PERMISSIONS_SECRET` | JWT signing secret expected at runtime |
| `SPS_URL` | Search processing service URL when `search-service` is enabled |

Useful local commands:

```bash
cd rust/sync-service

# Build the Worker used by Miniflare tests
just worker-build

# Run e2e tests and Rust unit tests
just test

# Apply local D1 migrations and start Wrangler
just dev

# Regenerate TypeScript Bebop bindings used by tests
cd bebop && npx bebopc build
```

## Error and verification signals

| Symptom | Likely cause |
|---|---|
| `401` | Missing/malformed Bearer token, invalid JWT, wrong document token, or missing internal key |
| `403` | Request `Origin` is not allowed |
| `404` | Snapshot/document does not exist for routes that check existence |
| `408` | Worker-to-Durable-Object RPC timeout |
| Missing update ACK | Client did not receive `RemoteUpdateAck` within 3 seconds |
| View-only edits do not propagate | `PeerUpdate` was ignored by the WebSocket write gate |
| Initial sync timeout | Client did not receive `RemoteInitialSync` within 10 seconds |
| Snapshot already exists during initialize | Target document already has stored snapshot state |

## Related pages

<CardGroup>
  <Card title="Sync service client" href="/service-clients/service-sync">
    TypeScript source wrapper, reconnect behavior, and SyncSource integration.
  </Card>
  <Card title="Markdown block loading" href="/blocks/markdown">
    Markdown-specific sync-service source loading and Loro initialization.
  </Card>
</CardGroup>
