# The Kitchen: Runner & State Store

> How the engine executes stages in parallel threads, writes per-sample JSON atomically so a Ctrl-C mid-run loses nothing, and resumes exactly where it stopped on the next run. Covers StateStore, SampleState, and the atomic-replace trick for Windows Defender.

- Repository: 416rehman/DeepZero
- GitHub: https://github.com/416rehman/DeepZero
- Human wiki: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324
- Complete Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/llms-full.txt

## Source Files

- `src/deepzero/engine/runner.py`
- `src/deepzero/engine/state.py`
- `src/deepzero/engine/context.py`
- `src/deepzero/engine/ui.py`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [src/deepzero/engine/runner.py](src/deepzero/engine/runner.py)
- [src/deepzero/engine/state.py](src/deepzero/engine/state.py)
- [src/deepzero/engine/context.py](src/deepzero/engine/context.py)
- [src/deepzero/engine/ui.py](src/deepzero/engine/ui.py)
- [src/deepzero/engine/types.py](src/deepzero/engine/types.py)
- [src/deepzero/engine/stage.py](src/deepzero/engine/stage.py)
</details>


This page covers the execution heart of DeepZero: how `PipelineRunner` shepherds samples through stages using parallel threads, how `StateStore` persists the result of every sample after each stage in a crash-safe way, and how the whole system can be interrupted mid-flight and resume exactly where it left off on the next run.

Think of `PipelineRunner` as a kitchen brigade: one person (ingest) gathers all the ingredients first, then a line of stations (stages) process each item — several in parallel — while a meticulous ledger (the state store) records what happened to every item after each station. If the kitchen loses power, the ledger lets you skip everything that was already done when the power comes back.

---

## Core Responsibilities at a Glance

| Component | File | Role |
|---|---|---|
| `PipelineRunner` | `runner.py` | Orchestrates stages; manages threads, signals, retries |
| `StateStore` | `state.py` | Atomic file I/O; persists run and sample state |
| `SampleState` | `state.py` | Per-sample ledger of stage outcomes |
| `RunState` | `state.py` | Top-level run metadata (status, stats, stage list) |
| `PipelineDashboard` | `ui.py` | Live Rich terminal dashboard during execution |
| `generate_context` | `context.py` | Regenerates human-readable `context.md` after each stage |

---

## The Lifecycle of a Run

```text
PipelineRunner.run()
  │
  ├─ install SIGINT handler
  ├─ mark RunState → RUNNING, save to disk
  │
  ├─ _resume_or_ingest()
  │     ├─ [existing samples on disk?] → load them, skip ingest  ← resume path
  │     └─ [no samples] → run IngestProcessor, create SampleState per sample
  │
  └─ breadth-first stage loop
        for each (StageSpec, Processor):
          ├─ dispatch by processor type to exactly one of:
          │     ├─ _run_map()      (one sample per thread)
          │     ├─ _run_batch()    (all samples handed to processor at once)
          │     └─ _run_reduce()   (processor selects which sample IDs survive)
          │
          ├─ sync barrier: save_manifest(), _generate_context_files()
          └─ save_run() with per-stage stats
```

Sources: [src/deepzero/engine/runner.py:102-265]()
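
As a rough illustration of the breadth-first ordering in the diagram, the sketch below runs every sample through one stage before any sample enters the next, with a placeholder where the sync barrier would sit. The function name `run_breadth_first`, the dict-based samples, and the two toy stages are assumptions made for this example; none of them come from `runner.py`.

```python
def run_breadth_first(stages, samples):
    """Run every sample through stage N before any sample enters stage N+1."""
    events = []
    for name, fn in stages:
        for sample in samples:
            fn(sample)
            events.append((name, sample["id"]))
        # Placeholder for the sync barrier: the real runner refreshes the
        # manifest and context files at this point (not modelled here).
        events.append(("barrier", name))
    return events


# Hypothetical samples and stages, for illustration only.
samples = [{"id": "a", "n": 1}, {"id": "b", "n": 2}]
stages = [
    ("double", lambda s: s.update(n=s["n"] * 2)),
    ("inc", lambda s: s.update(n=s["n"] + 1)),
]
events = run_breadth_first(stages, samples)
```

The recorded `events` list makes the ordering visible: both samples pass `double`, then the barrier, and only then does either sample reach `inc`.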

---

## Ingest vs. Resume

`PipelineRunner._resume_or_ingest()` begins by checking whether sample directories already exist on disk:

```python
# src/deepzero/engine/runner.py:277-283
existing_states = self.state_store.list_samples()
if existing_states:
    log.info("resume: found %d existing samples, skipping ingest", len(existing_states))
    sample_states = {s.sample_id: s for s in existing_states}
    ...
    return sample_states
```

If `samples/` already has subdirectories with `state.json` files, the expensive ingest step is skipped entirely. The run re-hydrates the same samples from disk and passes them into the stage loop. Each stage then checks `is_stage_done()` to skip samples it already processed (see [Skipping Already-Done Work](#skipping-already-done-work) below).

On a fresh run, the ingest processor discovers all samples, a `SampleState` is created per sample, and each state is immediately written to disk before any pipeline stage begins.

Sources: [src/deepzero/engine/runner.py:267-369](), [src/deepzero/engine/state.py:231-240]()
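
The resume-or-ingest decision can be sketched with plain files, as below. This is a simplified stand-in and not the project's code: the helper names, the dict-based state, and the `discover` callback are assumptions for illustration, and the real `StateStore` also handles versioning and atomic writes.

```python
import json
import tempfile
from pathlib import Path


def list_samples(samples_dir: Path) -> dict[str, dict]:
    """Return {sample_id: state} for every samples/<id>/state.json found."""
    found = {}
    if not samples_dir.is_dir():
        return found
    for state_file in sorted(samples_dir.glob("*/state.json")):
        state = json.loads(state_file.read_text(encoding="utf-8"))
        found[state["sample_id"]] = state
    return found


def resume_or_ingest(work_dir: Path, discover) -> tuple[str, dict[str, dict]]:
    """Load existing sample states, or ingest and persist fresh ones."""
    samples_dir = work_dir / "samples"
    existing = list_samples(samples_dir)
    if existing:
        return "resume", existing  # ingest is skipped entirely

    states = {}
    for sample_id in discover():
        state = {"sample_id": sample_id, "history": {}}
        sample_dir = samples_dir / sample_id
        sample_dir.mkdir(parents=True, exist_ok=True)
        # Persist before any stage runs, so the sample is on disk from the start.
        (sample_dir / "state.json").write_text(json.dumps(state), encoding="utf-8")
        states[sample_id] = state
    return "ingest", states


with tempfile.TemporaryDirectory() as tmp:
    work = Path(tmp)
    first_mode, first = resume_or_ingest(work, lambda: ["a", "b"])
    # The second call finds the state.json files and never calls discover().
    second_mode, second = resume_or_ingest(work, lambda: ["should-not-run"])
```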

---

## Parallel Execution: Three Stage Flavors

The runner supports three processor shapes, selected at runtime by `isinstance` checks:

### Map (`_run_map`)

The most common shape. Each sample is processed independently. When `spec.parallel > 1`, a `ThreadPoolExecutor` is created:

```python
# src/deepzero/engine/runner.py:451-496
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_map = {
        executor.submit(self._process_one_map, s, spec, processor): s for s in pending
    }
    for future in concurrent.futures.as_completed(future_map):
        if self._shutdown_event.is_set():
            executor.shutdown(wait=False, cancel_futures=True)
            break
        ...
```

When `spec.parallel <= 1` (serial mode), samples are processed in a simple loop. In both cases, results accumulate in a `dirty` list and are flushed to disk in batches of 50:

```python
# src/deepzero/engine/runner.py:444-448
if len(dirty) >= 50:
    for s in dirty:
        self.state_store.save_sample(s)
    dirty.clear()
```

Batching reduces I/O pressure. The trade-off is that an abrupt crash loses whatever results are still waiting in `dirty`, which the flush threshold keeps to at most about 50 samples.
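
A minimal sketch of the collect-then-flush pattern is shown below, assuming a `save` callback in place of `state_store.save_sample` and a final flush of leftovers once the stage ends. The final flush is an assumption, since the snippet above does not show it. `run_map`, `FLUSH_EVERY`, and the toy workload are hypothetical names.

```python
import concurrent.futures

FLUSH_EVERY = 50  # same threshold as the snippet above


def run_map(samples, process, save, max_workers=4):
    """Process samples in a thread pool and save results in batches."""
    dirty = []
    flushes = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process, s): s for s in samples}
        for future in concurrent.futures.as_completed(futures):
            dirty.append(future.result())
            if len(dirty) >= FLUSH_EVERY:
                for result in dirty:
                    save(result)
                dirty.clear()
                flushes += 1
    # Assumption: whatever is left over is written when the stage finishes.
    for result in dirty:
        save(result)
    return flushes


saved = []
flush_count = run_map(range(120), lambda s: s * 2, saved.append)
```

With 120 samples the threshold triggers twice (at 50 and 100 results), and the remaining 20 are written by the final flush.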

### Batch (`_run_batch`)

A `BulkMapProcessor` receives the entire list of pending entries in a single `process()` call. Results are mapped back by index position. Each sample state is saved immediately after its result is mapped.

Sources: [src/deepzero/engine/runner.py:672-746]()

### Reduce (`_run_reduce`)

A `ReduceProcessor` returns a set of sample IDs to keep. Any sample not in that set is marked `FILTERED`. This is the only stage where one sample's fate can depend on others.

Sources: [src/deepzero/engine/runner.py:635-668]()
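
To make the keep-set idea concrete, here is a small sketch in which a hypothetical reducer keeps the top-scoring samples and everything else is marked filtered. The function names, the string statuses, and the scores are assumptions for illustration and do not reflect the actual `ReduceProcessor` interface.

```python
def apply_reduce(sample_ids, keep):
    """Return {sample_id: status} after a reduce step."""
    keep = set(keep)
    return {sid: ("completed" if sid in keep else "filtered") for sid in sample_ids}


def top_k_by_score(scores, k):
    """Hypothetical reducer: keep the k highest-scoring sample IDs."""
    ranked = sorted(scores, key=scores.get, reverse=True)
    return set(ranked[:k])


scores = {"a": 0.9, "b": 0.2, "c": 0.7}
outcome = apply_reduce(scores, top_k_by_score(scores, k=2))
```

Sample `b` is filtered only because `a` and `c` scored higher, which is the cross-sample dependency that map and batch stages do not have.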

---

## Skipping Already-Done Work

At the top of both `_run_map` and `_run_batch`, the runner filters out samples that are already done for the current stage:

```python
# src/deepzero/engine/runner.py:409-410
pending = [s for s in active if not s.is_stage_done(spec.name)]
```

`is_stage_done` reads `SampleState.history` and returns `True` if the stage's `StageStatus` is `COMPLETED`, `FILTERED`, or `FAILED`:

```python
# src/deepzero/engine/state.py:119-127
def is_stage_done(self, stage_name: str) -> bool:
    s = self.history.get(stage_name)
    if s is None:
        return False
    return s.status in (
        StageStatus.COMPLETED,
        StageStatus.FILTERED,
        StageStatus.FAILED,
    )
```

Because history is persisted to disk as results are flushed, a run that was interrupted mid-stage will, on the next run, find the completed samples in `history` and skip them. Note that `FAILED` counts as done, so a sample that failed a stage is not retried on resume. Only samples with no terminal record for the stage (incomplete or never started) are re-run.

Sources: [src/deepzero/engine/state.py:119-127](), [src/deepzero/engine/runner.py:409-414]()
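
The filter can be exercised in isolation with a cut-down model. In the sketch below, `history` maps a stage name straight to a status (the real `history` holds `StageOutput` records). The `RUNNING` member and the `decompile` stage name are assumptions added for the example.

```python
from dataclasses import dataclass, field
from enum import Enum


class StageStatus(Enum):
    RUNNING = "running"      # hypothetical non-terminal status
    COMPLETED = "completed"
    FILTERED = "filtered"
    FAILED = "failed"


TERMINAL = {StageStatus.COMPLETED, StageStatus.FILTERED, StageStatus.FAILED}


@dataclass
class Sample:
    sample_id: str
    history: dict = field(default_factory=dict)  # stage name -> StageStatus

    def is_stage_done(self, stage_name: str) -> bool:
        # A missing entry yields None, which is not a terminal status.
        return self.history.get(stage_name) in TERMINAL


samples = [
    Sample("a", {"decompile": StageStatus.COMPLETED}),
    Sample("b", {"decompile": StageStatus.FAILED}),
    Sample("c", {"decompile": StageStatus.RUNNING}),
    Sample("d"),
]
pending = [s.sample_id for s in samples if not s.is_stage_done("decompile")]
```

Only `c` (interrupted) and `d` (never started) remain pending; the failed sample `b` is treated as done.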

---

## The Sync Barrier

After every stage finishes, the runner performs a mandatory sync before proceeding to the next stage:

```python
# src/deepzero/engine/runner.py:247-248
self.state_store.save_manifest(list(sample_states.values()))
self._generate_context_files(sample_states)
```

`save_manifest` writes a summary JSON (`run_manifest.json`) containing each sample's verdict, current stage, and sha256. `_generate_context_files` then regenerates a `context.md` per sample in parallel (up to `cpu_count * 4` threads). Together these two steps form the sync barrier: a checkpoint that makes the on-disk state consistent before the next stage reads from it.

Sources: [src/deepzero/engine/runner.py:244-251](), [src/deepzero/engine/state.py:244-269]()

---

## SampleState: The Per-Sample Ledger

`SampleState` is a dataclass with a `history` dict that maps stage names to `StageOutput` records:

```python
# src/deepzero/engine/state.py:57-67
@dataclass
class SampleState:
    sample_id: str
    sha256: str = ""
    source_path: str = ""
    filename: str = ""
    verdict: SampleStatus = SampleStatus.PENDING
    current_stage: str = ""
    history: dict[str, StageOutput] = field(default_factory=dict)
    error: str | None = None
```

Each `StageOutput` carries status, verdict, timestamps, artifacts, freeform `data`, and an optional error string. Stage outputs are never merged across stages — the data dict is fully namespaced per stage:

```python
# src/deepzero/engine/state.py:52
# namespaced processor output - never merged across stages
data: dict[str, Any] = field(default_factory=dict)
```

The lifecycle of `verdict` at the sample level mirrors the stage outcomes:

| `SampleStatus` | Meaning |
|---|---|
| `PENDING` | Not yet reached |
| `ACTIVE` | Currently in flight |
| `FILTERED` | A stage returned `Verdict.FILTER` |
| `FAILED` | A stage returned an error |
| `COMPLETED` | All stages done |

Sources: [src/deepzero/engine/state.py:44-130](), [src/deepzero/engine/types.py:17-23]()
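
The per-stage namespacing is easiest to see with two stages that write the same key. The sketch below uses trimmed-down versions of `SampleState` and `StageOutput` (only the fields needed here, with a plain string status) and hypothetical stage names; it is not the project's serialization code.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class StageOutput:
    status: str = "completed"
    data: dict[str, Any] = field(default_factory=dict)


@dataclass
class SampleState:
    sample_id: str
    history: dict[str, StageOutput] = field(default_factory=dict)


state = SampleState("a")
# Both stages emit a "score" key; each lives under its own stage name.
state.history["triage"] = StageOutput(data={"score": 0.4})
state.history["deep_scan"] = StageOutput(data={"score": 0.9})

encoded = json.dumps(asdict(state), sort_keys=True)
decoded = json.loads(encoded)
```

After a JSON round trip both values survive side by side, because neither stage's `data` dict is merged into the other.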

---

## StateStore: Atomic File I/O

`StateStore` manages all disk writes. Every write goes through `_atomic_write`, which writes to a `.tmp` sibling first, then renames it atomically:

```python
# src/deepzero/engine/state.py:174-177
def _atomic_write(self, path: Path, content: str) -> None:
    tmp = path.with_suffix(".tmp")
    tmp.write_text(content, encoding="utf-8")
    atomic_replace(tmp, path)
```

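The rename uses `os.replace`, which overwrites the destination in a single step. POSIX requires that step to be atomic. On Windows the call also swaps the file in one operation, but it can fail while another process holds the file open, which is what the retry loop below handles. In practice a reader never sees a partially written file: it sees either the old complete file or the new complete file. If the process is killed between write and rename, the `.tmp` file is orphaned (for `state.json` this is `state.tmp`, because `with_suffix` replaces the extension) but the old `state.json` remains intact.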
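
The behaviour described above can be checked with a few lines. This sketch re-implements the write-then-replace step as a free function (`atomic_write` here is a stand-in, not the `StateStore` method) and simulates a crash by leaving a truncated temp file behind.

```python
import os
import tempfile
from pathlib import Path


def atomic_write(path: Path, content: str) -> Path:
    """Write content to a temp sibling, then swap it into place."""
    tmp = path.with_suffix(".tmp")  # state.json -> state.tmp
    tmp.write_text(content, encoding="utf-8")
    os.replace(tmp, path)
    return tmp


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "state.json"
    target.write_text('{"v": "old"}', encoding="utf-8")

    # Simulate a crash after a partial temp write but before the rename.
    orphan = target.with_suffix(".tmp")
    orphan.write_text('{"v": "ne', encoding="utf-8")
    after_crash = target.read_text(encoding="utf-8")

    # A later successful write overwrites the orphan and replaces the target.
    tmp_path = atomic_write(target, '{"v": "new"}')
    after_write = target.read_text(encoding="utf-8")
    tmp_name = tmp_path.name
    tmp_left_behind = tmp_path.exists()
```

One caveat: this pattern protects against a killed process, but it does not call `fsync`, so it makes no promise about surviving a sudden power loss.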

### The Windows Defender Retry Loop

On Windows, Defender briefly locks newly-created `.tmp` files for virus scanning before allowing them to be renamed. `atomic_replace` handles this with a retry loop:

```python
# src/deepzero/engine/state.py:32-41
def atomic_replace(src: Path, dst: Path, retries: int = 5) -> None:
    # windows defender briefly locks .tmp files for scanning after close
    for i in range(retries):
        try:
            os.replace(src, dst)
            return
        except PermissionError:
            if i == retries - 1 or os.name != "nt":
                raise
            time.sleep(0.05)
```

Non-Windows platforms re-raise the first `PermissionError` immediately. Windows gets up to 5 attempts with a 50 ms sleep between them, and the error is re-raised if the last attempt also fails.

Sources: [src/deepzero/engine/state.py:32-41](), [src/deepzero/engine/state.py:174-177]()
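
Because the real function depends on `os.name` and on an antivirus lock that is hard to reproduce, the sketch below restates the same loop with two injected pieces: a `replace` callable and an `is_windows` flag. Both are assumptions introduced for testability, as is the `FlakyReplace` stand-in.

```python
import time


def replace_with_retry(replace, src, dst, retries=5, delay=0.05, is_windows=True):
    """Call replace(src, dst), retrying PermissionError on Windows only."""
    for attempt in range(retries):
        try:
            replace(src, dst)
            return attempt + 1  # number of attempts used
        except PermissionError:
            if attempt == retries - 1 or not is_windows:
                raise
            time.sleep(delay)


class FlakyReplace:
    """Hypothetical stand-in that fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self, src, dst):
        self.calls += 1
        if self.calls <= self.failures:
            raise PermissionError("locked by scanner")


# Windows-like behaviour: two failures, then success on the third attempt.
attempts_used = replace_with_retry(FlakyReplace(2), "a.tmp", "a.json", delay=0)

# POSIX-like behaviour: the first PermissionError propagates immediately.
posix = FlakyReplace(1)
try:
    replace_with_retry(posix, "a.tmp", "a.json", delay=0, is_windows=False)
    posix_raised = False
except PermissionError:
    posix_raised = True
```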

### Directory Layout

```text
<work_dir>/
  run.json              ← RunState (top-level metadata)
  run_manifest.json     ← Summary of all samples (verdict, stage, sha256)
  pipeline.yaml         ← Snapshot of pipeline definition
  samples/
    <sample_id>/
      state.json        ← Full SampleState with history
      context.md        ← Human-readable LLM context (regenerated each stage)
      <stage>_error.log ← Traceback captured on processor exception
      <artifacts…>      ← Files emitted by processors
```

Sources: [src/deepzero/engine/state.py:163-286]()

### Version Gating

State files carry a `_version` field (currently `2`). On load, any file with a version below `STATE_VERSION` is rejected with a warning and treated as absent, preventing stale v1 state from corrupting a v2 run:

```python
# src/deepzero/engine/state.py:216-223
version = data.get("_version", 1)
if version < STATE_VERSION:
    log.warning("sample %s has v%d state, expected v%d - skipping", ...)
    return None
```

Sources: [src/deepzero/engine/state.py:15-16](), [src/deepzero/engine/state.py:216-224]()
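
The gate reduces to a single comparison, sketched below on raw JSON strings. `load_state` is a hypothetical helper; the real loader reads from disk and logs a warning, both of which are omitted here.

```python
import json

STATE_VERSION = 2


def load_state(raw: str):
    """Return the parsed state, or None when the file predates STATE_VERSION."""
    data = json.loads(raw)
    version = data.get("_version", 1)  # files without the field count as v1
    if version < STATE_VERSION:
        return None
    return data


current = load_state('{"_version": 2, "sample_id": "a"}')
legacy = load_state('{"sample_id": "b"}')
```

A file with no `_version` field defaults to version 1 and is therefore rejected in the same way as an explicit v1 file.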

---

## Graceful Shutdown (Ctrl-C)

`PipelineRunner` installs a custom `SIGINT` handler on the main thread:

```python
# src/deepzero/engine/runner.py:838-850
def _handle_signal(self, signum, frame) -> None:
    if self._shutdown_event.is_set():
        log.warning("forced shutdown")
        # save state and os._exit(1)
        ...
    log.warning("shutdown requested (press ctrl+c again to force)")
    self._shutdown_event.set()
```

The first Ctrl-C sets `_shutdown_event`. Worker threads poll this event at every sample boundary and stop accepting new work. The executor is shut down with `cancel_futures=True`, which cancels samples still waiting in the queue; a sample that is already running is left to finish. The run state is then saved as `INTERRUPTED`. A second Ctrl-C triggers a hard `os._exit(1)` after writing whatever state is available.

Because each sample's state lives in its own file and every write is atomic, a result that reached disk before the process exited is never left half-written. The next run's `_resume_or_ingest` + `is_stage_done` machinery then skips those samples automatically. Any map-stage result that had not yet been flushed from the `dirty` batch when the process died is not on disk, so that sample is processed again on resume.

Sources: [src/deepzero/engine/runner.py:826-850](), [src/deepzero/engine/runner.py:456-460]()
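
The two-step behaviour (first signal requests a stop, second forces an exit) can be sketched without sending real signals by calling the handler directly. `ShutdownController` and the injected `force_exit` callback are assumptions for this example; the callback stands in for `os._exit` so the demonstration does not kill the interpreter.

```python
import signal
import threading


class ShutdownController:
    def __init__(self, force_exit):
        self.event = threading.Event()
        self._force_exit = force_exit  # stand-in for os._exit in this sketch

    def handle(self, signum, frame):
        if self.event.is_set():
            # Second signal: stop waiting for workers and exit now.
            self._force_exit(1)
            return
        # First signal: ask workers to stop at the next sample boundary.
        self.event.set()


exit_codes = []
controller = ShutdownController(force_exit=exit_codes.append)

controller.handle(signal.SIGINT, None)
requested_after_first = controller.event.is_set()
exits_after_first = list(exit_codes)

controller.handle(signal.SIGINT, None)
```

In a real program the handler would be registered with `signal.signal(signal.SIGINT, controller.handle)`, which Python only permits from the main thread.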

---

## Retries and Timeouts

For map stages, `_process_one_map` implements retry logic when `spec.on_failure == FailurePolicy.RETRY`:

```python
# src/deepzero/engine/runner.py:556-583
max_attempts = spec.max_retries + 1 if spec.on_failure == FailurePolicy.RETRY else 1
while attempts < max_attempts:
    ...
    backoff = min(2**attempts, 30)
    if self._shutdown_event.wait(backoff):   # returns True if shutdown fires
        return None
    continue
```

Backoff is exponential (`2^attempt`) capped at 30 seconds. The `_shutdown_event.wait(backoff)` call doubles as both a sleep and a cancellation check: if a shutdown is requested during the wait, the method returns `None` immediately.
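
The delay schedule and the sleep-or-cancel behaviour can both be checked in a few lines. The sketch assumes `attempts` counts from 1 at the first backoff, which the excerpt above does not confirm; `backoff_seconds` is a hypothetical helper name.

```python
import threading


def backoff_seconds(attempts: int, cap: int = 30) -> int:
    """Exponential delay in seconds, capped at `cap`."""
    return min(2 ** attempts, cap)


# Delays for attempts 1 through 6 under the stated assumption.
schedule = [backoff_seconds(n) for n in range(1, 7)]

shutdown = threading.Event()
fired_during_short_wait = shutdown.wait(0.01)  # nobody set it: times out
shutdown.set()
fired_when_already_set = shutdown.wait(30)     # returns at once, no 30 s sleep
```

`Event.wait` returns `True` as soon as the event is set and `False` if the timeout elapses first, which is why a single call can serve as both the sleep and the cancellation check.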

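For per-sample timeouts, `_execute_with_timeout` wraps the processor call in a single-thread `ThreadPoolExecutor` and waits with `future.result(timeout=spec.timeout)`. The timeout only bounds how long the runner waits: Python cannot forcibly stop a thread, so a processor call that overruns may keep running in the background after the timeout is reported.

Sources: [src/deepzero/engine/runner.py:529-631](), [src/deepzero/engine/runner.py:760-775]()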
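
A minimal sketch of the timeout wrapper follows. It assumes the wrapper reports a status string and shuts the pool down without waiting; the real `_execute_with_timeout` may report failures differently, and the `slow` function is a hypothetical workload.

```python
import concurrent.futures
import time


def execute_with_timeout(fn, arg, timeout):
    """Run fn(arg) in a one-thread pool and give up waiting after `timeout` s."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, arg)
    try:
        return "ok", future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return "timeout", None
    finally:
        # Do not block on a worker that may still be running.
        pool.shutdown(wait=False)


def slow(x):
    time.sleep(0.3)
    return x


fast_result = execute_with_timeout(lambda x: x + 1, 1, timeout=1.0)
slow_result = execute_with_timeout(slow, 1, timeout=0.05)
```

The slow call is reported as a timeout after 50 ms, yet its thread still runs to completion in the background.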

---

## The Live Dashboard

`PipelineDashboard` (powered by Rich's `Live`) pins a stage table and progress bar to the bottom of the terminal while log output scrolls above it. Each stage row shows the input count, passed/filtered/failed tallies, and elapsed time. A fully-cached stage (all samples already done) is shown with a `↻` glyph and "cached" in the time column instead of a real duration.

```python
# src/deepzero/engine/ui.py:307-318
time_str = (
    "[dim]cached[/]" if info.is_fully_cached else _format_elapsed(info.elapsed_s)
)
glyph = "↻" if info.is_fully_cached else _GLYPH_DONE
```

The dashboard also emits a short interrupt message when the run stops early, reminding the user to re-run without `--clean` to resume.

Sources: [src/deepzero/engine/ui.py:226-233](), [src/deepzero/engine/ui.py:304-318]()

---

## The `context.md` Side-Channel

After each sync barrier, `_generate_context_files` regenerates a `context.md` for every active sample. The file is a Markdown summary of the sample's current history, artifacts, and per-stage data values, produced by `generate_context` in `context.py`. It is written atomically using the same `atomic_replace` call as all other state writes. This file is primarily a convenience for LLM-powered processors that want a compact, human-readable view of what has happened to a sample so far:

```python
# src/deepzero/engine/context.py:8-51
def generate_context(sample_dir: Path, state: SampleState) -> None:
    # auto-generate a human-readable context.md for LLM consumption
    ...
    tmp.write_text(content, encoding="utf-8")
    atomic_replace(tmp, path)
```

Generation is skipped when `context.md` is already newer than `state.json`, so the overhead is proportional to the work actually done in the most recent stage.

Sources: [src/deepzero/engine/context.py:8-51](), [src/deepzero/engine/runner.py:786-795]()
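
The freshness check can be approximated by comparing modification times, as in the sketch below. `needs_regeneration` is a hypothetical helper, and treating equal timestamps as stale is an assumption; the excerpt above does not show the exact comparison the runner uses.

```python
import os
import tempfile
from pathlib import Path


def needs_regeneration(context: Path, state: Path) -> bool:
    """True unless context.md exists and is strictly newer than state.json."""
    if not context.exists():
        return True
    return context.stat().st_mtime <= state.stat().st_mtime


with tempfile.TemporaryDirectory() as d:
    state = Path(d) / "state.json"
    context = Path(d) / "context.md"
    state.write_text("{}", encoding="utf-8")
    missing = needs_regeneration(context, state)

    context.write_text("# ctx", encoding="utf-8")
    os.utime(state, (1_000, 1_000))    # pin mtimes so the check is deterministic
    os.utime(context, (2_000, 2_000))
    fresh = needs_regeneration(context, state)

    os.utime(state, (3_000, 3_000))    # state changed after context was written
    stale = needs_regeneration(context, state)
```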

---

## Summary

`PipelineRunner` executes stages breadth-first with a sync barrier between each one. Map stages use a `ThreadPoolExecutor` and flush results to disk in batches of 50. `StateStore` guarantees that every write is atomic via a write-to-`.tmp`-then-rename pattern, with a Windows Defender retry loop to handle antivirus file locking. `SampleState` records the full outcome of every stage in a `history` dict keyed by stage name. On the next run, `list_samples` re-hydrates those records and `is_stage_done` skips any sample whose stage is already terminal — making every run an exact continuation of the last. The entire durability guarantee rests on `atomic_replace` in `src/deepzero/engine/state.py:32-41`.
