# Durable Workflow Engine: Checkpoint/Replay

> The checkpoint/replay model inspired by Cloudflare Workflows: the handler function IS the workflow, ctx.step() calls are discovered at runtime, and Postgres checkpoints each step result. On resume after a crash the handler re-executes top-to-bottom but skips already-checkpointed steps instantly. How this enables sleep, suspension, child agents, and idempotent re-runs.

- Repository: paradigmxyz/centaur
- GitHub: https://github.com/paradigmxyz/centaur
- Human wiki: https://grok-wiki.com/public/wiki/paradigmxyz-centaur-57fc6b2755e2
- Complete Markdown: https://grok-wiki.com/public/wiki/paradigmxyz-centaur-57fc6b2755e2/llms-full.txt

## Source Files

- `services/api/api/workflow_engine.py`
- `workflows/paradigm_pulse_daily.py`
- `workflows/slack_sync.py`
- `services/api/tests/test_workflow_engine_title.py`
- `services/api/tests/test_workflow_idempotency_unit.py`

---


Centaur's workflow engine implements a **checkpoint/replay** execution model inspired by Cloudflare Workflows. The central idea is that a plain Python `async` function *is* the workflow: there is no separate DAG definition, no step registry, and no declarative schema. Steps are discovered at runtime when the handler calls `ctx.step()`. After each step completes, the engine persists its result to Postgres. If the process crashes or suspends, the handler is simply called again from the top, but every step that already has a Postgres checkpoint returns its cached result instantly instead of running again, so the re-execution is functionally transparent.

This design means that dynamic branches, loops, and conditionals work naturally because they are just Python control flow. Sleeping for twelve hours, waiting for an external event, spawning a child agent, and retrying a flaky API call are all expressed as ordinary awaits inside the same handler function.
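The replay mechanic can be seen in miniature with the following sketch. It is a toy, not Centaur's code: a plain dict stands in for the `workflow_checkpoints` table, and `ToyContext`, `SimulatedCrash`, and the two step names are hypothetical. The first execution checkpoints `fetch` and then "crashes". The second execution re-runs the handler from the top, takes `fetch` from the store without calling it, and only executes `summarize`.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SimulatedCrash(Exception):
    """Hypothetical stand-in for a worker dying mid-run."""


class ToyContext:
    """Toy ctx; a dict shared across executions plays the workflow_checkpoints table."""

    def __init__(self, store: dict[str, Any]) -> None:
        self.store = store                 # outlives any single execution
        self.executed: list[str] = []      # steps whose fn actually ran this time

    async def step(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        if name in self.store:             # checkpoint hit: fn is never called
            return self.store[name]
        result = await fn()                # first time: run the step...
        self.store[name] = result          # ...then checkpoint its result
        self.executed.append(name)
        return result


async def handler(ctx: ToyContext, crash_after_fetch: bool) -> str:
    async def fetch() -> list[int]:
        return [1, 2, 3]

    data = await ctx.step("fetch", fetch)
    if crash_after_fetch:                  # simulate the process dying here
        raise SimulatedCrash()

    async def summarize() -> str:
        return f"sum={sum(data)}"

    return await ctx.step("summarize", summarize)


async def main() -> tuple[list[str], list[str], str]:
    store: dict[str, Any] = {}
    first = ToyContext(store)
    try:
        await handler(first, crash_after_fetch=True)
    except SimulatedCrash:
        pass
    second = ToyContext(store)             # "resume": fresh context, same store
    result = await handler(second, crash_after_fetch=False)
    return first.executed, second.executed, result


first_run, replay_run, final = asyncio.run(main())
print(first_run, replay_run, final)        # ['fetch'] ['summarize'] sum=6
```

Note that `handler` contains no recovery code at all; the only thing that differs between the two executions is the contents of the shared store.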

---

## The Handler-as-Workflow Contract

Handler discovery follows the same convention as Centaur tools. Python files placed in the `api/workflows/` built-in package or any directory listed in the `WORKFLOW_DIRS` environment variable are scanned at startup. A valid handler module must export:

- `WORKFLOW_NAME: str` — the canonical workflow name used as a lookup key.
- `async def handler(params, ctx: WorkflowContext)` — the entry point.

Optionally, a module may export:

- `Input` — a `@dataclass` that the raw `input_json` dict is auto-coerced into before the handler is called.
- `SCHEDULE` / `CRON` / `INTERVAL` — scheduling metadata picked up by the schedule engine.

A shorthand auto-handler is also synthesized when a module exports `PROMPT` and `SLACK_CHANNEL` but no explicit `handler`: the engine generates a handler that awaits `ctx.agent_turn(PROMPT)` and then calls `ctx.post_to_slack(channel, result_text)`.

Sources: [services/api/api/workflow_engine.py:1429-1486]()
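The export contract above can be sketched as a small resolver. This is a hypothetical illustration, not the engine's loader: `resolve_handler`, `_auto_handler`, `FakeCtx`, and the demo modules are invented for the example. Only the exported names (`WORKFLOW_NAME`, `handler`, `Input`, `PROMPT`, `SLACK_CHANNEL`) and the `ctx.agent_turn` / `ctx.post_to_slack` calls come from the description. The resolver checks `WORKFLOW_NAME`, prefers an explicit `handler`, falls back to the `PROMPT` + `SLACK_CHANNEL` shorthand, and coerces the raw input dict into `Input` when one is exported.

```python
import asyncio
import dataclasses
import types
from typing import Any, Awaitable, Callable, Tuple

Handler = Callable[[Any, Any], Awaitable[Any]]


def _auto_handler(module: types.ModuleType) -> Handler:
    # Shorthand module (PROMPT + SLACK_CHANNEL, no handler): agent turn, then post.
    async def handler(params: Any, ctx: Any) -> Any:
        result = await ctx.agent_turn(module.PROMPT)
        await ctx.post_to_slack(module.SLACK_CHANNEL, result.get("result_text", ""))
        return result

    return handler


def resolve_handler(module: types.ModuleType) -> Tuple[str, Handler]:
    name = getattr(module, "WORKFLOW_NAME", None)
    if not isinstance(name, str):
        raise ValueError(f"{module.__name__}: must export WORKFLOW_NAME: str")
    handler = getattr(module, "handler", None)
    if handler is None and hasattr(module, "PROMPT") and hasattr(module, "SLACK_CHANNEL"):
        handler = _auto_handler(module)
    if handler is None:
        raise ValueError(f"{name}: exports neither handler nor PROMPT/SLACK_CHANNEL")
    input_cls = getattr(module, "Input", None)

    async def entry(input_json: dict, ctx: Any) -> Any:
        params: Any = input_json
        if input_cls is not None and dataclasses.is_dataclass(input_cls):
            params = input_cls(**input_json)   # auto-coerce the raw dict
        return await handler(params, ctx)

    return name, entry


class FakeCtx:
    """Records Slack posts instead of sending them."""

    def __init__(self) -> None:
        self.posts: list = []

    async def agent_turn(self, prompt: str) -> dict:
        return {"result_text": f"answer to: {prompt}"}

    async def post_to_slack(self, channel: str, text: str) -> None:
        self.posts.append((channel, text))


# Shorthand module: no handler, so one is synthesized.
shorthand = types.ModuleType("shorthand_demo")
shorthand.WORKFLOW_NAME = "shorthand_demo"
shorthand.PROMPT = "summarize today"
shorthand.SLACK_CHANNEL = "general"
wf_name, entry = resolve_handler(shorthand)
ctx = FakeCtx()
asyncio.run(entry({}, ctx))
print(wf_name, ctx.posts)  # shorthand_demo [('general', 'answer to: summarize today')]


# Full module with an Input dataclass: the raw dict is coerced before the call.
@dataclasses.dataclass
class DemoInput:
    channel: str
    limit: int = 10


async def typed_handler(params: DemoInput, ctx: Any) -> str:
    return f"{params.channel}:{params.limit}"


typed = types.ModuleType("typed_demo")
typed.WORKFLOW_NAME = "typed_demo"
typed.handler = typed_handler
typed.Input = DemoInput
_, typed_entry = resolve_handler(typed)
typed_result = asyncio.run(typed_entry({"channel": "eng"}, None))
print(typed_result)  # eng:10
```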

The `paradigm_pulse_daily` workflow illustrates the full-handler pattern:

```python
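# workflows/paradigm_pulse_daily.py (excerpt)
from typing import Any

WORKFLOW_NAME = "paradigm_pulse_daily"
CRON = "45 7 * * *"              # every day at 07:45
SLACK_CHANNEL = "paradigm-pulse"
# PROMPT is a module-level string defined elsewhere in this file.
# WorkflowContext comes from the engine module (import elided).

async def handler(inp: dict[str, Any], ctx: WorkflowContext) -> dict[str, Any]:
    result = await ctx.agent_turn(PROMPT)                # child agent_turn run
    text = str(result.get("result_text") or "").strip()
    ...                                                  # elided: builds `args` for the Slack call
    await ctx.call_tool("slack", "send_message", args)   # checkpointed as a tool_call step
    return result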
```

Sources: [workflows/paradigm_pulse_daily.py:15-228]()

---

## WorkflowContext: The Checkpoint Primitives

Every handler receives a `WorkflowContext` (`ctx`) that exposes three core primitives and several higher-level helpers. All of them share the same underlying invariant: *once an operation's result is checkpointed, the operation never runs again; replay returns the cached result*.

### `ctx.step(name, fn)`

The fundamental primitive. On the first execution it calls `fn()`, awaits the result, writes it to `workflow_checkpoints`, and returns the value. On any subsequent execution (replay after crash, retry, or resume after suspension), the step name is looked up in the pre-loaded checkpoint dict and the cached value is returned instantly — `fn` is never called again.

```python
# services/api/api/workflow_engine.py (simplified)
async def step(self, name, fn, *, retry=None, timeout=None, ...):
    checkpoint_name = self._resolve_name(name)
    if checkpoint_name in self._checkpoints:
        return self._checkpoints[checkpoint_name]   # instant cache hit
    self._in_replay = False
    result = await self._call_step_fn(fn)           # only runs once
    await self._persist_checkpoint(checkpoint_name, result, ...)
    return result
```

Sources: [services/api/api/workflow_engine.py:330-398]()

Step names inside loops are automatically deduplicated using a counter (`name`, `name#2`, `name#3`, and so on), so a `for channel in channels` loop produces stable, ordered checkpoints without any manual naming, provided the loop iterates in the same order on every replay.

Sources: [services/api/api/workflow_engine.py:251-262]()
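A minimal sketch of the naming scheme, assuming a counter that starts fresh on every execution (the `StepNamer` class, `checkpoint_names` helper, and the `sync_channel` step name are hypothetical). Because the counter restarts on each replay, the same loop yields the same sequence of checkpoint names, which is what lets replay match each call with its stored result.

```python
from typing import Dict, List


class StepNamer:
    """Hypothetical per-execution resolver: name, name#2, name#3, ..."""

    def __init__(self) -> None:
        self._seen: Dict[str, int] = {}

    def resolve(self, name: str) -> str:
        count = self._seen.get(name, 0) + 1
        self._seen[name] = count
        return name if count == 1 else f"{name}#{count}"


def checkpoint_names(channels: List[str]) -> List[str]:
    namer = StepNamer()  # fresh counter for every execution (first run or replay)
    return [namer.resolve("sync_channel") for _ in channels]


first = checkpoint_names(["general", "eng", "random"])
replay = checkpoint_names(["general", "eng", "random"])
print(first)            # ['sync_channel', 'sync_channel#2', 'sync_channel#3']
print(first == replay)  # True
```

The generated name encodes position, not identity: `sync_channel#2` means "the second call", whichever channel that happens to be.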

### `ctx.sleep(name, duration)` and `ctx.sleep_until(name, when)`

Sleep writes a wake-time ISO timestamp as the checkpoint state, then raises `SuspendWorkflow` with `available_at` set to the future wake time. The worker sets the run's status to `sleeping` and stops. When the schedule reconciler notices `available_at <= now`, it re-queues the run. The handler re-executes top-to-bottom: when it reaches the sleep call again, the checkpoint already exists and the engine checks whether the wake time has passed. If it has, the call returns normally (no-op); if not, it re-raises `SuspendWorkflow`.

```python
# services/api/api/workflow_engine.py (simplified)
import datetime as dt

async def sleep(self, name, duration):
    checkpoint_name = self._resolve_name(name)
    if checkpoint_name in self._checkpoints:
        wake_at = dt.datetime.fromisoformat(self._checkpoints[checkpoint_name])
        if dt.datetime.now(dt.timezone.utc) < wake_at:
            raise SuspendWorkflow(available_at=wake_at)
        return  # wake time passed — fall through
    wake_at = dt.datetime.now(dt.timezone.utc) + duration
    await self._persist_checkpoint(checkpoint_name, wake_at.isoformat(), step_kind="sleep")
    raise SuspendWorkflow(available_at=wake_at)
```

Sources: [services/api/api/workflow_engine.py:400-416]()
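The replay behavior of `sleep` can be exercised without a database. In this sketch `sleep_step` is a hypothetical synchronous stand-in for `ctx.sleep`, the checkpoint store is a dict, `SuspendWorkflow` is a local class, and the clock is injected as `now` so three executions can be simulated deterministically. The wake time is fixed on the first execution; replays compare against the stored timestamp rather than adding `duration` again.

```python
import datetime as dt
from typing import Dict


class SuspendWorkflow(Exception):
    """Local stand-in carrying the time at which the run should be re-queued."""

    def __init__(self, available_at: dt.datetime) -> None:
        super().__init__(f"suspend until {available_at.isoformat()}")
        self.available_at = available_at


def sleep_step(checkpoints: Dict[str, str], name: str,
               duration: dt.timedelta, now: dt.datetime) -> None:
    if name in checkpoints:
        wake_at = dt.datetime.fromisoformat(checkpoints[name])
        if now < wake_at:
            raise SuspendWorkflow(wake_at)      # re-queued too early: suspend again
        return                                  # wake time passed: no-op
    wake_at = now + duration
    checkpoints[name] = wake_at.isoformat()     # persist the wake time first...
    raise SuspendWorkflow(wake_at)              # ...then suspend


t0 = dt.datetime(2024, 1, 1, 8, 0, tzinfo=dt.timezone.utc)
cps: Dict[str, str] = {}
outcomes = []
for now in (t0, t0 + dt.timedelta(hours=6), t0 + dt.timedelta(hours=12)):
    try:
        sleep_step(cps, "cooldown", dt.timedelta(hours=12), now)
        outcomes.append("continue")
    except SuspendWorkflow as exc:
        outcomes.append(f"suspend until {exc.available_at:%H:%M}")
print(outcomes)  # ['suspend until 20:00', 'suspend until 20:00', 'continue']
```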

### `ctx.wait_for_event(name, event_type, correlation_id)`

When the event has not arrived yet, the engine writes a `_waiting` marker dict as the checkpoint (step_kind `event_wait`) and raises `SuspendWorkflow` with `status="waiting"`. On resume, the engine queries `workflow_events` for a matching `(event_type, correlation_id)` row. If found, the real payload replaces the wait-marker checkpoint and the handler continues. If not found, `SuspendWorkflow` is raised again. An optional `timeout` stores a deadline ISO string in the marker; when the deadline is exceeded, `TimeoutError` is raised.

Sources: [services/api/api/workflow_engine.py:452-543]()
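The marker-then-payload lifecycle can be sketched in the same style. Everything here is illustrative: `wait_for_event` is a synchronous stand-in for the real method, a dict keyed by `(event_type, correlation_id)` replaces the `workflow_events` table, the marker's `deadline` field name is an assumption, and the `approval` / `pr-42` identifiers are made up. The sketch also assumes that an event already present on the very first call is returned without writing a marker.

```python
import datetime as dt
from typing import Any, Dict, Optional, Tuple


class SuspendWorkflow(Exception):
    """Local stand-in; status mirrors the run status the worker would record."""

    def __init__(self, status: str) -> None:
        super().__init__(status)
        self.status = status


def wait_for_event(
    checkpoints: Dict[str, Any],
    events: Dict[Tuple[str, str], Any],        # stand-in for workflow_events
    name: str,
    event_type: str,
    correlation_id: str,
    now: dt.datetime,
    timeout: Optional[dt.timedelta] = None,
) -> Any:
    key = (event_type, correlation_id)
    state = checkpoints.get(name)
    is_marker = isinstance(state, dict) and state.get("_waiting") is True
    if name in checkpoints and not is_marker:
        return state                           # payload already checkpointed earlier
    if key in events:
        checkpoints[name] = events[key]        # real payload replaces the marker
        return events[key]
    if not is_marker:                          # first arrival: write the wait marker
        deadline = (now + timeout).isoformat() if timeout is not None else None
        checkpoints[name] = {"_waiting": True, "deadline": deadline}
    else:
        deadline = state["deadline"]
        if deadline is not None and now > dt.datetime.fromisoformat(deadline):
            raise TimeoutError(f"{event_type}/{correlation_id} did not arrive in time")
    raise SuspendWorkflow("waiting")


t0 = dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc)
cps: Dict[str, Any] = {}
events: Dict[Tuple[str, str], Any] = {}
trace = []
for minutes, arrived in ((0, False), (5, True)):
    if arrived:
        events[("approval", "pr-42")] = {"approved": True}
    try:
        trace.append(wait_for_event(cps, events, "approve", "approval", "pr-42",
                                    t0 + dt.timedelta(minutes=minutes), dt.timedelta(hours=1)))
    except SuspendWorkflow as exc:
        trace.append(exc.status)
print(trace)  # ['waiting', {'approved': True}]

# Timeout path: the event never arrives and the deadline passes.
late: Dict[str, Any] = {}
try:
    wait_for_event(late, {}, "approve", "approval", "x", t0, dt.timedelta(minutes=1))
except SuspendWorkflow:
    pass
try:
    wait_for_event(late, {}, "approve", "approval", "x",
                   t0 + dt.timedelta(minutes=2), dt.timedelta(minutes=1))
    timed_out = False
except TimeoutError:
    timed_out = True
print(timed_out)  # True
```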

---

## Checkpoint Persistence and Lease Fencing

`_persist_checkpoint` writes atomically inside a Postgres transaction that also extends the worker's lease. Before the write, it acquires a `FOR UPDATE` lock on the `workflow_runs` row and verifies that:

1. The run still exists.
2. The run is not `cancelled`.
3. The `worker_id` still matches the current worker.

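If any check fails, `CancelledWorkflow` is raised and the worker aborts cleanly without writing the checkpoint. This fencing protects against a stale worker after a lease expiry: once another worker has claimed the run, `worker_id` no longer matches, so the original worker (if it is still alive) cannot overwrite the new owner's checkpoints.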

```python
# services/api/api/workflow_engine.py
async def _persist_checkpoint(self, checkpoint_name, value, ...):
    async with self._pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow(
                "SELECT status, worker_id FROM workflow_runs "
                "WHERE run_id = $1 FOR UPDATE", self.run_id,
            )
            if not row or row["status"] == "cancelled" or row["worker_id"] != self._worker_id:
                raise CancelledWorkflow()
            await conn.execute(
                "INSERT INTO workflow_checkpoints ... ON CONFLICT ... DO UPDATE SET state = EXCLUDED.state",
                ...
            )
            await conn.execute(
                "UPDATE workflow_runs SET worker_lease_expires_at = NOW() + ...", ...
            )
    self._checkpoints[checkpoint_name] = value
```

Sources: [services/api/api/workflow_engine.py:264-318]()

The in-memory `self._checkpoints` dict is updated after each successful write so that subsequent `step` calls during the same execution do not hit the database unnecessarily.
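The fencing check is easy to reproduce outside Postgres. This sketch uses SQLite from the standard library as a stand-in: `BEGIN IMMEDIATE` takes the database write lock in place of `SELECT ... FOR UPDATE`, the two tables are trimmed to the columns the check needs, and `persist_checkpoint`, `CancelledWorkflow`, and the run/worker ids are local, hypothetical names. The second write is rejected because the run has been reassigned to another worker, and the rollback leaves no trace of it.

```python
import json
import sqlite3
from typing import Any


class CancelledWorkflow(Exception):
    """Local stand-in for the engine's CancelledWorkflow."""


SCHEMA = """
CREATE TABLE workflow_runs (
    run_id TEXT PRIMARY KEY, status TEXT, worker_id TEXT, worker_lease_expires_at REAL
);
CREATE TABLE workflow_checkpoints (
    run_id TEXT, name TEXT, state TEXT, PRIMARY KEY (run_id, name)
);
"""


def persist_checkpoint(conn: sqlite3.Connection, run_id: str, worker_id: str,
                       name: str, value: Any, now: float, lease_s: float = 30.0) -> None:
    conn.execute("BEGIN IMMEDIATE")            # write lock, like FOR UPDATE on the run row
    try:
        row = conn.execute(
            "SELECT status, worker_id FROM workflow_runs WHERE run_id = ?", (run_id,)
        ).fetchone()
        if row is None or row[0] == "cancelled" or row[1] != worker_id:
            raise CancelledWorkflow(run_id)    # fenced: run gone, cancelled, or reassigned
        conn.execute(
            "INSERT INTO workflow_checkpoints (run_id, name, state) VALUES (?, ?, ?) "
            "ON CONFLICT (run_id, name) DO UPDATE SET state = excluded.state",
            (run_id, name, json.dumps(value)),
        )
        conn.execute(                          # extend the lease in the same transaction
            "UPDATE workflow_runs SET worker_lease_expires_at = ? WHERE run_id = ?",
            (now + lease_s, run_id),
        )
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise


conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit; transactions are explicit
conn.executescript(SCHEMA)
conn.execute("INSERT INTO workflow_runs VALUES ('run-1', 'running', 'worker-a', 0)")
persist_checkpoint(conn, "run-1", "worker-a", "fetch", [1, 2, 3], now=100.0)

# The lease expired and worker-b claimed the run; worker-a wakes up and tries to write.
conn.execute("UPDATE workflow_runs SET worker_id = 'worker-b' WHERE run_id = 'run-1'")
try:
    persist_checkpoint(conn, "run-1", "worker-a", "summarize", "stale", now=200.0)
    fenced = False
except CancelledWorkflow:
    fenced = True

rows = conn.execute("SELECT name, state FROM workflow_checkpoints ORDER BY name").fetchall()
print(fenced, rows)  # True [('fetch', '[1, 2, 3]')]
```

As in the engine, any in-memory cache of checkpoints should only be updated after the transaction commits; otherwise a fenced write could leave the cache disagreeing with the database.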

---

## Suspension and Resume Lifecycle

```text
┌─────────────────────────────────────────────────────────────┐
│  workflow_runs.status transitions                           │
│                                                             │
│  queued ──► running ──► completed                          │
│                │                                            │
│                ├──► sleeping  (ctx.sleep / ctx.sleep_until) │
│                │       │                                    │
│                │       └── available_at <= now ──► queued   │
│                │                                            │
│                ├──► waiting   (ctx.wait_for_event /         │
│                │               ctx.wait_for_workflow)       │
│                │       │                                    │
│                │       └── event/child arrives ──► queued   │
│                │                                            │
│                └──► failed_permanent / cancelled            │
└─────────────────────────────────────────────────────────────┘
```

The engine's reconcile loop (`_tick_once`) has two jobs:

1. **Re-queue expired leases**: any run with `status = 'running'` and an expired `worker_lease_expires_at` is reset to `queued`, making it available for another worker to claim.
2. **Wake sleeping/waiting runs**: runs whose `available_at <= NOW()` are promoted back to `queued`.

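Expired-lease re-queueing provides crash recovery: if a worker dies mid-step, its lease expires after `WORKFLOW_WORKER_LEASE_S` seconds (default 30s), the run returns to `queued`, and the next worker re-executes the handler from the top, fast-forwarding through the checkpointed steps to resume after the last completed one.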

Sources: [services/api/api/workflow_engine.py:1068-1082](), [services/api/api/workflow_engine.py:153-184]()
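The two reconcile jobs reduce to a pair of predicates over each run. This in-memory sketch assumes a `Run` dataclass carrying only the columns mentioned here and a hypothetical `tick_once` that mutates runs in place and returns the ids it re-queued. The real loop issues SQL updates against `workflow_runs` instead.

```python
import datetime as dt
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Run:
    run_id: str
    status: str                                    # queued | running | sleeping | waiting | ...
    worker_id: Optional[str] = None
    worker_lease_expires_at: Optional[dt.datetime] = None
    available_at: Optional[dt.datetime] = None


def tick_once(runs: List[Run], now: dt.datetime) -> List[str]:
    requeued = []
    for run in runs:
        # Job 1: a running run whose worker stopped renewing its lease.
        lease_expired = (
            run.status == "running"
            and run.worker_lease_expires_at is not None
            and run.worker_lease_expires_at < now
        )
        # Job 2: a sleeping/waiting run whose available_at has arrived.
        due = (
            run.status in ("sleeping", "waiting")
            and run.available_at is not None
            and run.available_at <= now
        )
        if lease_expired or due:
            run.status = "queued"                  # any worker may now claim it
            requeued.append(run.run_id)
    return requeued


now = dt.datetime(2024, 1, 1, 12, 0, tzinfo=dt.timezone.utc)
ago = now - dt.timedelta(seconds=45)
runs = [
    Run("crashed", "running", "w1", worker_lease_expires_at=ago),
    Run("healthy", "running", "w2", worker_lease_expires_at=now + dt.timedelta(seconds=20)),
    Run("napping", "sleeping", available_at=ago),
    Run("early", "sleeping", available_at=now + dt.timedelta(hours=1)),
]
requeued_ids = tick_once(runs, now)
print(requeued_ids)  # ['crashed', 'napping']
```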

---

## Idempotency on Run Creation

When `create_workflow_run` is called with a `trigger_key`, the engine takes a `pg_advisory_xact_lock` on `(workflow_name, trigger_key)` and checks for an existing row. If the row exists with a matching `request_hash`, the existing `run_id` is returned and no duplicate is created, so repeated calls are fully idempotent. If the row exists but the hash differs (a different payload for the same key), a `ControlPlaneError(IDEMPOTENCY_PAYLOAD_MISMATCH)` is raised, so the conflict surfaces to the caller as a logic error instead of being silently ignored.

Sources: [services/api/api/workflow_engine.py:1762-1841]()

The `test_workflow_idempotency_unit.py` suite explicitly verifies that `_insert_workflow_run` raises `IDEMPOTENCY_PAYLOAD_MISMATCH` when the `request_hash` of the new input does not match the stored hash, and that the warning log includes safe metadata (hash prefix, key list, run_id) without leaking the full payload.

Sources: [services/api/tests/test_workflow_idempotency_unit.py:24-69]()
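The create-or-return logic can be sketched in a few lines. Assumptions: the real `request_hash` derivation is not described here, so this sketch hashes canonical JSON with SHA-256; a single `threading.Lock` stands in for the per-key `pg_advisory_xact_lock` (coarser, since it serializes all keys); and `RunRegistry`, `IdempotencyPayloadMismatch`, and the demo workflow name are hypothetical. The mismatch message carries only hash prefixes and the `run_id`, in the spirit of the log-safety test.

```python
import hashlib
import json
import threading
import uuid
from typing import Any, Dict, Tuple


class IdempotencyPayloadMismatch(Exception):
    """Local stand-in for ControlPlaneError(IDEMPOTENCY_PAYLOAD_MISMATCH)."""


def request_hash(payload: Dict[str, Any]) -> str:
    # Canonical JSON (sorted keys, compact separators) so key order does not matter.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class RunRegistry:
    def __init__(self) -> None:
        self._lock = threading.Lock()   # coarse stand-in for pg_advisory_xact_lock
        self._runs: Dict[Tuple[str, str], Tuple[str, str]] = {}  # key -> (run_id, hash)

    def create_run(self, workflow_name: str, trigger_key: str, payload: Dict[str, Any]) -> str:
        digest = request_hash(payload)
        key = (workflow_name, trigger_key)
        with self._lock:                # check-then-insert must not interleave
            existing = self._runs.get(key)
            if existing is not None:
                run_id, stored = existing
                if stored != digest:
                    # Log-safe message: hash prefixes and run_id, never the payload.
                    raise IdempotencyPayloadMismatch(
                        f"{workflow_name}/{trigger_key}: run {run_id} has hash "
                        f"{stored[:8]}, request has {digest[:8]}"
                    )
                return run_id           # same payload: idempotent hit
            run_id = str(uuid.uuid4())
            self._runs[key] = (run_id, digest)
            return run_id


reg = RunRegistry()
first_id = reg.create_run("demo_workflow", "2024-01-01", {"channels": ["eng"], "limit": 5})
again_id = reg.create_run("demo_workflow", "2024-01-01", {"limit": 5, "channels": ["eng"]})
try:
    reg.create_run("demo_workflow", "2024-01-01", {"channels": ["general"]})
    mismatch = False
except IdempotencyPayloadMismatch:
    mismatch = True
print(first_id == again_id, mismatch)  # True True
```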

---

## Child Workflows and Agent Turns

`ctx.start_workflow` / `ctx.wait_for_workflow` / `ctx.run_workflow` are the primitives for spawning sub-workflows. `start_workflow` wraps `create_workflow_run` inside a `ctx.step` (step_kind `child_workflow_start`) so the child `run_id` is checkpointed. `wait_for_workflow` stores a `_waiting` marker with the child `run_id` and suspends. When the child reaches a terminal state, the parent is re-queued, re-executes top-to-bottom, and at the `wait_for_workflow` call finds the checkpoint still in `_waiting` state but the child now terminal — it replaces the marker with the child's result and continues.

`ctx.run_agent` is a typed shorthand for `run_workflow("agent_turn", ...)`. It creates a child `agent_turn` workflow run, suspends until it completes, and returns `result_text`.

Sources: [services/api/api/workflow_engine.py:545-695](), [services/api/api/workflow_engine.py:738-779]()
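The parent-side half of this handshake can be sketched as follows. It is a simplified stand-in, not the engine's method: `wait_for_workflow` here is synchronous, `get_child` is a hypothetical lookup returning `(status, result)`, the terminal set is taken from the status diagram, and the child's `run_id` is assumed to be already checkpointed by `start_workflow`. How a failed or cancelled child is surfaced to the parent is not described here, so the sketch simply returns whatever result the lookup reports.

```python
from typing import Any, Callable, Dict, Tuple


class SuspendWorkflow(Exception):
    """Local stand-in for the engine's suspension signal."""


TERMINAL = {"completed", "failed_permanent", "cancelled"}   # terminal states from the diagram


def wait_for_workflow(
    checkpoints: Dict[str, Any],
    name: str,
    child_run_id: str,
    get_child: Callable[[str], Tuple[str, Any]],           # run_id -> (status, result)
) -> Any:
    state = checkpoints.get(name)
    is_marker = isinstance(state, dict) and state.get("_waiting") is True
    if name in checkpoints and not is_marker:
        return state                                        # child result already checkpointed
    status, result = get_child(child_run_id)
    if status in TERMINAL:
        checkpoints[name] = result                          # replace the marker with the result
        return result
    checkpoints[name] = {"_waiting": True, "child_run_id": child_run_id}
    raise SuspendWorkflow(f"waiting on {child_run_id}")


children = {"child-1": ("running", None)}                   # stand-in for workflow_runs
cps: Dict[str, Any] = {"start_child": "child-1"}            # run_id saved by start_workflow's step
outcomes = []
for _ in range(2):                                          # two executions of the parent
    try:
        outcomes.append(wait_for_workflow(cps, "wait_child", cps["start_child"],
                                          lambda rid: children[rid]))
    except SuspendWorkflow:
        outcomes.append("suspended")
    children["child-1"] = ("completed", {"result_text": "done"})   # child finishes in between
print(outcomes)  # ['suspended', {'result_text': 'done'}]
```

Because the child `run_id` comes from a checkpoint, the replayed parent waits on the same child instead of spawning a new one.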

---

## Per-Step Retry and Timeout

`ctx.step` accepts an optional `RetryPolicy` that controls how many times `fn` is re-attempted on failure within the same execution, with fixed or exponential backoff. Raising `NonRetryableError` inside `fn` bypasses the retry loop and propagates immediately. An optional `timeout: dt.timedelta` wraps `fn` in `asyncio.wait_for`. A successful result, even one from a later attempt, is checkpointed exactly once, and after that replays never call `fn` again. A crash after `fn` returns but before the checkpoint commits does re-run `fn` on replay, so steps with external side effects should tolerate being repeated.

```python
@dataclass
class RetryPolicy:
    limit: int = 0
    delay: dt.timedelta = field(default_factory=dt.timedelta)
    backoff: str = "fixed"   # "fixed" | "exponential"
    max_delay: dt.timedelta | None = None
```

Sources: [services/api/api/workflow_engine.py:131-148](), [services/api/api/workflow_engine.py:358-398]()
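A sketch of how such a policy could drive a retry loop. The `RetryPolicy` fields match the dataclass above; the `delay_for` method, the doubling rule for `"exponential"`, treating `limit` as the number of re-attempts after the first call, retrying after a timeout, and `run_with_retry` itself are assumptions for illustration. The value it returns is what `ctx.step` would then checkpoint.

```python
import asyncio
import datetime as dt
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional


class NonRetryableError(Exception):
    """Local stand-in: raising it inside fn skips any remaining retries."""


@dataclass
class RetryPolicy:
    limit: int = 0
    delay: dt.timedelta = field(default_factory=dt.timedelta)
    backoff: str = "fixed"   # "fixed" | "exponential"
    max_delay: Optional[dt.timedelta] = None

    def delay_for(self, retry: int) -> dt.timedelta:
        """Delay before re-attempt number `retry` (1-based); doubling is assumed."""
        d = self.delay * (2 ** (retry - 1)) if self.backoff == "exponential" else self.delay
        return min(d, self.max_delay) if self.max_delay is not None else d


async def run_with_retry(fn: Callable[[], Awaitable[Any]], policy: RetryPolicy,
                         timeout: Optional[dt.timedelta] = None) -> Any:
    retries = 0
    while True:
        try:
            if timeout is None:
                return await fn()
            return await asyncio.wait_for(fn(), timeout.total_seconds())
        except NonRetryableError:
            raise                                  # propagate immediately
        except Exception:
            retries += 1
            if retries > policy.limit:
                raise                              # retries exhausted
            await asyncio.sleep(policy.delay_for(retries).total_seconds())


calls: list = []


async def flaky() -> str:
    calls.append("try")
    if len(calls) < 3:
        raise ConnectionError("transient")         # first two attempts fail
    return "ok"


policy = RetryPolicy(limit=3, delay=dt.timedelta(milliseconds=1), backoff="exponential",
                     max_delay=dt.timedelta(milliseconds=2))
result = asyncio.run(run_with_retry(flaky, policy))
delays = [policy.delay_for(n).total_seconds() for n in (1, 2, 3)]
print(result, len(calls), delays)  # ok 3 [0.001, 0.002, 0.002]

fatal_calls: list = []


async def fatal() -> None:
    fatal_calls.append("try")
    raise NonRetryableError("bad input")


try:
    asyncio.run(run_with_retry(fatal, policy))
except NonRetryableError:
    pass
print(len(fatal_calls))  # 1
```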

---

## Replay-Safe Logging

`ctx.log(msg, **kwargs)` suppresses structured log lines during replay by checking the `_in_replay` flag. The flag is `True` when the context is first constructed with a non-empty checkpoints dict and is cleared to `False` the moment the first un-checkpointed step executes. This prevents duplicate log lines — an important ergonomic property since the handler may replay dozens of already-completed steps before reaching live work.

Sources: [services/api/api/workflow_engine.py:437-450]()
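A minimal sketch of the flag's behavior, assuming a hypothetical `ReplayAwareLogger` that collects lines in a list instead of emitting structured logs, with a `step_cached` helper standing in for the checkpoint lookup at the top of `ctx.step`.

```python
from typing import Any, Dict, List


class ReplayAwareLogger:
    """Hypothetical sketch of ctx.log's replay suppression."""

    def __init__(self, checkpoints: Dict[str, Any]) -> None:
        self._checkpoints = checkpoints
        self._in_replay = bool(checkpoints)      # True only when resuming with saved steps
        self.lines: List[str] = []

    def step_cached(self, name: str) -> bool:
        """Stand-in for the checkpoint lookup at the top of ctx.step."""
        if name in self._checkpoints:
            return True
        self._in_replay = False                  # first live step ends the replay phase
        return False

    def log(self, msg: str, **kwargs: Any) -> None:
        if self._in_replay:
            return                               # the original execution already logged this
        fields = " ".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
        self.lines.append(f"{msg} {fields}".strip())


replaying = ReplayAwareLogger({"fetch": [1, 2], "parse": {"ok": True}})
for step_name in ("fetch", "parse", "publish"):
    replaying.step_cached(step_name)
    replaying.log("step done", step=step_name)
print(replaying.lines)  # ['step done step=publish']

fresh = ReplayAwareLogger({})
fresh.log("started", run="r1")
print(fresh.lines)      # ['started run=r1']
```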

---

## Workflow Discovery and the External WORKFLOW_DIRS Mount

Built-in workflows live under `api/workflows/`. The `WORKFLOW_DIRS` env var accepts colon-separated filesystem paths; Python files found there are loaded into the `centaur.workflows.*` namespace. This lets operator-supplied workflows be bind-mounted into the container and hot-reloaded without rebuilding the image — the same pattern used for external tool plugins. Each handler is versioned by the SHA-256 of its source file, stored in `workflow_runs.workflow_version` for reproducibility.

Sources: [services/api/api/workflow_engine.py:1356-1532]()
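A sketch of the discovery side, assuming POSIX paths (a colon separator would clash with Windows drive letters). `workflow_dirs` and `load_workflow` are hypothetical helpers built on the standard `importlib.util` recipe for importing a source file directly; the version string is simply the SHA-256 hex digest of the file's bytes. Reloading on change is not shown.

```python
import hashlib
import importlib.util
import os
import sys
import tempfile
from pathlib import Path
from types import ModuleType
from typing import List, Mapping, Tuple


def workflow_dirs(env: Mapping[str, str] = os.environ) -> List[Path]:
    """Split the colon-separated WORKFLOW_DIRS value, ignoring empty entries."""
    return [Path(p) for p in env.get("WORKFLOW_DIRS", "").split(":") if p]


def load_workflow(path: Path) -> Tuple[ModuleType, str]:
    """Import one file as centaur.workflows.<stem>; return it with its content hash."""
    version = hashlib.sha256(path.read_bytes()).hexdigest()   # stored as workflow_version
    module_name = f"centaur.workflows.{path.stem}"
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module                        # as in the importlib docs recipe
    spec.loader.exec_module(module)
    return module, version


with tempfile.TemporaryDirectory() as tmp:
    source = b'WORKFLOW_NAME = "hello"\n'
    (Path(tmp) / "hello_workflow.py").write_bytes(source)
    dirs = workflow_dirs({"WORKFLOW_DIRS": f"{tmp}:/nonexistent"})
    found = [p for d in dirs if d.is_dir() for p in sorted(d.glob("*.py"))]
    mod, version = load_workflow(found[0])

print(mod.WORKFLOW_NAME, mod.__name__)                # hello centaur.workflows.hello_workflow
print(version == hashlib.sha256(source).hexdigest())  # True
```

Hashing the file contents rather than using a modification time means two hosts running the same source record the same `workflow_version`.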

---

## End-to-End Example: `paradigm_pulse_daily`

The daily digest workflow is the clearest real-world example. Its handler performs two operations:

1. `ctx.agent_turn(PROMPT)` runs a child `agent_turn` workflow, suspending the parent until the agent turn completes.
2. `ctx.call_tool("slack", "send_message", args)` posts the result to `#paradigm-pulse`. It is checkpointed as step_kind `tool_call`, so once the send has been recorded, a replay does not post the message again.

Because both operations are checkpointed, a crash between step 1 and step 2 causes the handler to replay, find the agent result in the checkpoint cache, and proceed directly to posting — without re-running the expensive AI turn.

Sources: [workflows/paradigm_pulse_daily.py:209-228]()

The `slack_sync` workflow demonstrates a different pattern: it iterates over Slack channels in a plain `for` loop, calling direct Postgres helpers rather than `ctx.step`, because its per-channel work consists of idempotent upserts that do not need step-level checkpointing. It uses `ctx.log` for structured observability, which is suppressed on replay.

Sources: [workflows/slack_sync.py:324-470]()

---

## Summary

The checkpoint/replay model makes Centaur workflows crash-safe, resumable, and idempotent by construction. The handler function is ordinary Python; the engine transparently intercepts `ctx.step` calls to cache results in Postgres. Sleep, event suspension, and child-workflow waiting all use the same mechanism: write a marker checkpoint, raise `SuspendWorkflow`, and let the reconciler wake the run when the condition clears. Because each re-execution fast-forwards through all previously checkpointed steps before reaching the live frontier, complex multi-hour workflows can be interrupted at any point and resumed correctly without custom recovery logic in the handler.

Sources: [services/api/api/workflow_engine.py:1-14]()
