
# DeepZero ELI5 Wiki

> DeepZero is a pipeline engine that automates vulnerability research on Windows kernel drivers: you describe the work in a YAML file and it discovers, filters, decompiles, scans, and asks an AI to rank every driver for you. It handles parallelism, crash-recovery, and custom plug-in processors so researchers can focus on results instead of plumbing.

## Context Links

- [Agent index](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/llms.txt)
- [Human interactive wiki](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324)
- [GitHub repository](https://github.com/416rehman/DeepZero)

## Repository Metadata

- Repository: 416rehman/DeepZero
- Generated: 2026-05-22T02:28:22.406Z
- Updated: 2026-05-22T02:28:28.812Z
- Runtime: Claude Code
- Format: Explain Like I'm 5
- Pages: 6

## Page Index

- 01. [Explain It Simply: What DeepZero Does](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/01-explain-it-simply-what-deepzero-does.md) - Plain-language explanation of the whole project — what problem it solves, the one analogy to hold in mind, and the three ideas every reader must leave with before going deeper.
- 02. [The Recipe Card: Pipeline YAML](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/02-the-recipe-card-pipeline-yaml.md) - How a pipeline.yaml file works — what each field means, how stages chain together, how environment variables are expanded, and the hard rule that the first stage must always be an ingest processor.
- 03. [The Kitchen: Runner & State Store](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/03-the-kitchen-runner-state-store.md) - How the engine executes stages in parallel threads, writes per-sample JSON atomically so a Ctrl-C mid-run loses nothing, and resumes exactly where it stopped on the next run. Covers StateStore, SampleState, and the atomic-replace trick for Windows Defender.
- 04. [The Workers: Processors & the Registry](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/04-the-workers-processors-the-registry.md) - The four processor types (ingest, map, reduce, bulk-map), how the registry resolves a bare name versus a file path versus a dotted import, and how to write a custom processor as a Python class. Covers built-in stages: filter, hash_filter, top_k, sort, command, and llm.
- 05. [A Real Run: The loldrivers Vulnerability Pipeline](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/05-a-real-run-the-loldrivers-vulnerability-pipeline.md) - Step-by-step walkthrough of the shipped example pipeline — from PE discovery through IOCTL filtering, loldrivers.io dedup, Ghidra decompilation, Semgrep batch scan, top-10 selection, to LLM assessment with a Jinja2 prompt template. Shows what each external processor does and what data it passes to the next stage.
- 06. [The One Idea to Keep & What to Read Next](https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/06-the-one-idea-to-keep-what-to-read-next.md) - Closing recap: the core idea in one sentence, the analogy that holds, the three things that make DeepZero different from a script, and concrete pointers to where to look next in the codebase.

## Source File Index

- `pipelines/loldrivers/assessment.j2`
- `pipelines/loldrivers/pipeline.yaml`
- `processors/ghidra_decompile/ghidra_decompile.py`
- `processors/loldrivers_filter/loldrivers_filter.py`
- `processors/pe_ingest/pe_ingest.py`
- `processors/semgrep_scanner/semgrep_scanner.py`
- `pyproject.toml`
- `README.md`
- `src/deepzero/__main__.py`
- `src/deepzero/api/server.py`
- `src/deepzero/cli.py`
- `src/deepzero/engine/context.py`
- `src/deepzero/engine/llm.py`
- `src/deepzero/engine/pipeline.py`
- `src/deepzero/engine/registry.py`
- `src/deepzero/engine/runner.py`
- `src/deepzero/engine/stage.py`
- `src/deepzero/engine/state.py`
- `src/deepzero/engine/types.py`
- `src/deepzero/engine/ui.py`
- `src/deepzero/stages/filter.py`
- `src/deepzero/stages/ingest.py`
- `src/deepzero/stages/llm.py`
- `src/deepzero/stages/top_k.py`

---

## 01. Explain It Simply: What DeepZero Does

> Plain-language explanation of the whole project — what problem it solves, the one analogy to hold in mind, and the three ideas every reader must leave with before going deeper.

- Page Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/01-explain-it-simply-what-deepzero-does.md
- Generated: 2026-05-22T02:27:10.062Z

### Source Files

- `README.md`
- `pyproject.toml`
- `src/deepzero/__main__.py`
- `src/deepzero/cli.py`

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [README.md](README.md)
- [pyproject.toml](pyproject.toml)
- [src/deepzero/__main__.py](src/deepzero/__main__.py)
- [src/deepzero/cli.py](src/deepzero/cli.py)
- [src/deepzero/engine/runner.py](src/deepzero/engine/runner.py)
- [src/deepzero/engine/stage.py](src/deepzero/engine/stage.py)
- [src/deepzero/engine/state.py](src/deepzero/engine/state.py)
- [src/deepzero/engine/pipeline.py](src/deepzero/engine/pipeline.py)
- [src/deepzero/engine/llm.py](src/deepzero/engine/llm.py)
- [src/deepzero/stages/__init__.py](src/deepzero/stages/__init__.py)
- [pipelines/loldrivers/pipeline.yaml](pipelines/loldrivers/pipeline.yaml)
</details>

# Explain It Simply: What DeepZero Does

DeepZero is a command-line tool that lets you run automated security research pipelines against a folder of files — no glue code required. You describe *what* to do in a YAML file, and DeepZero handles *how*: running stages in parallel, saving progress after each step so you can safely interrupt and resume, and calling an LLM at the end to produce written assessments.

This page gives you the mental model you need before reading any of the architecture, API, or processor documentation. Three ideas cover everything: pipelines are YAML recipes, samples flow through typed stages, and state lives on disk so nothing is lost.

---

## The One Analogy: A Fault-Tolerant Assembly Line

Imagine a factory assembly line for Windows kernel drivers. A truck arrives with thousands of `.sys` files. The line has several stations:

1. **Receiving** — identify every item and put each one in its own bin.
2. **Screening** — reject items that are not kernel drivers, or that already appear on a public list of known vulnerable drivers and so are not new findings.
3. **Machining** — run each item through a decompiler to extract readable code.
4. **Inspection** — run an automated scanner over the code to find patterns.
5. **Expert review** — send only the suspicious survivors to an LLM analyst who writes up a verdict.

If the power goes out between stations 3 and 4, you do not start over. Each bin already has the machined output. The line resumes exactly where it left off.

That is DeepZero. The truck is your target directory. The stations are pipeline stages. Each bin is a per-sample folder under `work/`. The LLM analyst can be any model you configure, reached through [LiteLLM](https://github.com/BerriAI/litellm).

---

## What Problem It Solves

Vulnerability researchers who want to screen large binary corpora (driver packs, firmware images, OS packages) face the same pain every time:

- They write ad hoc shell scripts that cannot be resumed after a crash.
- Parallelism is tacked on as an afterthought, introducing race conditions.
- Integrating a decompiler, a static analysis tool, and an LLM requires custom glue code for each project.
- Results are scattered across text files with no consistent schema.

DeepZero's YAML-defined pipeline addresses each of these. The README describes it as an "automated vulnerability research pipeline engine" with "atomic per-sample state on disk; Ctrl+C and re-run to pick up where you left off."

Sources: [README.md:6-35]()

---

## Idea 1 — A Pipeline Is a YAML Recipe

You write a single `pipeline.yaml` file. It lists a name, an optional LLM model, and a sequence of stages. Each stage names a processor and can set parallelism, timeouts, retry behavior, and typed configuration.

```yaml
# pipelines/loldrivers/pipeline.yaml (abbreviated)
name: loldrivers
model: vertex_ai/gemini-2.5-pro

settings:
  work_dir: work
  max_workers: 4

stages:
  - name: discover
    processor: pe_ingest/pe_ingest.py   # external processor
    config:
      extensions: [".sys"]
      recursive: true

  - name: kernel_filter
    processor: metadata_filter           # built-in processor
    config:
      require:
        is_kernel_driver: true
        has_ioctl_surface: true

  - name: decompile
    processor: ghidra_decompile/ghidra_decompile.py
    parallel: 0        # 0 = auto-scale to CPU count
    timeout: 600
    config:
      ghidra_install_dir: ${GHIDRA_INSTALL_DIR}   # env-var expansion

  - name: pick_top_10
    processor: top_k
    config:
      metric_path: "semgrep_scanner.finding_count"
      keep_top: 10

  - name: assess
    processor: generic_llm
    parallel: 2
    config:
      prompt: pipelines/loldrivers/assessment.j2
      output_file: assessment.md
```

Sources: [pipelines/loldrivers/pipeline.yaml:1-93]()

The engine loads this at runtime, expands `${VAR}` and `${VAR:-default}` from environment variables, validates that every stage references a real processor class, and then enforces that the first stage is an ingest processor.

Sources: [src/deepzero/engine/pipeline.py:76-215]()

---

## Idea 2 — Samples Flow Through Four Typed Stages

Every file discovered by the ingest stage becomes a *sample*. A sample travels through the remaining stages one by one. Processors are typed by their relationship to the sample stream:

| Type | Relationship | When to use |
|---|---|---|
| `IngestProcessor` | One call, returns a list of samples | File discovery, PE parsing, API ingestion |
| `MapProcessor` | Called once per sample, fan-out via `ThreadPoolExecutor` | Filtering, decompilation, per-file LLM calls |
| `ReduceProcessor` | Sees all active samples at once, returns which survive | Top-k selection, global ranking, deduplication |
| `BulkMapProcessor` | All samples in one external invocation | Semgrep batch scan, any tool with high startup cost |

```
Target dir
    │
    ▼
IngestProcessor ──▶ sample_a ─┐
                    sample_b ─┼──▶ MapProcessor (×N threads)
                    sample_c ─┘          │
                                         ▼
                                  ReduceProcessor
                                 (top-k survivors)
                                         │
                                         ▼
                             MapProcessor (LLM assess)
```

A processor returns one of three verdicts: `ok` (sample continues downstream), `filter` (sample intentionally excluded, still tracked), or `fail` (something broke, error is logged).

Sources: [src/deepzero/engine/stage.py:139-171](), [src/deepzero/engine/stage.py:277-350]()
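To make the three verdicts concrete, here is a minimal sketch of how a map stage might route samples through a thread pool. `Verdict`, `Result`, and `run_map_stage` are hypothetical stand-ins, not DeepZero's classes; only the verdict names `ok`, `filter`, and `fail` come from the text above.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum
from typing import Callable


# Hypothetical stand-ins for the engine's verdict and result types.
class Verdict(Enum):
    OK = "ok"          # sample continues downstream
    FILTER = "filter"  # intentionally excluded, but still tracked
    FAIL = "fail"      # something broke; the error is recorded


@dataclass
class Result:
    verdict: Verdict
    data: dict
    error: str | None = None


def run_map_stage(
    samples: list[str],
    process: Callable[[str], Result],
    workers: int = 4,
) -> tuple[list[str], dict[str, Result]]:
    """Hypothetical helper: call `process` once per sample in a thread pool.

    Returns the survivors (verdict OK) plus every sample's result, so filtered
    and failed samples remain visible in the record.
    """

    def safe(sample: str) -> Result:
        try:
            return process(sample)
        except Exception as exc:  # an exception becomes a FAIL verdict
            return Result(Verdict.FAIL, {}, error=str(exc))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = dict(zip(samples, pool.map(safe, samples)))  # map keeps order
    survivors = [s for s in samples if results[s].verdict is Verdict.OK]
    return survivors, results


def classify(name: str) -> Result:
    """Toy processor: keep .sys files, filter everything else, fail on .bad."""
    if name.endswith(".bad"):
        raise ValueError("unparseable")
    verdict = Verdict.OK if name.endswith(".sys") else Verdict.FILTER
    return Result(verdict, {"name": name})


survivors, results = run_map_stage(["a.sys", "b.dll", "c.bad"], classify)
print(survivors)  # ['a.sys']
```

Catching exceptions inside the worker is one simple way to guarantee that a crash in one sample becomes a `fail` record instead of tearing down the whole stage.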

### Built-in processors

DeepZero ships seven built-in processors you can reference by bare name in any pipeline YAML:

| Name | Type | What it does |
|---|---|---|
| `file_discovery` | Ingest | Recursively discovers files by extension |
| `metadata_filter` | Map | Filters samples by fields stored in history data |
| `hash_exclude` | Map | Excludes samples whose SHA-256 is on a blocklist |
| `generic_llm` | Map | Renders a Jinja2 template and calls the configured LLM |
| `generic_command` | Map | Runs an arbitrary shell command per sample |
| `top_k` | Reduce | Keeps the top-N samples by a numeric field |
| `sort` | Reduce | Reorders samples by a field |

Sources: [src/deepzero/stages/__init__.py:1-17]()

External processors (like the Ghidra decompiler or the loldrivers.io filter) are Python classes in a `processors/` directory. You reference them as `dir/file.py` or `dir/file.py:ClassName` in your YAML. The engine imports and instantiates them at load time.

Sources: [src/deepzero/engine/pipeline.py:154-177]()

### LLM integration is provider-neutral

`LLMProvider` wraps [LiteLLM](https://github.com/BerriAI/litellm), so `model:` in your YAML can be any string LiteLLM understands — `openai/gpt-4o`, `vertex_ai/gemini-2.5-pro`, `anthropic/claude-3-5-sonnet`, a local Ollama model, etc. Rate-limit handling, exponential backoff, and retry logic are built in.

Sources: [src/deepzero/engine/llm.py:26-114]()
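The page states that backoff and retries are built into `LLMProvider` without showing how. The sketch below illustrates the general exponential-backoff pattern around a generic `call` rather than LiteLLM itself; `with_backoff`, the `RateLimited` exception, and the delay constants are assumptions, not DeepZero's real values.

```python
import random
import time
from typing import Callable, TypeVar

# Generic sketch of exponential backoff; not DeepZero's LLMProvider code.
T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for a provider's rate-limit error."""


def with_backoff(
    call: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry `call` on RateLimited, doubling the wait each time and adding jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller see the error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))  # jitter spreads out retries
    raise RuntimeError("unreachable")


# Usage: a fake model call that is rate-limited twice, then succeeds.
attempts = {"n": 0}


def flaky_completion() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimited()
    return "assessment text"


print(with_backoff(flaky_completion, sleep=lambda seconds: None))  # assessment text
```

Injecting `sleep` keeps the function testable without real waiting; in production it defaults to `time.sleep`.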

---

## Idea 3 — State Lives on Disk, Nothing Is Lost

After every stage completes, the engine writes each sample's full history to disk atomically (write to `.tmp`, rename). A run can be interrupted at any point and resumed without replaying completed work.

The on-disk layout looks like this:

```
work/
└── loldrivers/               ← pipeline work dir
    ├── run.json              ← global run status + per-stage counters
    ├── run_manifest.json     ← summary table of all samples + verdicts
    ├── pipeline.yaml         ← snapshot of the YAML used for this run
    └── samples/
        └── <sample_id>/
            ├── state.json    ← per-stage history, verdicts, data, artifacts
            └── context.md    ← human-readable summary (auto-generated)
```

The `StateStore` class writes every file with an atomic rename, and uses a version field (`_version: 2`) to reject state files from incompatible older runs.

Sources: [src/deepzero/engine/state.py:163-286]()
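The write-then-rename pattern can be sketched with the standard library alone. `os.replace` is atomic when source and target sit on the same filesystem, on both POSIX and Windows, so a reader sees either the old `state.json` or the new one and never a half-written file. The function names, the short retry on `PermissionError` (the kind of brief lock a virus scanner such as Windows Defender can cause), and the `ValueError` on a version mismatch are illustrative assumptions; only the temp-file-then-rename idea and the `_version: 2` field come from the text.

```python
import json
import os
import tempfile
import time
from pathlib import Path

# Sketch only: function names and retry policy are hypothetical.
STATE_VERSION = 2  # mirrors the `_version: 2` field described above


def atomic_write_json(path: Path, payload: dict, retries: int = 5) -> None:
    """Write JSON to a temp file in the same directory, then rename it over `path`."""
    path.parent.mkdir(parents=True, exist_ok=True)
    data = {**payload, "_version": STATE_VERSION}
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh, indent=2)
            fh.flush()
            os.fsync(fh.fileno())  # make sure bytes reach disk before the rename
        for attempt in range(retries):
            try:
                os.replace(tmp_name, path)  # atomic on the same filesystem
                return
            except PermissionError:
                # a scanner may hold the target open briefly; wait and retry
                if attempt == retries - 1:
                    raise
                time.sleep(0.05 * (attempt + 1))
    finally:
        if os.path.exists(tmp_name):
            os.remove(tmp_name)  # only left behind if the rename never happened


def load_state(path: Path) -> dict:
    """Read a state file, rejecting files written with an incompatible version."""
    data = json.loads(path.read_text(encoding="utf-8"))
    if data.get("_version") != STATE_VERSION:
        raise ValueError(f"{path}: unsupported state version {data.get('_version')!r}")
    return data
```

Creating the temp file in the target's own directory matters: a rename across filesystems is not atomic and may fail outright.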

When you re-run `deepzero run` and the pipeline's work directory already holds sample state, the engine skips ingest and sends each sample straight to its first incomplete stage. From the runner:

> "fast resume: if states already exist on disk, skip the expensive ingest"

Sources: [src/deepzero/engine/runner.py:275-296]()
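Resuming comes down to one question per sample: which is the first stage without a recorded result? A minimal sketch, assuming each sample's `state.json` holds a `history` list of `{"stage": ..., "verdict": ...}` records. That shape, the helper name, and treating `filter` and `fail` as terminal are all assumptions; the real schema and resume rules may differ.

```python
from __future__ import annotations


def first_incomplete_stage(stage_names: list[str], history: list[dict]) -> int | None:
    """Hypothetical helper: index of the next stage to run, or None if finished."""
    verdicts = {rec["stage"]: rec["verdict"] for rec in history}
    for index, name in enumerate(stage_names):
        verdict = verdicts.get(name)
        if verdict is None:
            return index  # never ran, or was interrupted before its state was saved
        if verdict != "ok":
            return None   # assumed terminal: filtered/failed samples stop flowing
    return None           # every stage recorded "ok": nothing left to do


stages = ["discover", "kernel_filter", "decompile", "semgrep", "assess"]
history = [
    {"stage": "discover", "verdict": "ok"},
    {"stage": "kernel_filter", "verdict": "ok"},
]
print(first_incomplete_stage(stages, history))  # 2, i.e. resume at "decompile"
```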

A `Ctrl+C` during execution sets a shutdown event, drains in-flight threads, saves state for all in-progress samples, and marks the run as `interrupted` rather than `failed`. A second `Ctrl+C` forces an immediate exit.

Sources: [src/deepzero/engine/runner.py:826-850]()
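The two-step Ctrl+C behavior maps naturally onto a `threading.Event` plus a SIGINT handler. This is a sketch under assumptions: `GracefulShutdown` and `run_stage` are hypothetical names, exit code 130 is just the shell convention for SIGINT, and the real runner's draining and state-saving logic is more involved.

```python
from __future__ import annotations

import os
import signal
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class GracefulShutdown:
    """Hypothetical: first Ctrl+C asks for a clean stop; a second one exits at once."""

    def __init__(self) -> None:
        self.event = threading.Event()

    def handle(self, signum: int, frame: object) -> None:
        if self.event.is_set():
            os._exit(130)  # second Ctrl+C: no cleanup, leave immediately
        self.event.set()   # first Ctrl+C: stop scheduling, let running work finish

    def install(self) -> None:
        signal.signal(signal.SIGINT, self.handle)  # must be called from the main thread


def run_stage(samples: list[str], work: Callable[[str], None],
              stop: threading.Event, workers: int = 4) -> str:
    """Hypothetical: run `work` per sample until `stop` is set, then drain."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = []
        for sample in samples:
            if stop.is_set():
                break  # do not hand out new samples after Ctrl+C
            futures.append(pool.submit(work, sample))
        for fut in futures:
            if stop.is_set():
                fut.cancel()  # drop queued tasks that have not started yet
        for fut in futures:
            if not fut.cancelled():
                fut.result()  # wait for tasks that were already running
    return "interrupted" if stop.is_set() else "completed"
```

`Future.cancel()` only succeeds for tasks that have not started, which is exactly the split the text describes: queued work is dropped, in-flight work is allowed to finish and save.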

---

## The Command Line in Practice

```powershell
# run a pipeline (resumes automatically if interrupted)
deepzero run C:\drivers -p .\pipelines\loldrivers\pipeline.yaml

# check status of a run without re-running it
deepzero status -p loldrivers

# validate a pipeline YAML without executing it
deepzero validate loldrivers

# scaffold a new empty pipeline
deepzero init my-new-pipeline

# list all registered processor types
deepzero list-processors

# start an interactive LLM conversation over a completed work directory
deepzero interactive --work-dir work/loldrivers
```

The `run` command accepts a `--model` flag to override the model without editing the YAML, and a `--clean` flag to discard previous state and start fresh (the old work directory is moved aside, not deleted immediately, to avoid data loss on Windows where Defender briefly locks files).

Sources: [src/deepzero/cli.py:132-224]()
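Moving the old directory aside before deleting it can be sketched as below. The `.old-<timestamp>` naming and the best-effort `shutil.rmtree(..., ignore_errors=True)` cleanup are assumptions for illustration, not the CLI's actual behavior; the idea is that the fresh run gets a clean path right away, while the slower, lock-prone deletion happens on the renamed copy and may be finished later.

```python
from __future__ import annotations

import shutil
import time
from pathlib import Path


def clean_work_dir(work_dir: Path) -> Path | None:
    """Hypothetical: move `work_dir` aside, then try to delete the moved copy.

    Returns where the old directory was moved, or None if it did not exist.
    """
    if not work_dir.exists():
        return None
    aside = work_dir.with_name(f"{work_dir.name}.old-{time.time_ns()}")
    work_dir.rename(aside)                    # the original path is free for the new run
    shutil.rmtree(aside, ignore_errors=True)  # best effort: locked files may linger
    return aside
```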

---

## How the Three Ideas Connect

```text
pipeline.yaml       Engine                             Disk (work/<pipeline>/)
─────────────       ──────                             ───────────────────────
stages: [...]  ──▶  load & validate
                    expand ${ENV}
                    resolve processor classes
                            │
                            ▼
                    ingest: discover files        ──▶  samples/<id>/state.json
                            │
                    for each stage:
                      if Map     → ThreadPool     ──▶  state.json (per sample, atomic)
                      if Reduce  → barrier, rank  ──▶  state.json (filtered samples)
                      if BulkMap → one invocation ──▶  state.json (indexed results)
                            │
                      sync barrier: save manifest ──▶  run_manifest.json
                            │
                    mark run completed            ──▶  run.json
```

DeepZero is deliberately thin: the YAML and the processor base classes are the whole contract. The engine never looks inside your data dictionaries — it just passes them forward via each sample's `history` ledger so downstream processors can read upstream results with `entry.upstream_data("stage_name", "field")`.

Sources: [src/deepzero/engine/stage.py:103-113](), [src/deepzero/engine/runner.py:65-90]()
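A minimal sketch of the history-ledger idea, assuming each entry keeps its stage outputs in a dict keyed by stage name. `Entry` and its methods are hypothetical stand-ins that only mimic the `upstream_data("stage_name", "field")` call shown above; the real `ProcessorEntry` may also store verdicts, artifacts, and timings.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Entry:
    """Hypothetical sample entry carrying a per-stage history ledger."""

    sample_id: str
    history: dict[str, dict[str, Any]] = field(default_factory=dict)

    def record(self, stage: str, data: dict[str, Any]) -> None:
        """Store one stage's output; the engine never inspects `data` itself."""
        self.history[stage] = data

    def upstream_data(self, stage: str, key: str, default: Any = None) -> Any:
        """Read a field an earlier stage stored, or `default` if it is missing."""
        return self.history.get(stage, {}).get(key, default)


entry = Entry("a1b2c3")
entry.record("semgrep_scanner", {"finding_count": 7})
print(entry.upstream_data("semgrep_scanner", "finding_count"))  # 7
```

Because the ledger is just nested dictionaries, any JSON-serializable output a processor produces can travel forward unchanged.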

---

## Three Things to Take Away

1. **Pipeline-as-YAML**: Every pipeline is a YAML file listing typed processor stages. Built-in processors cover common needs; external processors (Python classes) handle specialized tools. No glue code is required.

2. **Four processor shapes**: Ingest discovers samples, Map transforms one at a time (parallel), Reduce filters the whole set at once, and BulkMap batches them into a single external invocation. Knowing which shape to use determines how the engine schedules your work.

3. **Atomic, resumable state**: Each sample's history is written atomically after every stage. Interrupt a run at any point and re-run the same command; the engine picks up each sample at its first incomplete stage, so completed stages are never repeated.

The shipped `loldrivers` pipeline demonstrates all three: it ingests `.sys` driver files from a directory tree, filters by kernel-driver metadata and an online hash blocklist, decompiles survivors with Ghidra, scans them with Semgrep, keeps the 10 drivers with the most Semgrep findings, and sends those to a configurable LLM for a written vulnerability assessment.

Sources: [pipelines/loldrivers/pipeline.yaml:1-93](), [src/deepzero/engine/runner.py:128-265]()

---

## 02. The Recipe Card: Pipeline YAML

> How a pipeline.yaml file works — what each field means, how stages chain together, how environment variables are expanded, and the hard rule that the first stage must always be an ingest processor.

- Page Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/02-the-recipe-card-pipeline-yaml.md
- Generated: 2026-05-22T02:25:42.894Z

### Source Files

- `pipelines/loldrivers/pipeline.yaml`
- `src/deepzero/engine/pipeline.py`
- `src/deepzero/engine/stage.py`
- `src/deepzero/engine/types.py`

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [pipelines/loldrivers/pipeline.yaml](pipelines/loldrivers/pipeline.yaml)
- [src/deepzero/engine/pipeline.py](src/deepzero/engine/pipeline.py)
- [src/deepzero/engine/stage.py](src/deepzero/engine/stage.py)
- [src/deepzero/engine/types.py](src/deepzero/engine/types.py)
- [src/deepzero/engine/registry.py](src/deepzero/engine/registry.py)
- [processors/pe_ingest/pe_ingest.py](processors/pe_ingest/pe_ingest.py)
</details>

# The Recipe Card: Pipeline YAML

A `pipeline.yaml` file is the single source of truth for everything DeepZero needs to run a data-analysis pipeline: what to call it, which LLM to use, how much parallelism to allow, and — most importantly — the ordered list of processing stages that each sample travels through. Think of it as a recipe card: the ingredients are your files or data samples, and each stage is a cooking step that filters, transforms, or ranks those samples before the next step sees them.

This page explains every field in the schema, how stages chain together into a directed flow, how `${ENV_VAR}` placeholders are expanded at load time, and the hard architectural rule that the pipeline's first stage must always be an **ingest** processor.

---

## Top-Level Fields

A `pipeline.yaml` is a YAML mapping. The engine reads it with `yaml.safe_load` and validates the structure before a single sample is ever touched.

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `name` | string | no | Human-readable pipeline identifier. Defaults to the directory name. Becomes the subdirectory under `work_dir`. |
| `description` | string | no | Free-text description shown in listings. |
| `version` | string | no | Informational version tag. |
| `model` | string | no | LLM identifier passed to every stage that calls an LLM (e.g. `vertex_ai/gemini-2.5-pro`). Can be overridden on the CLI. |
| `settings` | mapping | no | Pipeline-wide settings. See below. |
| `knowledge` | mapping | no | Arbitrary key-value data available to every processor via `ctx.get_knowledge()`. |
| `stages` | list | **yes** | Ordered list of stage definitions. Must contain at least one entry. |

Sources: [src/deepzero/engine/pipeline.py:90-99]()

### The `settings` block

```yaml
settings:
  work_dir: work        # base directory for output (default: "work")
  max_workers: 4        # thread-pool ceiling (default: min(4, cpu_count))
```

`work_dir` is always resolved as `<work_dir>/<pipeline_name>/`, so each pipeline gets its own workspace. Relative paths are anchored to `cwd`.

Sources: [src/deepzero/engine/pipeline.py:53-62]()
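Both `settings` rules fit in a few lines of standard-library code. `resolve_work_dir` and `default_max_workers` are hypothetical helpers written from the description above, not the engine's own functions.

```python
from __future__ import annotations

import os
from pathlib import Path


def resolve_work_dir(settings: dict, pipeline_name: str, cwd: Path | None = None) -> Path:
    """Hypothetical: return <work_dir>/<pipeline_name>, anchoring relative paths to cwd."""
    base = Path(settings.get("work_dir", "work"))  # default: "work"
    if not base.is_absolute():
        base = (cwd or Path.cwd()) / base
    return base / pipeline_name  # each pipeline gets its own workspace


def default_max_workers() -> int:
    """Hypothetical: min(4, cpu_count); os.cpu_count() may return None, so fall back to 1."""
    return min(4, os.cpu_count() or 1)
```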

---

## Stage Fields

Every entry under `stages:` is a mapping with these keys:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `name` | string | no | `stage_N` | Unique label for this stage. Used for logging, state storage keys, and `entry.upstream("name")` lookups. Duplicate names raise a `ValueError`. |
| `processor` | string | **yes** | — | Which processor class to run. See [Processor Resolution](#processor-resolution) below. |
| `config` | mapping | no | `{}` | Passed verbatim to the processor's typed `Config` dataclass (or as a raw dict if no `Config` is declared). |
| `parallel` | int | no | `4` | Number of concurrent threads for `MapProcessor` stages. `0` means "use the hardware maximum". |
| `on_failure` | string | no | `skip` | What to do when a sample fails: `skip` (mark failed, continue others), `retry` (re-attempt up to `max_retries`), or `abort` (halt the entire pipeline). |
| `max_retries` | int | no | `0` | Number of retries when `on_failure: retry`. |
| `timeout` | int | no | `0` | Per-sample timeout in seconds. `0` means no timeout. |

Sources: [src/deepzero/engine/stage.py:173-188](), [src/deepzero/engine/pipeline.py:118-134]()
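One way to read the `on_failure` and `max_retries` semantics is as a small loop around each sample. `run_with_policy` and `PipelineAborted` are hypothetical names, and treating exhausted retries as a plain failure is an assumption; the real runner also handles timeouts, logging, and state saving.

```python
from __future__ import annotations

from typing import Any, Callable


class PipelineAborted(RuntimeError):
    """Hypothetical: raised to halt the whole pipeline when on_failure is 'abort'."""


def run_with_policy(process: Callable[[], Any], on_failure: str = "skip",
                    max_retries: int = 0) -> tuple[str, Any]:
    """Hypothetical: return ("ok", data) or ("fail", error message) per the policy."""
    if on_failure not in ("skip", "retry", "abort"):
        raise ValueError(f"invalid on_failure: {on_failure!r}")
    attempts = 1 + (max_retries if on_failure == "retry" else 0)
    last_error = ""
    for _ in range(attempts):
        try:
            return "ok", process()
        except Exception as exc:
            last_error = str(exc)
    if on_failure == "abort":
        raise PipelineAborted(last_error)
    return "fail", last_error  # skip, or retries exhausted: mark failed, move on
```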

### Stage example from the loldrivers pipeline

```yaml
# pipelines/loldrivers/pipeline.yaml (lines 56-68)
- name: decompile
  processor: ghidra_decompile/ghidra_decompile.py
  parallel: 0           # use every available core
  timeout: 600          # 10 minutes per sample
  config:
    strategy: extract_dispatch.py
    max_functions: 60
    max_depth: 4
    ghidra_install_dir: ${GHIDRA_INSTALL_DIR}
    java_home: ${JAVA_HOME:-}
```

---

## Processor Resolution

The `processor` field can take three forms. The engine's `resolve_processor_class()` function tries them in order:

```
bare name              → built-in registry  (e.g. "metadata_filter", "top_k", "generic_llm")
dir/file.py            → processors/<dir>/<file>.py, first Processor subclass found
dir/file.py:ClassName  → processors/<dir>/<file>.py, specific named class
```

External processors (the path forms) are loaded from the `processors/` directory relative to `cwd`. The file is imported via `importlib`, and `_source_file` is set so the processor can locate its own co-located assets.

Sources: [src/deepzero/engine/registry.py:20-39](), [src/deepzero/engine/registry.py:54-85]()

```text
Processor reference forms
─────────────────────────────────────────────────────────────
"metadata_filter"                 → built-in registry lookup
"pe_ingest/pe_ingest.py"          → processors/pe_ingest/pe_ingest.py
                                     (first Processor subclass)
"pe_ingest/pe_ingest.py:PEIngest" → processors/pe_ingest/pe_ingest.py
                                     (class named PEIngest)
```
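The three reference forms can be handled with `importlib.util.spec_from_file_location`. This is a simplified sketch of the idea, not `resolve_processor_class()` itself: the `Processor` base class and `BUILTINS` dict are hypothetical stand-ins, and attaching `_source_file` to the class is an assumption about where the real engine stores it.

```python
from __future__ import annotations

import importlib.util
import inspect
from pathlib import Path


class Processor:
    """Hypothetical stand-in for DeepZero's processor base class."""


BUILTINS: dict[str, type] = {}  # hypothetical: bare name -> built-in class


def resolve_processor_class(ref: str, processors_dir: Path) -> type:
    file_part, _, class_name = ref.partition(":")
    if not file_part.endswith(".py"):
        return BUILTINS[ref]  # bare name: built-in registry lookup

    path = processors_dir / file_part  # e.g. processors/pe_ingest/pe_ingest.py
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Processor subclasses defined in this file, in definition order
    classes = [
        obj for obj in vars(module).values()
        if inspect.isclass(obj)
        and issubclass(obj, Processor)
        and obj is not Processor
        and obj.__module__ == module.__name__  # skip classes imported from elsewhere
    ]
    if class_name:
        classes = [c for c in classes if c.__name__ == class_name]
    if not classes:
        raise LookupError(f"no matching Processor subclass in {path}")
    cls = classes[0]  # the explicit :ClassName, or the first subclass found
    cls._source_file = path  # assumption: lets the class find co-located assets
    return cls
```

Iterating `vars(module)` rather than `inspect.getmembers` keeps definition order (module dicts preserve insertion order), which is what "first subclass found" needs; `getmembers` sorts alphabetically.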

---

## How Stages Chain Together

The pipeline is a strictly linear sequence. Each sample is an independent unit of work that flows through the stages one by one. The four processor types define how each stage interacts with the sample stream:

```
target path
    │
    ▼
┌─────────────────────────────────────┐
│  IngestProcessor  (stage 0 only)    │  discovers samples from a source
│  e.g. pe_ingest                     │  returns list[Sample]
└──────────────────┬──────────────────┘
                   │  sample_a, sample_b, sample_c …
     ┌─────────────┼─────────────┐
     ▼             ▼             ▼
┌──────────┐  ┌──────────┐  ┌──────────┐
│ MapProc. │  │ MapProc. │  │ MapProc. │  one map stage, N threads
│(sample_a)│  │(sample_b)│  │(sample_c)│  1:1 per sample
└────┬─────┘  └────┬─────┘  └────┬─────┘  ok / filter / fail
     └─────────────┼─────────────┘
                   │  surviving samples
                   ▼
       ┌───────────────────────┐
       │  BulkMapProcessor     │  one external invocation for all samples
       │  e.g. semgrep_scanner │  returns list[ProcessorResult]
       └───────────┬───────────┘
                   │
                   ▼
       ┌───────────────────────┐
       │  ReduceProcessor      │  sees ALL samples at once
       │  e.g. top_k           │  returns list[sample_id] to keep
       └───────────────────────┘
```

**Key rules enforced at load time:**

1. **Stage 0 must be `IngestProcessor`** — the engine raises a `ValueError` if the first processor is not an `IngestProcessor`.
2. **`IngestProcessor` may only appear at stage 0** — placing one at any other position raises a `ValueError`.
3. Stages 1+ must be `MapProcessor`, `ReduceProcessor`, or `BulkMapProcessor`.

Sources: [src/deepzero/engine/pipeline.py:159-177]()

---

## The Hard Rule: First Stage Must Be Ingest

This is not a style convention — it is enforced by `_resolve_processors()` in the engine:

```python
# src/deepzero/engine/pipeline.py:159-164
if i == 0:
    if not isinstance(instance, IngestProcessor):
        raise ValueError(
            f"first stage '{spec.name}' (processor='{spec.processor}') must be an IngestProcessor. "
            f"got {cls.__name__}. every pipeline must start with an ingest processor."
        )
```

The reason is architectural: `IngestProcessor.process()` receives a `target: Path` and returns a `list[Sample]`. Every other processor type operates on `ProcessorEntry` objects (one per call for map stages, all at once for reduce and bulk-map stages), and those entries only exist *after* the ingest stage creates them. There is no sample stream without an ingest stage to seed it.

Sources: [src/deepzero/engine/stage.py:277-288](), [src/deepzero/engine/pipeline.py:154-165]()
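Both load-time rules (ingest always at position 0, and only at position 0) can be checked in a single pass over the instantiated stages. The four class names mirror the processor types but are empty stand-ins here, and `check_stage_order` is a hypothetical helper, not the engine's code.

```python
from __future__ import annotations


# Empty stand-ins for the four processor types.
class IngestProcessor: ...
class MapProcessor: ...
class ReduceProcessor: ...
class BulkMapProcessor: ...


def check_stage_order(stages: list[tuple[str, object]]) -> None:
    """Hypothetical: raise ValueError unless stage 0, and only stage 0, is ingest."""
    if not stages:
        raise ValueError("pipeline must contain at least one stage")
    for i, (name, instance) in enumerate(stages):
        is_ingest = isinstance(instance, IngestProcessor)
        if i == 0 and not is_ingest:
            raise ValueError(f"first stage '{name}' must be an IngestProcessor")
        if i > 0 and is_ingest:
            raise ValueError(f"stage '{name}' is an IngestProcessor but is not first")
        if i > 0 and not isinstance(instance, (MapProcessor, ReduceProcessor, BulkMapProcessor)):
            raise ValueError(f"stage '{name}' has an unsupported processor type")
```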

---

## Environment Variable Expansion

All string values in the YAML — including nested `config` fields — are expanded before any processor sees them. Expansion happens immediately after `yaml.safe_load()`, before validation.

**Two supported forms:**

| Syntax | Behavior |
|--------|----------|
| `${VAR}` | Replaced by the environment variable value. If `VAR` is unset, the literal `${VAR}` is kept unchanged. |
| `${VAR:-default}` | Replaced by the environment variable value, or `default` if unset. |

```python
# src/deepzero/engine/pipeline.py:269-275, with the imports it needs
import os
import re


def _expand_string(s: str) -> str:
    def _replace(match: re.Match) -> str:
        var = match.group(1)
        if ":-" in var:
            name, default = var.split(":-", 1)
            return os.environ.get(name, default)
        return os.environ.get(var, match.group(0))   # keeps original if unset
    return re.sub(r"\$\{([^}]+)\}", _replace, s)
```

In the loldrivers pipeline, the Ghidra stage uses both forms:

```yaml
# pipelines/loldrivers/pipeline.yaml:66-67
ghidra_install_dir: ${GHIDRA_INSTALL_DIR}   # required — kept literal if unset
java_home: ${JAVA_HOME:-}                   # optional — empty string if unset
```

The processor's own `validate()` method should check required fields after expansion. Because an unset `${VAR}` is left as literal text, a missing `GHIDRA_INSTALL_DIR` reaches the decompile stage as the string `${GHIDRA_INSTALL_DIR}`; a `validate()` that checks the directory exists then fails at load time with a clear error rather than mid-run.

Sources: [src/deepzero/engine/pipeline.py:256-276]()
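The recursive walk that carries expansion into nested `config` fields can be sketched as follows, assuming the parsed YAML contains only dicts, lists, strings, and scalars. `expand_env_vars` is a hypothetical name; it takes the string expander as a parameter so the block runs on its own.

```python
from __future__ import annotations

from typing import Any, Callable


def expand_env_vars(node: Any, expand: Callable[[str], str]) -> Any:
    """Hypothetical: return a copy of `node` with `expand` applied to every string value."""
    if isinstance(node, str):
        return expand(node)
    if isinstance(node, dict):
        # only values are expanded; keys are left as written
        return {key: expand_env_vars(value, expand) for key, value in node.items()}
    if isinstance(node, list):
        return [expand_env_vars(item, expand) for item in node]
    return node  # ints, floats, bools and None pass through unchanged
```

In the engine's terms, `expand` would be the `_expand_string` excerpt above. `os.path.expandvars` also works for plain `${VAR}`, but it does not understand the `:-default` form.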

---

## Pipeline Load and Validation Sequence

When `load_pipeline(ref)` is called, six steps happen in order before any sample is touched:

1. **Path resolution** — `ref` is resolved against `cwd/pipelines/<ref>`, `~/.deepzero/pipelines/<ref>`, and the installed package's `pipelines/` directory.
2. **YAML parse** — `yaml.safe_load()` reads the file.
3. **Env-var expansion** — `_expand_env_vars()` recursively walks every string in the parsed dict.
4. **StageSpec construction** — each `stages:` entry becomes a `StageSpec` dataclass; duplicate names and invalid `on_failure` values are rejected.
5. **Processor resolution** — `_resolve_processors()` instantiates every processor class and enforces the ingest-first rule.
6. **Processor-level validation** — each processor's `validate(ctx)` hook runs; errors are collected and raised together.

Sources: [src/deepzero/engine/pipeline.py:76-151]()
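Step 4 can be sketched with a dataclass. Field names and defaults come from the stage-field table above; `build_stage_specs` is a hypothetical helper, zero-based `stage_N` numbering is an assumption, and the real `StageSpec` may carry more fields.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

VALID_ON_FAILURE = {"skip", "retry", "abort"}


@dataclass
class StageSpec:  # sketch of the documented fields, not the engine's class
    name: str
    processor: str
    config: dict[str, Any] = field(default_factory=dict)
    parallel: int = 4
    on_failure: str = "skip"
    max_retries: int = 0
    timeout: int = 0


def build_stage_specs(raw_stages: list[dict[str, Any]]) -> list[StageSpec]:
    """Hypothetical: turn parsed `stages:` entries into specs, rejecting bad input."""
    if not raw_stages:
        raise ValueError("'stages' must contain at least one entry")
    specs: list[StageSpec] = []
    seen: set[str] = set()
    for i, raw in enumerate(raw_stages):
        if "processor" not in raw:
            raise ValueError(f"stage {i} is missing 'processor'")
        spec = StageSpec(
            name=raw.get("name", f"stage_{i}"),  # assumption: zero-based numbering
            processor=raw["processor"],
            config=raw.get("config") or {},
            parallel=raw.get("parallel", 4),
            on_failure=raw.get("on_failure", "skip"),
            max_retries=raw.get("max_retries", 0),
            timeout=raw.get("timeout", 0),
        )
        if spec.name in seen:
            raise ValueError(f"duplicate stage name: {spec.name!r}")
        if spec.on_failure not in VALID_ON_FAILURE:
            raise ValueError(f"stage {spec.name!r}: invalid on_failure {spec.on_failure!r}")
        seen.add(spec.name)
        specs.append(spec)
    return specs
```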

---

## Complete Field Reference

```yaml
name: <string>            # pipeline identifier (default: directory name)
description: <string>     # human-readable description
version: <string>         # informational version tag

model: <string>           # LLM model identifier for generic_llm stages

settings:
  work_dir: <path>        # output root (default: "work")
  max_workers: <int>      # thread-pool ceiling (default: min(4, cpu_count))

knowledge:                # arbitrary key-value data for processors
  <key>: <value>

stages:
  - name: <string>        # unique stage label
    processor: <ref>      # bare name | dir/file.py | dir/file.py:Class
    config:               # processor-specific settings; ${VAR} expanded
      <key>: <value>
    parallel: <int>       # concurrency level (default: 4, 0 = max)
    on_failure: skip      # skip | retry | abort (default: skip)
    max_retries: <int>    # retries when on_failure=retry (default: 0)
    timeout: <int>        # per-sample seconds, 0 = none (default: 0)
```

---

## Summary

A `pipeline.yaml` is a compact declaration that the DeepZero engine turns into an executable, stateful data-processing workflow. The file is parsed and fully validated — including environment variable expansion and per-processor health checks — before any sample is ever touched. The strictest invariant in the whole system is that **position 0 must always be an `IngestProcessor`**: it seeds the sample stream that every subsequent `map`, `reduce`, and `bulk_map` stage depends on. This is enforced in code, not convention, at `src/deepzero/engine/pipeline.py:159-164`.

---

## 03. The Kitchen: Runner & State Store

> How the engine executes stages in parallel threads, writes per-sample JSON atomically so a Ctrl-C mid-run loses nothing, and resumes exactly where it stopped on the next run. Covers StateStore, SampleState, and the atomic-replace trick for Windows Defender.

- Page Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/03-the-kitchen-runner-state-store.md
- Generated: 2026-05-22T02:26:04.040Z

### Source Files

- `src/deepzero/engine/runner.py`
- `src/deepzero/engine/state.py`
- `src/deepzero/engine/context.py`
- `src/deepzero/engine/ui.py`

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [src/deepzero/engine/runner.py](src/deepzero/engine/runner.py)
- [src/deepzero/engine/state.py](src/deepzero/engine/state.py)
- [src/deepzero/engine/context.py](src/deepzero/engine/context.py)
- [src/deepzero/engine/ui.py](src/deepzero/engine/ui.py)
- [src/deepzero/engine/types.py](src/deepzero/engine/types.py)
- [src/deepzero/engine/stage.py](src/deepzero/engine/stage.py)
</details>

# The Kitchen: Runner & State Store

This page covers the execution heart of DeepZero: how `PipelineRunner` shepherds samples through stages using parallel threads, how `StateStore` persists the result of every sample after each stage in a crash-safe way, and how the whole system can be interrupted mid-flight and resume exactly where it left off on the next run.

Think of `PipelineRunner` as a kitchen brigade: one person (ingest) gathers all the ingredients first, then a line of stations (stages) process each item — several in parallel — while a meticulous ledger (the state store) records what happened to every item after each station. If the kitchen loses power, the ledger lets you skip everything that was already done when the power comes back.

---

## Core Responsibilities at a Glance

| Component | File | Role |
|---|---|---|
| `PipelineRunner` | `runner.py` | Orchestrates stages; manages threads, signals, retries |
| `StateStore` | `state.py` | Atomic file I/O; persists run and sample state |
| `SampleState` | `state.py` | Per-sample ledger of stage outcomes |
| `RunState` | `state.py` | Top-level run metadata (status, stats, stage list) |
| `PipelineDashboard` | `ui.py` | Live Rich terminal dashboard during execution |
| `generate_context` | `context.py` | Regenerates human-readable `context.md` after each stage |

---

## The Lifecycle of a Run

```text
PipelineRunner.run()
  │
  ├─ install SIGINT handler
  ├─ mark RunState → RUNNING, save to disk
  │
  ├─ _resume_or_ingest()
  │     ├─ [existing samples on disk?] → load them, skip ingest  ← resume path
  │     └─ [no samples] → run IngestProcessor, create SampleState per sample
  │
  └─ breadth-first stage loop
        for each (StageSpec, Processor):
          ├─ dispatch on processor type (exactly one runs):
          │     ├─ _run_map()      (MapProcessor: one sample per task, thread pool)
          │     ├─ _run_batch()    (BulkMapProcessor: all samples in one call)
          │     └─ _run_reduce()   (ReduceProcessor: picks the surviving sample IDs)
          │
          ├─ sync barrier: save_manifest(), _generate_context_files()
          └─ save_run() with per-stage stats
```

Sources: [src/deepzero/engine/runner.py:102-265]()
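The breadth-first loop and its `isinstance` dispatch can be sketched in a few lines. The three class names mirror the ones on this page, but everything else is a hypothetical stand-in: samples are plain ID strings, `run_pipeline` and `on_barrier` are illustrative names, map stages run serially, and persistence is reduced to a callback so the control flow stays visible.

```python
from __future__ import annotations

from typing import Callable


# Hypothetical stand-ins for the three processor shapes; samples are plain IDs here.
class MapProcessor:
    def process(self, sample_id: str) -> bool:  # True = keep
        raise NotImplementedError


class BulkMapProcessor:
    def process(self, sample_ids: list[str]) -> list[bool]:
        raise NotImplementedError


class ReduceProcessor:
    def process(self, sample_ids: list[str]) -> list[str]:  # IDs to keep
        raise NotImplementedError


def run_pipeline(samples: list[str], stages: list[tuple[str, object]],
                 on_barrier: Callable[[str, list[str]], None]) -> list[str]:
    active = list(samples)
    for name, proc in stages:  # breadth-first: finish a stage before starting the next
        if isinstance(proc, MapProcessor):
            active = [s for s in active if proc.process(s)]  # real runner uses a thread pool
        elif isinstance(proc, BulkMapProcessor):
            keeps = proc.process(active)
            # zip stops at the shorter list, so entries without a result drop out
            active = [s for s, keep in zip(active, keeps) if keep]
        elif isinstance(proc, ReduceProcessor):
            current = set(active)
            active = [s for s in proc.process(active) if s in current]  # reducer's order
        else:
            raise TypeError(f"stage {name!r}: unsupported processor {type(proc).__name__}")
        on_barrier(name, active)  # stand-in for save_manifest + context regeneration
    return active
```

The key property is that `on_barrier` fires once per stage, after every sample has passed through it, which is what lets a reduce stage see the complete output of the stage before it.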

---

## Ingest vs. Resume

When `PipelineRunner._resume_or_ingest()` is called, the very first thing it does is check whether sample directories already exist on disk:

```python
# src/deepzero/engine/runner.py:277-283
existing_states = self.state_store.list_samples()
if existing_states:
    log.info("resume: found %d existing samples, skipping ingest", len(existing_states))
    sample_states = {s.sample_id: s for s in existing_states}
    ...
    return sample_states
```

If `samples/` already contains subdirectories with `state.json` files, the expensive ingest step is skipped entirely. The runner rehydrates those persisted `SampleState` records and passes them into the stage loop. Each stage then checks `is_stage_done()` to skip samples it has already processed (see [Skipping Already-Done Work](#skipping-already-done-work) below).

On a fresh run, the ingest processor discovers all samples, a `SampleState` is created per sample, and each state is immediately written to disk before any pipeline stage begins.

Sources: [src/deepzero/engine/runner.py:267-369](), [src/deepzero/engine/state.py:231-240]()
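The resume-or-ingest decision reduces to "does any persisted state exist?". Below is a minimal sketch assuming one `state.json` per `samples/<sample_id>/` directory, as in the directory layout on this page. `list_samples` and `resume_or_ingest` here are simplified, hypothetical versions: states are plain dicts rather than `SampleState` objects, and `ingest` is any callable returning sample IDs.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Callable

# Hypothetical sketch: states are plain dicts and `ingest` returns sample IDs.


def list_samples(work_dir: Path) -> dict[str, dict[str, Any]]:
    """Load every samples/<sample_id>/state.json that already exists."""
    samples_dir = work_dir / "samples"
    if not samples_dir.is_dir():
        return {}
    states: dict[str, dict[str, Any]] = {}
    for state_file in sorted(samples_dir.glob("*/state.json")):
        data = json.loads(state_file.read_text(encoding="utf-8"))
        states[data["sample_id"]] = data
    return states


def resume_or_ingest(work_dir: Path, ingest: Callable[[], list[str]]) -> dict[str, dict[str, Any]]:
    existing = list_samples(work_dir)
    if existing:
        return existing  # resume path: ingest never runs
    states: dict[str, dict[str, Any]] = {}
    for sample_id in ingest():
        state = {"sample_id": sample_id, "history": {}}
        sample_dir = work_dir / "samples" / sample_id
        sample_dir.mkdir(parents=True, exist_ok=True)
        # Written before any stage runs; the real StateStore uses an atomic write here.
        (sample_dir / "state.json").write_text(json.dumps(state), encoding="utf-8")
        states[sample_id] = state
    return states
```

Calling `resume_or_ingest` twice on the same directory runs `ingest` only once; the second call returns the states written by the first.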

---

## Parallel Execution: Three Stage Flavors

The runner supports three processor shapes, selected at runtime by `isinstance` checks:

### Map (`_run_map`)

The most common shape. Each sample is processed independently. When `spec.parallel > 1`, a `ThreadPoolExecutor` is created:

```python
# src/deepzero/engine/runner.py:451-496
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_map = {
        executor.submit(self._process_one_map, s, spec, processor): s for s in pending
    }
    for future in concurrent.futures.as_completed(future_map):
        if self._shutdown_event.is_set():
            executor.shutdown(wait=False, cancel_futures=True)
            break
        ...
```

When `spec.parallel <= 1` (serial mode), samples are processed in a simple loop. In both cases, results accumulate in a `dirty` list and are flushed to disk in batches of 50:

```python
# src/deepzero/engine/runner.py:444-448
if len(dirty) >= 50:
    for s in dirty:
        self.state_store.save_sample(s)
    dirty.clear()
```

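This batching reduces I/O pressure. The trade-off is that an abrupt crash can lose fewer than 50 not-yet-flushed results from the current stage. Because those samples have no persisted history for that stage, they are simply processed again on the next run.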
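The combination of a thread pool, a cooperative shutdown check, and batched persistence can be sketched as below. `run_map`, `process_one`, `save`, and `FLUSH_EVERY` are hypothetical stand-ins. The final flush after the loop is an assumption about how leftover `dirty` entries are written on a clean stop, and `cancel_futures` requires Python 3.9 or newer.

```python
from __future__ import annotations

import concurrent.futures
import threading
from typing import Callable

FLUSH_EVERY = 50  # mirrors the batch size described on this page


def run_map(pending: list[str], process_one: Callable[[str], dict],
            save: Callable[[dict], None], max_workers: int,
            shutdown: threading.Event) -> int:
    """Process samples in parallel; persist results in batches. Returns #saved.

    Hypothetical sketch: process_one is assumed to catch its own errors.
    """
    dirty: list[dict] = []
    saved = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {executor.submit(process_one, s): s for s in pending}
        for future in concurrent.futures.as_completed(future_map):
            if shutdown.is_set():
                # Cancel queued work; tasks already running still finish when the
                # with-block exits, but their results are not collected here.
                executor.shutdown(wait=False, cancel_futures=True)
                break
            dirty.append(future.result())
            if len(dirty) >= FLUSH_EVERY:
                for state in dirty:
                    save(state)
                saved += len(dirty)
                dirty.clear()
    for state in dirty:  # final flush (assumed) so a clean stop loses nothing
        save(state)
    saved += len(dirty)
    return saved
```

All `save` calls happen on the thread iterating `as_completed`, so the persistence callback itself never runs concurrently.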

### Batch (`_run_batch`)

A `BulkMapProcessor` receives the entire list of pending entries in a single `process()` call. Results are mapped back by index position. Each sample state is saved immediately after its result is mapped. Sources: [src/deepzero/engine/runner.py:672-746]()
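Index-based mapping, including the rule from the `BulkMapProcessor` section that entries without a matching result are marked failed, might look like this. `apply_bulk_results` and the small `Result` record are hypothetical. Ignoring surplus results beyond the last entry is an assumption of this sketch.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Result:  # hypothetical, trimmed-down stand-in for ProcessorResult
    status: str  # "completed" or "failed"
    error: str | None = None


def apply_bulk_results(sample_ids: list[str], results: list[Result]) -> dict[str, Result]:
    """Pair results with entries by position; entries without a result fail."""
    outcome: dict[str, Result] = {}
    for i, sample_id in enumerate(sample_ids):
        if i < len(results):
            outcome[sample_id] = results[i]
        else:
            outcome[sample_id] = Result("failed", "processor returned no result for this entry")
    return outcome  # surplus results (i >= len(sample_ids)) are ignored (assumption)
```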

### Reduce (`_run_reduce`)

A `ReduceProcessor` returns the list of sample IDs to keep. Any sample missing from that list is marked `FILTERED`. This is the only stage type where one sample's fate can depend on the others. Sources: [src/deepzero/engine/runner.py:635-668]()
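Applying a reducer's answer to the active set is a two-part operation: mark everything not returned as filtered, then continue with the survivors. The sketch below assumes survivors follow the reducer's order (as `sort` and `top_k` suggest) and that unknown or repeated IDs are ignored. `apply_reduce` and this two-member `SampleStatus` are hypothetical simplifications.

```python
from __future__ import annotations

from enum import Enum


class SampleStatus(str, Enum):  # hypothetical subset of the real enum
    ACTIVE = "active"
    FILTERED = "filtered"


def apply_reduce(active: dict[str, SampleStatus], keep_ids: list[str]) -> list[str]:
    """Mark samples the reducer dropped as FILTERED; return survivors in reducer order."""
    keep = set(keep_ids)
    for sample_id in active:
        if sample_id not in keep:
            active[sample_id] = SampleStatus.FILTERED
    # dict.fromkeys de-duplicates while preserving order; unknown IDs are skipped.
    return [sid for sid in dict.fromkeys(keep_ids) if sid in active]
```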

---

## Skipping Already-Done Work

At the top of both `_run_map` and `_run_batch`, the runner filters out samples that are already done for the current stage:

```python
# src/deepzero/engine/runner.py:409-410
pending = [s for s in active if not s.is_stage_done(spec.name)]
```

`is_stage_done` reads `SampleState.history` and returns `True` if the stage's `StageStatus` is `COMPLETED`, `FILTERED`, or `FAILED`:

```python
# src/deepzero/engine/state.py:119-127
def is_stage_done(self, stage_name: str) -> bool:
    s = self.history.get(stage_name)
    if s is None:
        return False
    return s.status in (
        StageStatus.COMPLETED,
        StageStatus.FILTERED,
        StageStatus.FAILED,
    )
```

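Because each sample's history is written to its own `state.json` as the stage progresses (map stages flush in batches of up to 50), a process interrupted mid-stage will, on the next run, find the persisted samples in `history` and skip them. Only samples that were incomplete, not yet flushed, or never started are re-run. Since `FAILED` also counts as done, a sample that failed a stage is not retried automatically on resume. Sources: [src/deepzero/engine/state.py:119-127](), [src/deepzero/engine/runner.py:409-414]()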

---

## The Sync Barrier

After every stage finishes, the runner performs a mandatory sync before proceeding to the next stage:

```python
# src/deepzero/engine/runner.py:247-248
self.state_store.save_manifest(list(sample_states.values()))
self._generate_context_files(sample_states)
```

`save_manifest` writes a summary JSON (`run_manifest.json`) containing per-sample verdict, current stage, and sha256. The context-file generation regenerates a `context.md` per sample in parallel (up to `cpu_count * 4` threads). These two steps together form the sync barrier — a checkpoint that makes the on-disk state consistent before the next stage reads from it.

Sources: [src/deepzero/engine/runner.py:244-251](), [src/deepzero/engine/state.py:244-269]()

---

## SampleState: The Per-Sample Ledger

`SampleState` is a dataclass with a `history` dict that maps stage names to `StageOutput` records:

```python
# src/deepzero/engine/state.py:57-67
@dataclass
class SampleState:
    sample_id: str
    sha256: str = ""
    source_path: str = ""
    filename: str = ""
    verdict: SampleStatus = SampleStatus.PENDING
    current_stage: str = ""
    history: dict[str, StageOutput] = field(default_factory=dict)
    error: str | None = None
```

Each `StageOutput` carries status, verdict, timestamps, artifacts, freeform `data`, and an optional error string. Stage outputs are never merged across stages — the data dict is fully namespaced per stage:

```python
# src/deepzero/engine/state.py:52
# namespaced processor output - never merged across stages
data: dict[str, Any] = field(default_factory=dict)
```

The lifecycle of `verdict` at the sample level mirrors the stage outcomes:

| `SampleStatus` | Meaning |
|---|---|
| `PENDING` | Not yet reached |
| `ACTIVE` | Currently in flight |
| `FILTERED` | A stage returned `Verdict.FILTER` |
| `FAILED` | A stage returned an error |
| `COMPLETED` | All stages done |

Sources: [src/deepzero/engine/state.py:44-130](), [src/deepzero/engine/types.py:17-23]()
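A sketch of how such a ledger could round-trip through `state.json`. The field names follow the dataclass excerpts above, but `to_json`/`from_json`, the trimmed `StageOutput` (no timestamps or artifacts), the string `verdict`, and the exact JSON shape are assumptions. Only the `_version` key comes from the version-gating description on this page.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any

STATE_VERSION = 2


class StageStatus(str, Enum):  # illustrative subset
    COMPLETED = "completed"
    FILTERED = "filtered"
    FAILED = "failed"


@dataclass
class StageOutput:  # hypothetical trimmed version: no timestamps or artifacts
    status: StageStatus
    data: dict[str, Any] = field(default_factory=dict)  # namespaced per stage
    error: str | None = None


@dataclass
class SampleState:
    sample_id: str
    sha256: str = ""
    verdict: str = "pending"
    current_stage: str = ""
    history: dict[str, StageOutput] = field(default_factory=dict)

    def to_json(self) -> str:
        payload = asdict(self)  # nested StageOutput objects become dicts
        for out in payload["history"].values():
            out["status"] = out["status"].value  # enum -> plain string
        payload["_version"] = STATE_VERSION
        return json.dumps(payload)

    @classmethod
    def from_json(cls, text: str) -> SampleState:
        payload = json.loads(text)
        payload.pop("_version", None)
        history = {
            name: StageOutput(StageStatus(o["status"]), o.get("data", {}), o.get("error"))
            for name, o in payload.pop("history", {}).items()
        }
        return cls(history=history, **payload)
```

Keying `history` by stage name is what keeps each stage's `data` namespaced: two stages can both emit a `score` key without overwriting each other.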

---

## StateStore: Atomic File I/O

`StateStore` manages all disk writes. Every write goes through `_atomic_write`, which writes to a `.tmp` sibling first, then renames it atomically:

```python
# src/deepzero/engine/state.py:174-177
def _atomic_write(self, path: Path, content: str) -> None:
    tmp = path.with_suffix(".tmp")
    tmp.write_text(content, encoding="utf-8")
    atomic_replace(tmp, path)
```

On POSIX, `os.replace` renames atomically (the standard requires it), and because the `.tmp` file is a sibling on the same volume, the replace also swaps the file in a single operation on Windows. A reader never sees a partially written file: it sees either the old complete file or the new complete file. If the process is killed between the write and the rename, the `.tmp` file is orphaned but the old `state.json` remains intact.

### The Windows Defender Retry Loop

On Windows, Defender briefly locks newly-created `.tmp` files for virus scanning before allowing them to be renamed. `atomic_replace` handles this with a retry loop:

```python
# src/deepzero/engine/state.py:32-41
def atomic_replace(src: Path, dst: Path, retries: int = 5) -> None:
    # windows defender briefly locks .tmp files for scanning after close
    for i in range(retries):
        try:
            os.replace(src, dst)
            return
        except PermissionError:
            if i == retries - 1 or os.name != "nt":
                raise
            time.sleep(0.05)
```

Non-Windows platforms raise immediately on `PermissionError`; Windows gets up to 5 attempts with 50 ms between each. Sources: [src/deepzero/engine/state.py:32-41](), [src/deepzero/engine/state.py:174-177]()

### Directory Layout

```text
<work_dir>/
  run.json              ← RunState (top-level metadata)
  run_manifest.json     ← Summary of all samples (verdict, stage, sha256)
  pipeline.yaml         ← Snapshot of pipeline definition
  samples/
    <sample_id>/
      state.json        ← Full SampleState with history
      context.md        ← Human-readable LLM context (regenerated each stage)
      <stage>_error.log ← Traceback captured on processor exception
      <artifacts…>      ← Files emitted by processors
```

Sources: [src/deepzero/engine/state.py:163-286]()

### Version Gating

State files carry a `_version` field (currently `2`). On load, any file with a version below `STATE_VERSION` is rejected with a warning and treated as absent, preventing stale v1 state from corrupting a v2 run:

```python
# src/deepzero/engine/state.py:216-223
version = data.get("_version", 1)
if version < STATE_VERSION:
    log.warning("sample %s has v%d state, expected v%d - skipping", ...)
    return None
```

Sources: [src/deepzero/engine/state.py:15-16](), [src/deepzero/engine/state.py:216-224]()
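The gate can be sketched as a loader that returns `None` for outdated files, plus a directory scan that drops them. `load_sample` and `list_samples` below are hypothetical simplified versions operating on dicts; the default of `1` for a missing `_version` comes from the excerpt above. Assuming the real `list_samples` goes through the same loader, a work directory that contains only v1 files looks empty to `_resume_or_ingest`, which then runs a fresh ingest.

```python
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)
STATE_VERSION = 2


def load_sample(path: Path) -> dict[str, Any] | None:
    """Hypothetical loader: returns None for state written by an older schema."""
    data = json.loads(path.read_text(encoding="utf-8"))
    version = data.get("_version", 1)  # files written before versioning count as v1
    if version < STATE_VERSION:
        log.warning("sample %s has v%d state, expected v%d - skipping",
                    path.parent.name, version, STATE_VERSION)
        return None
    return data


def list_samples(samples_dir: Path) -> list[dict[str, Any]]:
    loaded = (load_sample(p) for p in sorted(samples_dir.glob("*/state.json")))
    return [s for s in loaded if s is not None]  # outdated files are treated as absent
```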

---

## Graceful Shutdown (Ctrl-C)

`PipelineRunner` installs a custom `SIGINT` handler on the main thread:

```python
# src/deepzero/engine/runner.py:838-850
def _handle_signal(self, signum, frame) -> None:
    if self._shutdown_event.is_set():
        log.warning("forced shutdown")
        # save state and os._exit(1)
        ...
    log.warning("shutdown requested (press ctrl+c again to force)")
    self._shutdown_event.set()
```

The first Ctrl-C sets `_shutdown_event`. The runner checks this event at every sample boundary and stops scheduling new work: the executor is shut down with `cancel_futures=True`, which cancels queued samples while those already running finish. The run state is saved as `INTERRUPTED`. A second Ctrl-C triggers a hard `os._exit(1)` after writing whatever state is available.

Because the state store writes are atomic and per-sample, any sample that completed its current stage before the shutdown event propagated will have its result on disk. The next run's `_resume_or_ingest` + `is_stage_done` machinery then skips those samples automatically.

Sources: [src/deepzero/engine/runner.py:826-850](), [src/deepzero/engine/runner.py:456-460]()
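The two-press pattern can be reproduced with a `threading.Event`. `ShutdownController`, `save_state`, and the injectable `hard_exit` are hypothetical names. Injecting `hard_exit` (defaulting to `os._exit`) exists only so the handler can be exercised without killing the interpreter. Note that `os._exit` skips cleanup handlers and `finally` blocks, which is why state is saved explicitly first.

```python
from __future__ import annotations

import logging
import os
import signal
import threading
from typing import Callable

log = logging.getLogger(__name__)


class ShutdownController:
    """Hypothetical sketch: first SIGINT requests a stop, second one forces exit."""

    def __init__(self, save_state: Callable[[], None],
                 hard_exit: Callable[[int], None] = os._exit) -> None:
        self.event = threading.Event()
        self._save_state = save_state
        self._hard_exit = hard_exit

    def install(self) -> None:
        # signal.signal may only be called from the main thread.
        signal.signal(signal.SIGINT, self.handle)

    def handle(self, signum, frame) -> None:
        if self.event.is_set():
            log.warning("forced shutdown")
            self._save_state()      # best-effort final write
            self._hard_exit(1)      # os._exit skips cleanup and finally blocks
            return
        log.warning("shutdown requested (press ctrl+c again to force)")
        self.event.set()            # workers and backoff waits observe this
```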

---

## Retries and Timeouts

For map stages, `_process_one_map` implements retry logic when `spec.on_failure == FailurePolicy.RETRY`:

```python
# src/deepzero/engine/runner.py:556-583
max_attempts = spec.max_retries + 1 if spec.on_failure == FailurePolicy.RETRY else 1
while attempts < max_attempts:
    ...
    backoff = min(2**attempts, 30)
    if self._shutdown_event.wait(backoff):   # returns True if shutdown fires
        return None
    continue
```

Backoff is exponential (`2^attempt`) capped at 30 seconds. The `_shutdown_event.wait(backoff)` call doubles as both a sleep and a cancellation check: if a shutdown is requested during the wait, the method returns `None` immediately.

For per-sample timeouts, `_execute_with_timeout` wraps the processor call in a single-thread `ThreadPoolExecutor` with `future.result(timeout=spec.timeout)`. Sources: [src/deepzero/engine/runner.py:529-631](), [src/deepzero/engine/runner.py:760-775]()
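Retries, backoff, and per-call timeouts compose as sketched below. `backoff_seconds`, `call_with_timeout`, and `run_with_retries` are hypothetical. The excerpt does not show where `attempts` is incremented, so this sketch increments it before computing the pause, which makes the first pause 2 s. One caveat of the single-thread-executor approach: Python cannot kill a thread, so a call that times out keeps running in the background.

```python
from __future__ import annotations

import concurrent.futures
import threading
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_seconds(attempt: int, cap: int = 30) -> int:
    """Exponential backoff, 2**attempt, capped at `cap` seconds."""
    return min(2 ** attempt, cap)


def call_with_timeout(fn: Callable[[], T], timeout: float) -> T:
    if timeout <= 0:  # 0 means "no limit", matching StageSpec.timeout
        return fn()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        return executor.submit(fn).result(timeout=timeout)
    finally:
        # Don't block on a timed-out call; its thread runs on until fn returns.
        executor.shutdown(wait=False)


def run_with_retries(fn: Callable[[], T], max_retries: int,
                     shutdown: threading.Event, timeout: float = 0) -> T | None:
    max_attempts = max_retries + 1
    attempts = 0
    while True:
        try:
            return call_with_timeout(fn, timeout)
        except Exception as exc:  # includes the futures TimeoutError
            attempts += 1
            if attempts >= max_attempts:
                raise RuntimeError(f"failed after {attempts} attempt(s)") from exc
            # wait() doubles as sleep and cancellation check: True means shutdown fired
            if shutdown.wait(backoff_seconds(attempts)):
                return None
```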

---

## The Live Dashboard

`PipelineDashboard` (powered by Rich's `Live`) pins a stage table and progress bar to the bottom of the terminal while log output scrolls above it. Each stage row shows the input count, passed/filtered/failed tallies, and elapsed time. A fully-cached stage (all samples already done) is shown with a `↻` glyph and "cached" in the time column instead of a real duration.

```python
# src/deepzero/engine/ui.py:307-318
time_str = (
    "[dim]cached[/]" if info.is_fully_cached else _format_elapsed(info.elapsed_s)
)
glyph = "↻" if info.is_fully_cached else _GLYPH_DONE
```

The dashboard also emits a short interrupt message when the run stops early, reminding the user to re-run without `--clean` to resume. Sources: [src/deepzero/engine/ui.py:226-233](), [src/deepzero/engine/ui.py:304-318]()

---

## The `context.md` Side-Channel

After each sync barrier, `_generate_context_files` regenerates a `context.md` for every active sample. The file is a Markdown summary of the sample's current history, artifacts, and per-stage data values, produced by `generate_context` in `context.py`. It is written atomically using the same `atomic_replace` call as all other state writes. This file is primarily a convenience for LLM-powered processors that want a compact, human-readable view of what has happened to a sample so far:

```python
# src/deepzero/engine/context.py:8-51
def generate_context(sample_dir: Path, state: SampleState) -> None:
    # auto-generate a human-readable context.md for LLM consumption
    ...
    tmp.write_text(content, encoding="utf-8")
    atomic_replace(tmp, path)
```

Generation is skipped when `context.md` is already newer than `state.json`, so the overhead is proportional to the work actually done in the most recent stage. Sources: [src/deepzero/engine/context.py:8-51](), [src/deepzero/engine/runner.py:786-795]()
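The freshness check is a plain modification-time comparison. In the sketch below, `maybe_regenerate_context` and the `render` callback are hypothetical. The strict `>` is a choice of this sketch: on filesystems with coarse timestamps, equal mtimes lead to regeneration, which errs on the side of a fresh file.

```python
from __future__ import annotations

from pathlib import Path
from typing import Callable


def maybe_regenerate_context(sample_dir: Path, render: Callable[[], str]) -> bool:
    """Hypothetical sketch: rewrite context.md only if state.json is newer."""
    state = sample_dir / "state.json"
    context = sample_dir / "context.md"
    if context.exists() and context.stat().st_mtime > state.stat().st_mtime:
        return False  # context.md already reflects the latest state.json
    tmp = context.with_suffix(".tmp")
    tmp.write_text(render(), encoding="utf-8")
    tmp.replace(context)  # Path.replace delegates to os.replace
    return True
```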

---

## Summary

`PipelineRunner` executes stages breadth-first with a sync barrier between each one. Map stages use a `ThreadPoolExecutor` and flush results to disk in batches of 50. `StateStore` guarantees that every write is atomic via a write-to-`.tmp`-then-rename pattern, with a Windows Defender retry loop to handle antivirus file locking. `SampleState` records the full outcome of every stage in a `history` dict keyed by stage name. On the next run, `list_samples` re-hydrates those records and `is_stage_done` skips any sample whose stage is already terminal — making every run an exact continuation of the last. The entire durability guarantee rests on `atomic_replace` in `src/deepzero/engine/state.py:32-41`.

---

## 04. The Workers: Processors & the Registry

> The four processor types (ingest, map, reduce, bulk-map), how the registry resolves a bare name versus a file path versus a dotted import, and how to write a custom processor as a Python class. Covers built-in stages: filter, hash_filter, top_k, sort, command, and llm.

- Page Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/04-the-workers-processors-the-registry.md
- Generated: 2026-05-22T02:26:38.102Z

### Source Files

- `src/deepzero/engine/registry.py`
- `src/deepzero/engine/stage.py`
- `src/deepzero/stages/ingest.py`
- `src/deepzero/stages/filter.py`
- `src/deepzero/stages/top_k.py`
- `src/deepzero/stages/llm.py`
- `src/deepzero/engine/llm.py`

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [src/deepzero/engine/registry.py](src/deepzero/engine/registry.py)
- [src/deepzero/engine/stage.py](src/deepzero/engine/stage.py)
- [src/deepzero/engine/llm.py](src/deepzero/engine/llm.py)
- [src/deepzero/stages/__init__.py](src/deepzero/stages/__init__.py)
- [src/deepzero/stages/ingest.py](src/deepzero/stages/ingest.py)
- [src/deepzero/stages/filter.py](src/deepzero/stages/filter.py)
- [src/deepzero/stages/hash_filter.py](src/deepzero/stages/hash_filter.py)
- [src/deepzero/stages/top_k.py](src/deepzero/stages/top_k.py)
- [src/deepzero/stages/sort.py](src/deepzero/stages/sort.py)
- [src/deepzero/stages/command.py](src/deepzero/stages/command.py)
- [src/deepzero/stages/llm.py](src/deepzero/stages/llm.py)
</details>

# The Workers: Processors & the Registry

A DeepZero pipeline is a sequence of **processors** — Python classes that do the actual work on a stream of samples (files, binaries, documents). Each processor belongs to one of four types that describe *how* it relates to the stream: discovering samples, transforming them one-at-a-time, transforming them all at once, or running a global selection pass. The **registry** is the lookup table that maps a name you write in YAML to the class that runs at pipeline time.

This page explains the four processor types, the three ways the registry resolves a processor reference, and how to write and wire up a custom processor. It also documents every built-in stage — `file_discovery`, `metadata_filter`, `hash_exclude`, `top_k`, `sort`, `generic_command`, and `generic_llm`.

---

## The Four Processor Types

Every processor class inherits from one of four abstract base classes declared in `src/deepzero/engine/stage.py`. The type you choose determines what data your `process()` method receives and what it must return.

```text
ProcessorType enum (stage.py:50-54)
  INGEST    → one call, produces the sample list
  MAP       → one call per sample, parallel
  REDUCE    → one call with ALL samples, serial
  BULK_MAP  → one call with ALL samples, returns one result per sample
```

### IngestProcessor — discovering samples

`IngestProcessor` is always the first stage in a pipeline. It runs **once**, receives the target `Path`, and returns a flat list of `Sample` objects. Nothing upstream has run yet; this is where the sample stream begins.

```text
/target/path ──▶ [ IngestProcessor ] ──▶ sample_a, sample_b, sample_c ...
```

```python
# src/deepzero/engine/stage.py:277-288
class IngestProcessor(Processor):
    processor_type = ProcessorType.INGEST

    @abstractmethod
    def process(self, ctx: ProcessorContext, target: Path) -> list[Sample]: ...
```

Use cases: file discovery, binary scanning, API ingestion, manifest loading.

### MapProcessor — per-sample transformation

`MapProcessor` is the workhorse. The engine fans out calls across a `ThreadPoolExecutor` so samples are processed in parallel. Each call gets one `ProcessorEntry` (the sample's identity plus lazy-loaded history from all previous stages) and must return a `ProcessorResult`.

```text
sample_a ──▶ [ MapProcessor ] ──▶ result_a   (ok / filter / fail)
sample_b ──▶ [ MapProcessor ] ──▶ result_b   ← parallel
sample_c ──▶ [ MapProcessor ] ──▶ result_c
```

Because calls are parallel, `process()` must be **thread-safe**: no shared mutable state.

```python
# src/deepzero/engine/stage.py:291-310
class MapProcessor(Processor):
    processor_type = ProcessorType.MAP

    @abstractmethod
    def process(self, ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult: ...

    def should_skip(self, ctx: ProcessorContext, entry: ProcessorEntry) -> str | None:
        # return a reason string to skip already-processed samples (cached output)
        return None
```

Use cases: metadata extraction, LLM analysis, decompilation, format-specific filtering.

### ReduceProcessor — global selection

`ReduceProcessor` receives **all active samples at once**. It is a synchronization barrier: the engine cannot parallelize it or split it across chunks. Return a list of `sample_id` strings to keep; every ID not returned is dropped.

```text
┌ sample_a ┐                      ┌ sample_c ┐
│ sample_b │ ──▶ [ Reduce ] ──▶   │ sample_a │  (reordered, sample_b filtered)
│ sample_c │                      └──────────┘
└──────────┘
```

```python
# src/deepzero/engine/stage.py:313-330
class ReduceProcessor(Processor):
    processor_type = ProcessorType.REDUCE

    @abstractmethod
    def process(self, ctx: ProcessorContext, entries: list[ProcessorEntry]) -> list[str]: ...
```

Use cases: top-k selection, sorting by priority score, global deduplication.

### BulkMapProcessor — batched external invocation

`BulkMapProcessor` receives all active samples in a single call and must return a `ProcessorResult` for each one (matched by index). Unlike `ReduceProcessor`, it does not filter: it produces a result per entry. It is designed for tools with high process-startup cost (e.g., semgrep, bulk static analysis) where spawning one process per sample would be prohibitive.

```python
# src/deepzero/engine/stage.py:333-350
class BulkMapProcessor(Processor):
    processor_type = ProcessorType.BULK_MAP

    @abstractmethod
    def process(
        self, ctx: ProcessorContext, entries: list[ProcessorEntry]
    ) -> list[ProcessorResult]: ...
```

If the list returned is shorter than `entries`, the trailing entries are marked failed.

---

## The Processor Registry

The registry is a module-level dict `_PROCESSOR_REGISTRY: dict[str, type]` in `src/deepzero/engine/registry.py`. Built-ins are registered at import time by `src/deepzero/stages/__init__.py`:

```python
# src/deepzero/stages/__init__.py:1-17
from deepzero.engine.stage import register_processor
from deepzero.stages.command import GenericCommand
# ...
register_processor("file_discovery", FileDiscovery)
register_processor("metadata_filter", MetadataFilter)
register_processor("hash_exclude", HashExclude)
register_processor("generic_llm", GenericLLM)
register_processor("generic_command", GenericCommand)
register_processor("top_k", TopKSelector)
register_processor("sort", Sort)
```

### Resolution order

When the engine sees a `processor:` value in a pipeline YAML, it calls `resolve_processor_class(processor_ref)`. The resolution logic has three distinct branches:

```
processor_ref contains "/" or "\"
    └─▶ _resolve_from_processors_dir()
           Load processors/<dir>/<file>.py
           optional ":ClassName" suffix to pick a specific class

processor_ref contains no "/"  AND  is in the registry
    └─▶ return _PROCESSOR_REGISTRY[processor_ref]
           (e.g. "metadata_filter", "top_k")

processor_ref contains ":" but no "/"
    └─▶ _resolve_from_dotted()
           importlib.import_module(module_path)
           getattr(module, class_name)
           (e.g. "mypackage.processors:MyProcessor")

otherwise → ValueError listing available built-ins
```

Sources: [src/deepzero/engine/registry.py:20-39]()
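The branch order above can be captured as a small classifier plus the dotted-import step. `classify_ref`, `resolve_dotted`, and `REGISTRY` (standing in for `_PROCESSOR_REGISTRY`) are hypothetical. File loading is left out, and the `base` parameter replaces the real `Processor` class so the sketch runs without DeepZero installed.

```python
from __future__ import annotations

import importlib

REGISTRY: dict[str, type] = {}  # hypothetical stand-in for _PROCESSOR_REGISTRY


def classify_ref(ref: str) -> str:
    """Mirror the branch order: path first, then registry, then dotted import."""
    if "/" in ref or "\\" in ref:
        return "file"        # processors/<dir>/<file>.py[:Class]
    if ref in REGISTRY:
        return "registry"    # bare built-in name
    if ":" in ref:
        return "dotted"      # package.module:Class
    raise ValueError(f"unknown processor {ref!r}; built-ins: {sorted(REGISTRY)}")


def resolve_dotted(ref: str, base: type) -> type:
    module_path, _, class_name = ref.partition(":")
    cls = getattr(importlib.import_module(module_path), class_name)
    if not (isinstance(cls, type) and issubclass(cls, base)):
        raise TypeError(f"{ref!r} is not a {base.__name__} subclass")
    return cls
```

Because the path check runs first, `"ghidra/decompile.py:MyClass"` is treated as a file reference even though it also contains a colon.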

#### Branch 1 — file path (relative to `processors/`)

```
"ghidra/decompile.py"           → processors/ghidra/decompile.py, first Processor subclass
"ghidra/decompile.py:MyClass"   → processors/ghidra/decompile.py, class MyClass specifically
```

The resolver searches `Path.cwd() / "processors" / <path_part>`, loads the file with `importlib.util.spec_from_file_location`, and scans module attributes for a concrete `Processor` subclass (skipping the abstract base classes themselves). The resolved class gets `_source_file` set so that `processor.processor_dir` points back to the containing folder — useful for co-located assets like rule files or templates.

Sources: [src/deepzero/engine/registry.py:54-86](), [src/deepzero/engine/registry.py:109-137]()

#### Branch 2 — bare name

Bare names such as `"top_k"` or `"metadata_filter"` match the `_PROCESSOR_REGISTRY` dict directly. No filesystem access occurs.

Sources: [src/deepzero/engine/registry.py:29-31]()

#### Branch 3 — dotted import with colon

```
"mypackage.submodule:MyProcessor"
```

Uses `importlib.import_module` for the left side of `:` and `getattr` for the right side. The resulting class must be a `Processor` subclass or a `TypeError` is raised.

Sources: [src/deepzero/engine/registry.py:140-151]()

---

## Data Structures

Understanding the objects passed in and out of `process()` is essential before writing a custom processor.

### `Sample` — output of IngestProcessor

```python
# src/deepzero/engine/stage.py:67-76
@dataclass
class Sample:
    sample_id: str        # sha256[:16] prefix (unique per pipeline run)
    source_path: Path     # path to the original file
    filename: str         # display name
    data: dict[str, Any]  # initial metadata (e.g. sha256, size_bytes)
```

### `ProcessorEntry` — input to Map/Reduce/BulkMap processors

```python
# src/deepzero/engine/stage.py:79-113
@dataclass
class ProcessorEntry:
    sample_id: str
    source_path: Path
    filename: str
    sample_dir: Path | None   # work/<pipeline>/samples/<id>/

    def upstream(self, processor_name: str) -> StageOutput | None: ...
    def upstream_data(self, processor_name: str, key: str, default=None) -> Any: ...
```

`entry.history` is a lazy-loaded dict of `{stage_name: StageOutput}`. It reads from disk only when accessed, keeping memory use proportional to what the processor actually needs.
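Lazy loading of this kind maps naturally onto `functools.cached_property`: nothing is read until the first access, and later accesses reuse the cached value. `LazyEntry` is a hypothetical stand-in for `ProcessorEntry`. Its JSON layout (`{"history": {stage: {"data": {...}}}}`) and the choice to key `upstream_data` by stage name are assumptions.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import Any


@dataclass
class LazyEntry:  # hypothetical stand-in for ProcessorEntry
    sample_id: str
    sample_dir: Path

    @cached_property
    def history(self) -> dict[str, dict[str, Any]]:
        # Read state.json only on first access; later calls reuse the cached dict.
        state = json.loads((self.sample_dir / "state.json").read_text(encoding="utf-8"))
        return state.get("history", {})

    def upstream_data(self, name: str, key: str, default: Any = None) -> Any:
        # In this sketch, history is keyed by stage name.
        return self.history.get(name, {}).get("data", {}).get(key, default)
```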

### `ProcessorResult` — return value for Map/BulkMap processors

| Factory method | Status | Verdict | Meaning |
|---|---|---|---|
| `ProcessorResult.ok(data, artifacts)` | `completed` | `continue` | Success, pass downstream |
| `ProcessorResult.filter(reason, data)` | `completed` | `filter` | Intentionally excluded |
| `ProcessorResult.fail(error)` | `failed` | — | Processing error |

Sources: [src/deepzero/engine/stage.py:139-170]()
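The factory-method table can be read as three classmethods on an immutable record. The `Result` class below is a hypothetical sketch whose `status` and `verdict` values follow the table. The separate `reason` field for filtered samples is an assumption; the real class may store the reason elsewhere.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Result:  # hypothetical sketch of ProcessorResult
    status: str                    # "completed" | "failed"
    verdict: str | None = None     # "continue" | "filter" | None on failure
    data: dict[str, Any] = field(default_factory=dict)
    artifacts: dict[str, str] = field(default_factory=dict)
    reason: str | None = None      # assumption: where a filter reason is kept
    error: str | None = None

    @classmethod
    def ok(cls, data=None, artifacts=None) -> Result:
        return cls("completed", "continue", dict(data or {}), dict(artifacts or {}))

    @classmethod
    def filter(cls, reason: str, data=None) -> Result:
        return cls("completed", "filter", dict(data or {}), reason=reason)

    @classmethod
    def fail(cls, error: str) -> Result:
        return cls("failed", None, error=error)
```

A filtered sample still has status `completed`: the stage ran successfully and deliberately chose to exclude it, which is different from a processing error.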

### `ProcessorContext` — runtime services

Every `process()` call receives a `ProcessorContext`:

```python
# src/deepzero/engine/stage.py:116-136
@dataclass
class ProcessorContext:
    pipeline_dir: Path
    global_config: GlobalConfig   # settings, knowledge, model
    llm: LLMProtocol | None
    log: logging.Logger
    progress: ProgressReporter
    shutdown_event: threading.Event | None
```

Helpers `ctx.get_setting(key)` and `ctx.get_knowledge(key)` read from the pipeline YAML's `settings:` and `knowledge:` blocks respectively.

### `StageSpec` — the YAML stage definition

```python
# src/deepzero/engine/stage.py:173-188
@dataclass
class StageSpec:
    name: str            # unique instance name in the pipeline
    processor: str       # registry key or file path
    config: dict         # per-stage config from YAML
    parallel: int = 0    # 0 = auto / max hardware
    on_failure: FailurePolicy = FailurePolicy.SKIP
    max_retries: int = 0
    timeout: int = 0     # seconds per sample, 0 = no limit
```

---

## Writing a Custom Processor

### Step 1 — Subclass the right base class

```python
# processors/my_tool/my_tool.py
import json
import shutil
import subprocess
from dataclasses import dataclass

from deepzero.engine.stage import MapProcessor, ProcessorContext, ProcessorEntry, ProcessorResult


class MyAnalyzer(MapProcessor):
    description = "runs my-tool and extracts a score"
    version = "1.0"

    @dataclass
    class Config:
        threshold: float = 0.5
        output_file: str = "my_result.json"

    def validate(self, ctx: ProcessorContext) -> list[str]:
        # check dependencies before any sample is touched
        if not shutil.which("my-tool"):
            return ["my-tool binary not found on PATH"]
        return []

    def process(self, ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult:
        # runs once per sample on a worker thread: keep all state local
        result = subprocess.run(
            ["my-tool", str(entry.source_path)],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return ProcessorResult.fail(result.stderr[:200])

        try:
            score = float(result.stdout.strip())
        except ValueError:
            return ProcessorResult.fail(f"unparseable my-tool output: {result.stdout[:200]!r}")

        # sample_dir is typed Path | None; fail clearly instead of raising TypeError
        if entry.sample_dir is None:
            return ProcessorResult.fail("no sample directory for artifacts")
        out = entry.sample_dir / self.config.output_file
        out.write_text(json.dumps({"score": score}), encoding="utf-8")

        if score < self.config.threshold:
            return ProcessorResult.filter(f"score {score} < threshold {self.config.threshold}")

        return ProcessorResult.ok(
            data={"score": score},
            artifacts={"result": self.config.output_file},
        )
```

### Step 2 — Declare it in your pipeline YAML

```yaml
stages:
  - name: analyze
    processor: my_tool/my_tool.py   # relative to processors/
    config:
      threshold: 0.7
      output_file: analysis.json
```

For the dotted-import form (when the code lives in an installed package):

```yaml
processor: "mypackage.analyzers:MyAnalyzer"
```

### Key rules for custom processors

| Rule | Why |
|---|---|
| `MapProcessor.process()` must be thread-safe | Engine uses `ThreadPoolExecutor` |
| `ReduceProcessor.process()` returns `list[str]` of IDs to **keep** | Anything not returned is dropped |
| `BulkMapProcessor` returns one result **per entry, by index** | Entries without a matching result are marked failed |
| `validate()` should surface missing binaries / empty required fields | Runs before any sample is touched |
| Declare `Config` as a `@dataclass` to get typed config fields | Engine instantiates it from the YAML dict |
| Config fields can use `${VAR}` and `${VAR:-default}` syntax | Engine expands env vars before parsing |

Sources: [src/deepzero/engine/stage.py:203-275]()

---

## Built-in Stages

All seven built-ins are registered in `src/deepzero/stages/__init__.py`.

### `file_discovery` — IngestProcessor

Class: `FileDiscovery` (`src/deepzero/stages/ingest.py`)

Discovers files from a directory or accepts a single file. Computes a sha256 hash for each file and uses the first 16 hex characters as `sample_id`. Each `Sample` includes `sha256` and `size_bytes` in its `data` dict.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `extensions` | `list[str]` | `[]` (all) | File extensions to include, e.g. `[".exe", ".dll"]` |
| `recursive` | `bool` | `true` | Whether to recurse into subdirectories |

Sources: [src/deepzero/stages/ingest.py:9-78]()
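The discovery behaviour can be sketched as follows. This is a minimal sketch, not `FileDiscovery` itself. It assumes a simplified stand-in `Sample` dataclass (not DeepZero's real class) and case-insensitive extension matching. Each file is streamed through SHA-256, and the first 16 hex characters become the ID:

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class Sample:
    """Hypothetical stand-in for DeepZero's Sample envelope."""
    sample_id: str
    path: Path
    data: dict = field(default_factory=dict)


def sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash in 1 MiB chunks so large binaries never sit fully in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def discover(target: Path, extensions: list[str] | None = None,
             recursive: bool = True) -> list[Sample]:
    """Turn a directory (or a single file) into Samples keyed by content hash."""
    if target.is_file():
        candidates = [target]
    else:
        walker = target.rglob("*") if recursive else target.glob("*")
        candidates = sorted(p for p in walker if p.is_file())
    wanted = {e.lower() for e in extensions or []}  # empty means "accept all"
    samples = []
    for path in candidates:
        if wanted and path.suffix.lower() not in wanted:
            continue
        sha256 = sha256_file(path)
        samples.append(Sample(
            sample_id=sha256[:16],  # first 16 hex chars of the SHA-256
            path=path,
            data={"sha256": sha256, "size_bytes": path.stat().st_size},
        ))
    return samples
```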

---

### `metadata_filter` — MapProcessor

Class: `MetadataFilter` (`src/deepzero/stages/filter.py`)

Checks conditions against a flattened view of all upstream `data` dicts. If any condition fails, `process()` returns `ProcessorResult.filter()`.

| Config key | Type | Meaning |
|---|---|---|
| `require` | `dict` | Field must equal a specific value: `require: {arch: "x86"}` |
| `min_<field>` | numeric | Field must be ≥ this value: `min_size_bytes: 1024` |
| `max_<field>` | numeric | Field must be ≤ this value: `max_size_bytes: 10000000` |
| `dedup_field` | `str` | Filter samples with a previously seen value of this field |

The `dedup_field` state (`self._seen`) lives on the processor instance. It is therefore not thread-local: every worker thread that calls `process()` during a run shares it. `MapProcessor` implementations should normally avoid mutating shared state in `process()`. `MetadataFilter` is a deliberate exception, because deduplication has to remember values across samples.

Sources: [src/deepzero/stages/filter.py:14-67]()
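The checks above can be sketched as a hypothetical re-implementation. It is not `MetadataFilter` itself, and it makes two assumptions: later stages win when flattened keys collide, and a missing field passes the `min_`/`max_` checks. The lock shows one way to keep the shared `_seen` set safe when `process()` runs on several threads:

```python
from __future__ import annotations

import threading
from typing import Any


def flatten(history: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Merge every upstream stage's data dict (assumption: later stages win)."""
    flat: dict[str, Any] = {}
    for stage_data in history.values():
        flat.update(stage_data)
    return flat


class MetadataChecks:
    """Hypothetical re-implementation of require / min_ / max_ / dedup_field."""

    def __init__(self, config: dict[str, Any]):
        self.require = config.get("require", {})
        self.dedup_field = config.get("dedup_field", "")
        # "min_size_bytes: 1024" becomes {"size_bytes": 1024}
        self.mins = {k[4:]: v for k, v in config.items() if k.startswith("min_")}
        self.maxs = {k[4:]: v for k, v in config.items() if k.startswith("max_")}
        self._seen: set[Any] = set()
        self._lock = threading.Lock()  # process() runs on several threads

    def check(self, history: dict[str, dict[str, Any]]) -> str | None:
        """Return a filter reason, or None if the sample passes."""
        data = flatten(history)
        for key, expected in self.require.items():
            if data.get(key) != expected:
                return f"{key}={data.get(key)!r}, required {expected!r}"
        for key, bound in self.mins.items():
            if key in data and data[key] < bound:  # assumption: missing fields pass
                return f"{key} {data[key]} < {bound}"
        for key, bound in self.maxs.items():
            if key in data and data[key] > bound:
                return f"{key} {data[key]} > {bound}"
        if self.dedup_field:
            value = data.get(self.dedup_field)
            with self._lock:  # check-and-add must be one atomic step
                if value in self._seen:
                    return f"duplicate {self.dedup_field}"
                self._seen.add(value)
        return None
```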

---

### `hash_exclude` — MapProcessor

Class: `HashExclude` (`src/deepzero/stages/hash_filter.py`)

Maintains a set of known-bad hashes (loaded in `setup()` so the file is read once) and filters any sample whose hash field matches. Can also deduplicate by hash within a run.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `hash_field` | `str` | `"sha256"` | Which data key holds the hash to compare |
| `hashes` | `list[str]` | `[]` | Inline list of hashes to exclude |
| `hash_file` | `str` | `""` | Path to a file of hashes, one per line |
| `dedup` | `bool` | `false` | Also filter duplicate hashes within the run |

Sources: [src/deepzero/stages/hash_filter.py:15-69]()

---

### `top_k` — ReduceProcessor

Class: `TopKSelector` (`src/deepzero/stages/top_k.py`)

Sorts all active samples by a numeric metric from an upstream stage, keeps the top `k`, and discards the rest. `metric_path` uses `"stage_name.data_key"` notation.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `metric_path` | `str` | required | e.g. `"score_stage.score"` |
| `keep_top` | `int` | `10` | Number of samples to keep |
| `sort_order` | `"asc"` \| `"desc"` | `"desc"` | `"desc"` keeps highest values |

Sources: [src/deepzero/stages/top_k.py:8-54]()
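The following sketch shows how `metric_path` resolution and the top-k cut could work. It assumes each sample's history is a dict of stage name to data dict. It also assumes samples without a numeric metric are dropped; the real `TopKSelector` may handle them differently:

```python
from __future__ import annotations

from typing import Any


def read_metric(history: dict[str, dict[str, Any]], metric_path: str) -> float | None:
    """Resolve "stage_name.data_key" against one sample's per-stage history."""
    stage, _, key = metric_path.partition(".")
    value = history.get(stage, {}).get(key)
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    return float(value)


def top_k(samples: dict[str, dict[str, dict[str, Any]]], metric_path: str,
          keep_top: int = 10, sort_order: str = "desc") -> list[str]:
    """Return the sample IDs to keep, best first (a ReduceProcessor-style result)."""
    scored = [(sid, read_metric(hist, metric_path)) for sid, hist in samples.items()]
    # Assumption: samples with no numeric metric are dropped rather than ranked last.
    scored = [(sid, m) for sid, m in scored if m is not None]
    scored.sort(key=lambda pair: pair[1], reverse=(sort_order == "desc"))
    return [sid for sid, _ in scored[:keep_top]]
```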

---

### `sort` — ReduceProcessor

Class: `Sort` (`src/deepzero/stages/sort.py`)

Re-orders all active samples by an upstream metric without dropping any. Useful when downstream stages or report rendering should see samples in a specific order.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `by` | `str` | required | e.g. `"analysis_stage.complexity_score"` |
| `order` | `"asc"` \| `"desc"` | `"desc"` | Sort direction |

Sources: [src/deepzero/stages/sort.py:8-45]()

---

### `generic_command` — MapProcessor

Class: `GenericCommand` (`src/deepzero/stages/command.py`)

Runs any external command per sample. Template variables like `{sample_path}`, `{sample_dir}`, `{output_dir}`, `{filename}`, and `{sample_id}` are substituted before the command runs. Additional config keys become extra template variables automatically.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `run` | `str` | required | Command template, e.g. `"binwalk -e {sample_path}"` |
| `timeout` | `int` | `300` | Seconds before the process is killed |
| `stdout_to` | `str` | `""` | If set, write stdout to this relative path in `sample_dir` |
| `on_error` | `"fail"` \| `"skip"` | `"fail"` | `"skip"` filters the sample instead of failing it |
| `produces` | `list[str]` | `[]` | Relative paths of files the command creates, exposed as artifacts |

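The command is executed asynchronously via `asyncio.create_subprocess_exec`, which starts the program directly without a shell. As a result, shell metacharacters in substituted values are never interpreted. `stdout_to` writes are atomic (write to `.tmp`, then `os.replace`).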

Sources: [src/deepzero/stages/command.py:16-117]()
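The sketch below shows one way to build the argument list so that substituted values can never be reinterpreted. It rests on several assumptions. The template is split with `shlex.split` (POSIX rules, so Windows paths containing backslashes would need quoting) before each token is filled with `str.format`. Timeouts surface as `asyncio.TimeoutError`. `render_argv` and `run_command` are hypothetical names, not DeepZero's API:

```python
from __future__ import annotations

import asyncio
import shlex


def render_argv(template: str, **variables: str) -> list[str]:
    """Split the template first, then fill each token, so a substituted value
    (spaces, ';', '&&' and all) always stays one literal argument."""
    return [token.format(**variables) for token in shlex.split(template)]


async def run_command(argv: list[str], timeout: float) -> tuple[int, str]:
    """Run argv directly (no shell) and kill it if it outlives `timeout`."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise
    return proc.returncode, stdout.decode(errors="replace")
```

Usage might look like `asyncio.run(run_command(render_argv("binwalk -e {sample_path}", sample_path=str(path)), timeout=300))`.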

---

### `generic_llm` — MapProcessor

Class: `GenericLLM` (`src/deepzero/stages/llm.py`)

Sends a Jinja2-rendered prompt to the configured LLM backend (via `litellm`) and writes the response to the sample's working directory. Responses are cached by file: if the output file already exists, the LLM call is skipped.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `prompt` | `str` | required | Path to a `.j2` / `.txt` Jinja2 template |
| `output_file` | `str` | `"assessment.md"` | Filename to write the LLM response into |
| `classify_by` | `str` | `""` | Regex applied to the first 200 chars of response to extract a `classification` field |
| `max_retries` | `int` | `3` | Retry attempts on API errors |
| `backoff` | `dict` | `{initial: 2, max: 60, decay: 0.7}` | Backoff tuning for rate-limit waits |
| `max_context_tokens` | `int` | `900000` | Budget for injecting artifact file content into the template |

The Jinja2 template context includes `sample_name`, `sample_path`, `history` (a nested dict of all upstream stage data), all upstream data keys flattened for backward compatibility, and all artifact files from `sample_dir` (JSON parsed for `.json`, read as text for `.c/.h/.txt/.md/.py/.yaml`).
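The following sketch shows how artifact files might be folded into the template context. It makes three assumptions. Keys are the file stem (`findings.json` becomes `findings`). A character count serves as a rough stand-in for the token budget. Oversize files are skipped rather than truncated. `load_artifacts` is a hypothetical helper:

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

TEXT_SUFFIXES = {".c", ".h", ".txt", ".md", ".py", ".yaml"}


def load_artifacts(sample_dir: Path, max_chars: int) -> dict[str, Any]:
    """Collect artifact files under sample_dir into a template context dict."""
    context: dict[str, Any] = {}
    budget = max_chars  # characters used as a proxy for tokens
    for path in sorted(sample_dir.rglob("*")):
        suffix = path.suffix.lower()
        if not path.is_file() or (suffix != ".json" and suffix not in TEXT_SUFFIXES):
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        if len(text) > budget:
            continue  # assumption: oversize files are skipped, not truncated
        if suffix == ".json":
            try:
                value: Any = json.loads(text)  # JSON artifacts arrive parsed
            except json.JSONDecodeError:
                continue  # skip corrupt or half-written JSON
        else:
            value = text  # source and text artifacts arrive as strings
        budget -= len(text)
        context[path.stem] = value
    return context
```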

The LLM backend is the `LLMProvider` class (`src/deepzero/engine/llm.py`), which wraps `litellm.completion()` with adaptive backoff: rate limit errors double the wait up to `max_backoff`; successful calls decay the backoff by `backoff_decay`; context-window errors are not retried. The model string follows litellm's `"provider/model"` convention (e.g. `"openai/gpt-4o"`, `"anthropic/claude-3-5-sonnet-20241022"`) — the provider prefix is the only coupling point, so any litellm-supported backend works without code changes.

Sources: [src/deepzero/stages/llm.py:21-181](), [src/deepzero/engine/llm.py:26-127]()
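The adaptive backoff policy can be sketched with stand-in exception classes in place of litellm's real ones. The defaults mirror the `backoff` table entry. This sketch assumes `max_retries` counts retries after the first attempt, and that decay never drops the wait below its initial value. The `sleep` parameter exists only so the behaviour can be tested without waiting:

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Stand-in for a provider rate-limit exception."""


class ContextWindowError(Exception):
    """Stand-in for a context-length exception; never retried."""


class AdaptiveBackoff:
    def __init__(self, initial: float = 2.0, max_backoff: float = 60.0,
                 decay: float = 0.7, sleep: Callable[[float], None] = time.sleep):
        self.initial = initial
        self.max_backoff = max_backoff
        self.decay = decay
        self.current = initial  # wait used for the next rate-limit hit
        self._sleep = sleep

    def call(self, fn: Callable[[], T], max_retries: int = 3) -> T:
        for attempt in range(max_retries + 1):
            try:
                result = fn()
            except RateLimitError:
                if attempt == max_retries:
                    raise
                self._sleep(self.current)
                self.current = min(self.current * 2, self.max_backoff)  # double, capped
                continue
            # Any other error (ContextWindowError included) propagates at once.
            self.current = max(self.initial, self.current * self.decay)  # relax on success
            return result
        raise AssertionError("unreachable")
```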

---

## Summary

```text
Processor type summary
─────────────────────────────────────────────────────────────────────────────
Type              Call pattern       Returns                 Built-in example
─────────────────────────────────────────────────────────────────────────────
IngestProcessor   once per run       list[Sample]            file_discovery
MapProcessor      once per sample    ProcessorResult         metadata_filter
                  (parallel)                                 hash_exclude
                                                             generic_command
                                                             generic_llm
ReduceProcessor   once, all samples  list[str] (IDs kept)    top_k
                                                             sort
BulkMapProcessor  once, all samples  list[ProcessorResult]   (custom only)
                                     (one per entry)
─────────────────────────────────────────────────────────────────────────────
```

The registry resolves a bare name (`"top_k"`) from the in-memory dict, a path string (`"my_tool/tool.py"`) from the `processors/` directory relative to `cwd`, and a dotted string (`"pkg.mod:Class"`) via `importlib`. Custom processors need only subclass the right base class, optionally declare a `Config` dataclass, and be reachable by one of those three forms — no registration call is needed for file-path or dotted-import forms. The full registry API is re-exported from `src/deepzero/engine/stage.py` for import convenience.

Sources: [src/deepzero/engine/registry.py:20-39](), [src/deepzero/stages/__init__.py:1-17]()
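The three lookup forms can be sketched using only `importlib`. The rule for picking a class out of a file (the first subclass of the expected base) is an assumption made for illustration. `resolve_processor` is not DeepZero's function name:

```python
from __future__ import annotations

import importlib
import importlib.util
from pathlib import Path


def resolve_processor(spec: str, builtins: dict[str, type],
                      processors_dir: Path, base: type) -> type:
    """Resolve a processor spec in one of three forms (sketch)."""
    if ":" in spec:
        # Dotted import, e.g. "mypackage.analyzers:MyAnalyzer"
        module_name, _, class_name = spec.partition(":")
        return getattr(importlib.import_module(module_name), class_name)
    if spec.endswith(".py"):
        # File path relative to processors/, e.g. "my_tool/my_tool.py"
        path = (processors_dir / spec).resolve()
        mod_spec = importlib.util.spec_from_file_location(path.stem, path)
        if mod_spec is None or mod_spec.loader is None:
            raise ImportError(f"cannot load {path}")
        module = importlib.util.module_from_spec(mod_spec)
        mod_spec.loader.exec_module(module)
        # Assumption: take the first class in the file that subclasses `base`.
        for obj in vars(module).values():
            if isinstance(obj, type) and issubclass(obj, base) and obj is not base:
                return obj
        raise LookupError(f"no {base.__name__} subclass in {path}")
    # Bare name, e.g. "top_k", looked up in the built-in table
    if spec not in builtins:
        raise LookupError(f"unknown processor {spec!r}")
    return builtins[spec]
```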

---

## 05. A Real Run: The loldrivers Vulnerability Pipeline

> Step-by-step walkthrough of the shipped example pipeline — from PE discovery through IOCTL filtering, loldrivers.io dedup, Ghidra decompilation, Semgrep batch scan, top-10 selection, to LLM assessment with a Jinja2 prompt template. Shows what each external processor does and what data it passes to the next stage.

- Page Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/05-a-real-run-the-loldrivers-vulnerability-pipeline.md
- Generated: 2026-05-22T02:26:46.634Z

### Source Files

- `pipelines/loldrivers/pipeline.yaml`
- `pipelines/loldrivers/assessment.j2`
- `pipelines/loldrivers/rules/arbitrary_rw.yaml`
- `processors/pe_ingest/pe_ingest.py`
- `processors/loldrivers_filter/loldrivers_filter.py`
- `processors/ghidra_decompile/ghidra_decompile.py`
- `processors/ghidra_decompile/scripts/extract_dispatch.py`
- `processors/semgrep_scanner/semgrep_scanner.py`

# A Real Run: The loldrivers Vulnerability Pipeline

The `loldrivers` pipeline is DeepZero's shipping example for Windows kernel driver vulnerability research. It starts with a directory full of raw `.sys` files, narrows the field down to the most interesting candidates using progressively more expensive checks, and ends with an LLM writing a structured vulnerability assessment for the top 10 survivors. This page walks every stage in order — what each processor ingests, what it decides, what it writes to disk, and what it hands to the next stage.

This walkthrough matters because the pipeline exercises all four of DeepZero's processor types in sequence: ingest, map, reduce, and bulk-map (which this page labels `batch`). Reading it gives you a concrete mental model of how data flows through any pipeline you build.

---

## Pipeline overview

The pipeline is declared in a single YAML file. Seven stages run in order:

```
discover → kernel_filter → loldrivers_filter → decompile → semgrep_scanner → pick_top_10 → assess
```

```yaml
# pipelines/loldrivers/pipeline.yaml
name: loldrivers
description: windows kernel driver vulnerability research pipeline
version: "2.0"
model: vertex_ai/gemini-2.5-pro
settings:
  work_dir: work
  max_workers: 4
```

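The `model` key selects the LLM for the final `assess` stage. `work_dir` is the root under which each sample gets its own subdirectory for every artifact produced along the way. `max_workers` is the global concurrency that stages fall back to, for example a stage configured with `parallel: 0`.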

Sources: [pipelines/loldrivers/pipeline.yaml:1-20]()

---

## Stage 1 — `discover`: PE file ingestion

**Processor:** `pe_ingest/pe_ingest.py` (type: `ingest`)

The very first job is to find `.sys` files and turn each one into a `Sample` — the core data envelope that flows through the rest of the pipeline.

The pipeline configures twelve driver pack subdirectory patterns (`DP_Chipset`, `DP_LAN_Intel`, `DP_WLAN`, etc.). The processor scans only those matching subdirectories rather than the whole target, which keeps the file count manageable for large driver packs.

```python
# processors/pe_ingest/pe_ingest.py:39-59
def _ingest_filtered(self, ctx, root, subdirs, extensions):
    all_dirs = sorted(d for d in root.iterdir() if d.is_dir())
    matching = [d for d in all_dirs if any(p.lower() in d.name.lower() for p in subdirs)]
    ...
    files = sorted(set(files))
    return self._analyze_files(ctx, files)
```

For each file, the work is split between a `ThreadPoolExecutor` and the main thread, so reading and parsing overlap:

1. **I/O worker** (`_io_worker`, runs in the pool): reads the raw bytes and computes SHA-256 and MD5.
2. **PE parser** (`_parse_pe`): runs [lief](https://github.com/lief-project/LIEF) on the bytes in the main thread, to avoid GIL thrashing, and extracts:
   - `subsystem` (e.g. `NATIVE` = kernel driver)
   - `is_kernel_driver`, `machine_type`
   - `imported_dlls` and `imported_functions`
   - `has_ioctl_surface`: true if any of 16 IOCTL-related WDM/WDF/NDIS/StorPort/HID API names are present in the import table
   - `dangerous_imports`: the intersection with a list of ~20 high-value APIs such as `MmMapIoSpace`, `MmCopyVirtualMemory`, `__readmsr`, `PsLookupProcessByProcessId`, and `ZwLoadDriver`
   - `priority_score`: a float from 0 to 10 built from weighted signals (physical-memory APIs +3, process-manipulation APIs +2, MSR/HAL access +2, OS version ≥ 10 +3, small section count +1)

Each `.sys` file becomes one `Sample` whose `data` dict carries all of the above. Only files whose first two bytes are `MZ` are parsed as PE.

Sources: [processors/pe_ingest/pe_ingest.py:76-132](), [processors/pe_ingest/pe_ingest.py:152-289]()
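The subdirectory filter and the pool/main-thread split can be sketched with the standard library alone. The lief parsing step is left as a comment. The `is_pe` flag is an assumption about how non-`MZ` files are marked, and the function names are hypothetical:

```python
from __future__ import annotations

import hashlib
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def find_driver_files(root: Path, patterns: list[str], suffix: str = ".sys") -> list[Path]:
    """Only scan top-level dirs whose name contains a pattern (case-insensitive)."""
    matching = [d for d in sorted(root.iterdir())
                if d.is_dir() and any(p.lower() in d.name.lower() for p in patterns)]
    files = {f for d in matching for f in d.rglob("*")
             if f.is_file() and f.suffix.lower() == suffix}
    return sorted(files)


def io_worker(path: Path) -> tuple[Path, bytes, str, str]:
    """Pool thread: file reads (and hashing of larger inputs) release the GIL."""
    raw = path.read_bytes()
    return path, raw, hashlib.sha256(raw).hexdigest(), hashlib.md5(raw).hexdigest()


def ingest(root: Path, patterns: list[str], max_workers: int = 4) -> list[dict]:
    records = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in order while later files are still being read.
        for path, raw, sha256, md5 in pool.map(io_worker, find_driver_files(root, patterns)):
            is_pe = raw[:2] == b"MZ"
            # Main thread: a PE parser such as lief would run here when is_pe is true.
            records.append({"path": path.name, "sha256": sha256, "md5": md5, "is_pe": is_pe})
    return records
```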

---

## Stage 2 — `kernel_filter`: IOCTL surface gate

**Processor:** `metadata_filter` (built-in, type: `map`)

This stage is a simple declarative filter with two required fields:

```yaml
# pipelines/loldrivers/pipeline.yaml:47-52
processor: metadata_filter
config:
  require:
    is_kernel_driver: true
    has_ioctl_surface: true
  dedup_field: sha256
```

Any sample that is not a `NATIVE`-subsystem binary, or that lacks IOCTL-related imports, is dropped here. The `dedup_field: sha256` means that if two `.sys` files in different subdirectories are byte-for-byte identical (same hash), only one copy advances. This cuts duplicate vendor redistributions before the expensive stages.

Sources: [pipelines/loldrivers/pipeline.yaml:44-52]()

---

## Stage 3 — `loldrivers_filter`: known-vulnerable dedup

**Processor:** `loldrivers_filter/loldrivers_filter.py` (type: `map`)

The idea here is: if a driver is already in the public [loldrivers.io](https://www.loldrivers.io) database, researchers already know about it. Skip it and focus on unknowns.

On first run the processor fetches `https://www.loldrivers.io/api/drivers.json` (HTTPS only; non-HTTPS URLs are refused) and stores the response in the pipeline's cache directory. Later runs load the database from that cache instead of refetching, until the cached copy is more than 7 days old.

```python
# processors/loldrivers_filter/loldrivers_filter.py:94-105
def _load_db(self, path):
    data = json.loads(path.read_text(encoding="utf-8"))
    count = 0
    if isinstance(data, list):
        for entry in data:
            for sample in entry.get("KnownVulnerableSamples", []):
                sha = sample.get("SHA256", "")
                if sha:
                    self._known_hashes.add(sha.lower())
                    count += 1
    self.log.info("loaded %d known hashes from %s", count, path.name)
```

At `process()` time, the sample's SHA-256 (carried from stage 1) is checked against the loaded hash set. If it matches, the sample is filtered with the reason `"already in loldrivers.io database"`.

Sources: [processors/loldrivers_filter/loldrivers_filter.py:19-122]()
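The cache-freshness check and the hash lookup can be sketched as follows. The sketch assumes freshness is judged by the cache file's modification time; the real processor may track age differently. `known_hashes` mirrors the `_load_db` walk shown above, and all three function names are hypothetical:

```python
from __future__ import annotations

import json
import time
from pathlib import Path

CACHE_TTL = 7 * 24 * 3600  # seconds in 7 days


def cache_is_fresh(path: Path, ttl: float = CACHE_TTL, now: float | None = None) -> bool:
    """True if the cached drivers.json exists and is younger than the TTL."""
    if not path.exists():
        return False
    current = time.time() if now is None else now
    return current - path.stat().st_mtime < ttl


def known_hashes(raw_json: str) -> set[str]:
    """Every KnownVulnerableSamples[].SHA256, lowercased, as in _load_db."""
    data = json.loads(raw_json)
    hashes: set[str] = set()
    if isinstance(data, list):
        for entry in data:
            for sample in entry.get("KnownVulnerableSamples", []):
                sha = sample.get("SHA256", "")
                if sha:
                    hashes.add(sha.lower())
    return hashes


def is_known(sample_sha256: str, hashes: set[str]) -> bool:
    return sample_sha256.lower() in hashes  # normalise case on both sides
```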

---

## Stage 4 — `decompile`: Ghidra headless analysis

**Processor:** `ghidra_decompile/ghidra_decompile.py` (type: `map`)  
**Post-script:** `processors/ghidra_decompile/scripts/extract_dispatch.py`

This is the most expensive stage. The processor spawns Ghidra's `analyzeHeadless` binary as a subprocess for each sample, with a hard 600-second timeout. It is configured with `parallel: 0`, meaning the engine uses the global `max_workers` setting (4) for concurrency.

```yaml
# pipelines/loldrivers/pipeline.yaml:57-68
processor: ghidra_decompile/ghidra_decompile.py
parallel: 0
timeout: 600
config:
  strategy: extract_dispatch.py
  max_functions: 60
  max_depth: 4
  ghidra_install_dir: ${GHIDRA_INSTALL_DIR}
  java_home: ${JAVA_HOME:-}
```

The `${GHIDRA_INSTALL_DIR}` is expanded from the environment at pipeline load time.

### What the post-script does

`extract_dispatch.py` runs inside Ghidra's Jython environment. It follows this sequence:

1. **Locate `DriverEntry`** — by exported symbol name, falling back to the entry-point iterator.
2. **Decompile `DriverEntry`** — write `driver_entry.c` to the sample's `decompiled/` subdirectory.
3. **Extract device name and symbolic link** — scanned from all defined data items; strings matching `\Device\` or `\DosDevices\` are stored in the result.
4. **Find the `IRP_MJ_DEVICE_CONTROL` handler**: searches the decompiled C with eight regex patterns for an assignment to `MajorFunction[0xe]`, which lies at offset `0xe0` from the `DRIVER_OBJECT` base on x64 (`0x70` on x86). Three of the matched forms:
   - `*(ptr + 0xe0) = &FunctionName`
   - `[0x1c] = &FunctionName`
   - `MajorFunction[0xe] = &FunctionName`
   
   It first searches `DriverEntry` and its two levels of callees, then falls back to a full function scan if needed.
5. **Decompile the dispatch handler** — then recursively decompiles all internal subfunctions (up to `max_depth=4`, `max_functions=60`), skipping Windows API prefixes (`Io`, `Mm`, `Ke`, `Zw`, `Wdf`, etc.).
6. **Extract IOCTL codes** — regex-scanned from the dispatch C for `== 0xXXXXXXXX` or `case` values whose upper 16 bits are nonzero (valid device type range).
7. **Write artifacts**:
   - `decompiled/ghidra_result.json` — structured result with `device_name`, `symbolic_link`, `dispatch_name`, `ioctl_handlers`, decompiled C strings
   - `decompiled/dispatch_ioctl.c` — full dispatch + subfunctions concatenated, ready for static analysis
   - `decompiled/ioctls/0xXXXXXXXX.c` — one file per extracted IOCTL code

The result JSON is cached: if it already exists and is valid JSON, the stage is skipped entirely (`should_skip` returns `"decompilation already cached"`).

Sources: [processors/ghidra_decompile/ghidra_decompile.py:52-107](), [processors/ghidra_decompile/scripts/extract_dispatch.py:122-370]()
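Step 6 and the device-type check can be illustrated with a small regex sketch. The pattern below is an approximation written for this page, not the one in `extract_dispatch.py`. `decode_ctl_code` splits a code using the standard Windows `CTL_CODE` bit layout: device type in bits 16-31, access in 14-15, function in 2-13, and transfer method in 0-1:

```python
import re

# `== 0x...` comparisons and `case 0x...` labels in decompiled C.
IOCTL_RE = re.compile(r"(?:==\s*|case\s+)(0x[0-9a-fA-F]{1,8})\b")


def extract_ioctls(c_source: str) -> list[int]:
    codes = set()
    for match in IOCTL_RE.finditer(c_source):
        value = int(match.group(1), 16)
        if value >> 16:  # device type lives in the upper 16 bits; 0 means "not an IOCTL"
            codes.add(value)
    return sorted(codes)


def decode_ctl_code(code: int) -> dict:
    """Split a Windows CTL_CODE into its four fields."""
    return {
        "device_type": code >> 16,
        "access": (code >> 14) & 0x3,
        "function": (code >> 2) & 0xFFF,
        "method": code & 0x3,  # 3 == METHOD_NEITHER
    }
```

A `method` of 3 is exactly the `METHOD_NEITHER` transfer type that the semgrep rules in stage 5 flag.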

---

## Stage 5 — `semgrep_scanner`: batch vulnerability scan

**Processor:** `semgrep_scanner/semgrep_scanner.py` (type: `batch`)

Instead of invoking `semgrep` once per driver, this stage collects all active samples' `decompiled/` directories, hard-links (or copies) all `.c`/`.h`/`.cpp` files into a single temporary bulk directory with prefixed filenames, and runs `semgrep scan` once across all of them. This is the key efficiency trick: one semgrep process, one rule-parse cost.

```python
# processors/semgrep_scanner/semgrep_scanner.py:137-148
cmd = [
    "semgrep", "scan",
    "--config", str(rules_path),
    "--json",
    "--no-git-ignore",
    "--quiet",
    "--metrics=off",
    "--disable-version-check",
    str(bulk_dir),
]
```

The rules live in `pipelines/loldrivers/rules/` and cover four vulnerability classes:

| Rule file | Patterns |
|-----------|----------|
| `arbitrary_rw.yaml` | `MmMapIoSpace`, `ZwMapViewOfSection`, `memcpy`/`RtlCopyMemory`, IRP buffer-derived writes, `MmCopyVirtualMemory`, `MmCopyMemory`, `PsLookupProcessByProcessId + KeStackAttachProcess` |
| `buffer_overflow.yaml` | Stack/heap overflow patterns in decompiled C |
| `method_neither.yaml` | `METHOD_NEITHER` IOCTL transfer type (raw user pointer access) |
| `msr_access.yaml` | `__readmsr`/`__writemsr` and HAL bus data access |

After the single semgrep run completes, findings are distributed back to individual samples by matching the `{sample_id}_` prefix on each filename. Each sample gets a `findings.json` written atomically via a temp file + rename. Findings include `rule_id`, `severity` (normalized from `ERROR`→`HIGH`, `WARNING`→`MEDIUM`, `INFO`→`LOW`), `message`, `file`, `line_start`, `line_end`, and `matched_code`.

Samples with fewer findings than `min_findings` (set to `1` here) are **filtered out**, so every sample that reaches stage 6 has at least one hit.

Sources: [processors/semgrep_scanner/semgrep_scanner.py:39-98](), [processors/semgrep_scanner/semgrep_scanner.py:127-254](), [pipelines/loldrivers/rules/arbitrary_rw.yaml:1-103]()
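The link-then-route trick can be sketched as two functions. The semgrep result fields used here (`check_id`, `path`, `start.line`, `end.line`, `extra.severity`, `extra.message`, `extra.lines`) follow semgrep's `--json` output. The function names, the `LOW` fallback for unknown severities, and the handling of name collisions are assumptions:

```python
from __future__ import annotations

import os
import shutil
from collections import defaultdict
from pathlib import Path

SOURCE_SUFFIXES = {".c", ".h", ".cpp"}
SEVERITY = {"ERROR": "HIGH", "WARNING": "MEDIUM", "INFO": "LOW"}


def build_bulk_dir(sample_dirs: dict[str, Path], bulk_dir: Path) -> None:
    """Flatten each sample's decompiled/ sources into one dir as {sample_id}_{name}."""
    bulk_dir.mkdir(parents=True, exist_ok=True)
    for sample_id, sample_dir in sample_dirs.items():
        for src in sorted((sample_dir / "decompiled").rglob("*")):
            if not src.is_file() or src.suffix.lower() not in SOURCE_SUFFIXES:
                continue
            dst = bulk_dir / f"{sample_id}_{src.name}"
            try:
                os.link(src, dst)  # hard link: no bytes copied
            except OSError:
                shutil.copy2(src, dst)  # e.g. cross-volume, or links unsupported


def route_findings(results: list[dict], sample_ids: set[str]) -> dict[str, list[dict]]:
    """Give each semgrep --json result back to the sample whose prefix owns the file."""
    routed: dict[str, list[dict]] = defaultdict(list)
    for r in results:
        filename = Path(r["path"]).name
        sample_id = filename.split("_", 1)[0]  # assumes IDs contain no underscore
        if sample_id not in sample_ids:
            continue
        extra = r["extra"]
        routed[sample_id].append({
            "rule_id": r["check_id"],
            "severity": SEVERITY.get(extra.get("severity", ""), "LOW"),
            "message": extra.get("message", ""),
            "file": filename,
            "line_start": r["start"]["line"],
            "line_end": r["end"]["line"],
            "matched_code": extra.get("lines", ""),
        })
    return dict(routed)
```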

---

## Stage 6 — `pick_top_10`: ranking reduction

**Processor:** `top_k` (built-in, type: `reduce`)

```yaml
# pipelines/loldrivers/pipeline.yaml:78-82
processor: top_k
config:
  metric_path: "semgrep_scanner.finding_count"
  keep_top: 10
  sort_order: desc
```

This is a synchronization barrier: it waits for every active sample to finish `semgrep_scanner`, then sorts by `finding_count` descending and keeps the top 10. The `metric_path` notation `"semgrep_scanner.finding_count"` tells the engine to read the `finding_count` field from the output data that the `semgrep_scanner` stage produced.

After this stage, at most 10 samples remain in the active set (fewer if fewer than 10 survived stage 5), and every subsequent stage operates only on them.

Sources: [pipelines/loldrivers/pipeline.yaml:77-82]()

---

## Stage 7 — `assess`: LLM deep analysis

**Processor:** `generic_llm` (built-in, type: `map`)

```yaml
# pipelines/loldrivers/pipeline.yaml:84-95
processor: generic_llm
parallel: 2
on_failure: skip
config:
  prompt: pipelines/loldrivers/assessment.j2
  output_file: assessment.md
  classify_by: "\\[VULNERABLE\\]|\\[SAFE\\]"
  max_context_tokens: 900000
  max_retries: 3
```

Two drivers are assessed in parallel (`parallel: 2`). If the LLM call fails, the sample is skipped rather than halting the run (`on_failure: skip`).

### The Jinja2 prompt template

The template at `pipelines/loldrivers/assessment.j2` provides a carefully structured system prompt plus the per-driver payload. Key elements:

**System rules injected into the prompt** (abridged):

1. **Prove user controllability** — every dangerous value must be traced back to the IOCTL input buffer (`SystemBuffer` at `param_2+0x18`, or `UserBuffer` at `param_2+0x60`). Values from device extension fields, globals, or hardware registers are not user-controlled unless a separate IOCTL sets them.

2. **Reject false positive patterns** — the template explicitly names:
   - Internal driver buffer writes (driver-allocated buffers)
   - HID feature report buffers (normal HID protocol)
   - Hardware-gated code paths (behind MMIO reads or SMI results)
   - PnP-dependent devices (device object only exists when hardware is plugged in)
   - Ghidra decompilation artifacts (`unaff_ESI`, `unaff_EDI` — unresolved registers, not user input)
   - Speculative/assumed state (don't assume convenient initial device extension values)

3. **Exploitability requirements** — all four must hold:
   - Device openable from user mode (`\DosDevices\Name` exists unconditionally)
   - Specific IOCTL code identified
   - Dangerous operation uses IOCTL input buffer data
   - No hardware, firmware, or PnP init required to reach the code path

**Output format enforced by `classify_by`:**

```
[VULNERABLE]   — or —   [SAFE]
```

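The `generic_llm` processor applies the `classify_by: "\\[VULNERABLE\\]|\\[SAFE\\]"` regex to the first 200 characters of the response and stores the match as the sample's `classification`. The verdict therefore has to appear at the very start of the response.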

**Template variables injected per sample:**

```jinja2
{% if device_name is defined %}Device: {{ device_name }}{% endif %}
{% if symbolic_link is defined %}Symbolic Link: {{ symbolic_link }}{% endif %}
{% if dispatch_handler is defined %}Dispatch Handler: {{ dispatch_handler }}{% endif %}
{% if findings is defined and findings|length > 0 %}
Semgrep Findings ({{ finding_count }}):
{% for f in findings[:20] %}
  - [{{ f.severity }}] {{ f.rule_id }}: {{ f.message[:200] }}
    {{ f.file }}:{{ f.line_start }}
{% endfor %}
{% endif %}

Payload:
{{ dispatch_code }}
```

`dispatch_code` is the full decompiled C from stage 4 — dispatch function plus subfunctions, potentially hundreds of lines. The `max_context_tokens: 900000` budget accommodates large decompilation payloads.

The output is written to `{sample_dir}/assessment.md`.

Sources: [pipelines/loldrivers/assessment.j2:1-66]()
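Verdict extraction with `classify_by` reduces to a regex search over the opening characters of the response. The minimal sketch below assumes the 200-character window from the `generic_llm` config table, and that the raw matched text becomes the `classification` value. `classify` is a hypothetical name. YAML's double-quoted `\\[` decodes to the regex `\[`, which is what the raw string in the test contains:

```python
from __future__ import annotations

import re


def classify(response: str, pattern: str, window: int = 200) -> str | None:
    """Search only the first `window` characters of the LLM response."""
    match = re.search(pattern, response[:window])
    return match.group(0) if match else None
```

A verdict pushed past the window by a long preamble is silently missed. This is one reason the template insists on a first-line verdict.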

---

## Data flow summary

```text
TARGET DIRECTORY (.sys files)
        │
        ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 1: pe_ingest                                              │
│  • recurse DP_* subdirs, hash bytes, parse PE headers           │
│  • output per sample: sha256, md5, is_kernel_driver,            │
│    has_ioctl_surface, dangerous_imports, priority_score         │
└──────────────────────┬──────────────────────────────────────────┘
                       │ all .sys Samples
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 2: metadata_filter (built-in)                             │
│  • require is_kernel_driver=true AND has_ioctl_surface=true     │
│  • dedup by sha256                                              │
└──────────────────────┬──────────────────────────────────────────┘
                       │ kernel drivers with IOCTL surface
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 3: loldrivers_filter                                      │
│  • fetch/cache loldrivers.io/api/drivers.json (7-day TTL)       │
│  • drop samples whose sha256 is already known                   │
└──────────────────────┬──────────────────────────────────────────┘
                       │ unknown (novel) drivers
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 4: ghidra_decompile (max_workers: 4, timeout: 600s)       │
│  • analyzeHeadless + extract_dispatch.py post-script            │
│  • output: decompiled/dispatch_ioctl.c, ghidra_result.json,     │
│    ioctls/0xXXXXXXXX.c per IOCTL code                           │
└──────────────────────┬──────────────────────────────────────────┘
                       │ samples with decompiled C
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 5: semgrep_scanner (batch — one semgrep invocation)       │
│  • bulk-dir hard-links all decompiled .c files                  │
│  • scans with 4 rule files (arbitrary_rw, buffer_overflow,      │
│    method_neither, msr_access)                                  │
│  • output: findings.json per sample; drops samples with 0 hits  │
└──────────────────────┬──────────────────────────────────────────┘
                       │ samples with ≥1 semgrep finding
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 6: top_k (reduce — synchronization barrier)               │
│  • sort descending by semgrep_scanner.finding_count             │
│  • keep top 10                                                  │
└──────────────────────┬──────────────────────────────────────────┘
                       │ 10 best candidates
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 7: generic_llm (parallel: 2, on_failure: skip)            │
│  • render assessment.j2 with device_name, symbolic_link,        │
│    semgrep findings[:20], dispatch_code                         │
│  • classify_by: [VULNERABLE] or [SAFE]                          │
│  • output: assessment.md per sample                             │
└─────────────────────────────────────────────────────────────────┘
```

---

## Key implementation details worth knowing

### Caching at every expensive stage

Both Ghidra and semgrep results are cached to disk. If `decompiled/ghidra_result.json` exists and is valid JSON, Ghidra is skipped. If `findings.json` exists, the semgrep scan is skipped for that sample. This means re-runs after partial failures are cheap.

Sources: [processors/ghidra_decompile/ghidra_decompile.py:52-61](), [processors/semgrep_scanner/semgrep_scanner.py:54-62]()

### Environment variable expansion

`${GHIDRA_INSTALL_DIR}` and `${JAVA_HOME:-}` in the YAML are expanded at pipeline load time. The `:-` form provides an empty-string default, so `JAVA_HOME` is optional.

Sources: [pipelines/loldrivers/pipeline.yaml:66-67]()
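The following sketch expands `${VAR}` and `${VAR:-default}` with shell-like semantics, where `:-` also covers a variable that is set but empty. Raising on an unset variable with no default is an assumption made here. DeepZero may instead substitute an empty string or keep the literal text:

```python
from __future__ import annotations

import os
import re
from typing import Mapping

_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand_env(text: str, env: Mapping[str, str] | None = None) -> str:
    """Expand ${VAR} and ${VAR:-default} in raw pipeline text."""
    source = os.environ if env is None else env

    def substitute(m: re.Match) -> str:
        name, default = m.group(1), m.group(2)
        value = source.get(name)
        if value:
            return value
        if default is not None:  # ":-" also covers set-but-empty, as in POSIX sh
            return default
        if value is not None:
            return ""  # set but empty, no default given
        raise KeyError(f"environment variable {name} is not set")  # assumption

    return _VAR.sub(substitute, text)
```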

### Semgrep bulk-scan is a single process for N samples

The `BulkMapProcessor` base class exposes a `process(ctx, entries: list[...])` signature. `SemgrepScanner.process` receives all active samples at once, assembles one bulk directory, runs one `semgrep` subprocess, then distributes findings back. The filename prefix `{sample_id}_dispatch_ioctl.c` is the routing key.

Sources: [processors/semgrep_scanner/semgrep_scanner.py:100-125]()

### The LLM receives up to 20 semgrep findings, not all

The Jinja2 template caps the findings list at 20 entries (`findings[:20]`) to keep the prompt bounded even when a driver has many hits.

Sources: [pipelines/loldrivers/assessment.j2:57-61]()

---

## Summary

The `loldrivers` pipeline narrows a large corpus of raw `.sys` files down to the top 10 novel kernel driver candidates through filters of increasing cost:

- header parsing and metadata checks (cheap, parallel)
- loldrivers.io hash lookup (one cached download)
- Ghidra headless decompilation (expensive, 4-way parallel, cached)
- bulk semgrep scanning (one process for all samples)

The final LLM stage uses a tightly constrained Jinja2 prompt. It explicitly enumerates false-positive patterns and asks for a `[VULNERABLE]`/`[SAFE]` verdict on the first line, which `classify_by` extracts so the output is machine-parseable for downstream triage. The full pipeline specification lives in `pipelines/loldrivers/pipeline.yaml:1-95`.

---

## 06. The One Idea to Keep & What to Read Next

> Closing recap: the core idea in one sentence, the analogy that holds, the three things that make DeepZero different from a script, and concrete pointers to where to look next in the codebase.

- Page Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/pages/06-the-one-idea-to-keep-what-to-read-next.md
- Generated: 2026-05-22T02:28:22.402Z

### Source Files

- `README.md`
- `src/deepzero/cli.py`
- `src/deepzero/engine/pipeline.py`
- `src/deepzero/engine/runner.py`
- `src/deepzero/engine/state.py`
- `src/deepzero/engine/stage.py`
- `src/deepzero/api/server.py`
- `pipelines/loldrivers/pipeline.yaml`

# The One Idea to Keep & What to Read Next

DeepZero is a YAML-defined pipeline engine that turns a corpus of files into a structured research report by running each file through an ordered chain of processors — filtering, transforming, and LLM-assessing — while persisting every result atomically to disk so any interrupted run can pick up exactly where it left off.

This closing page distills the core concept into one sentence, revisits the analogy that holds best, names the three structural choices that make DeepZero more than a script, and tells you exactly which files to open next in the codebase.

---

## The One Sentence

> **DeepZero is a resumable, parallel, YAML-configured assembly line: each file in your corpus enters at stage 1 and either passes forward, gets filtered out, or is marked failed — and every outcome is written atomically to disk before the next stage begins.**

Everything else in the codebase is engineering that upholds that contract.

---

## The Analogy That Holds

Think of a **factory inspection line**. Raw parts (your binary files) arrive at the start. Each workstation (stage) does one job — remove duplicates, run a decompiler, scan with Semgrep, ask an LLM — and either passes the part forward, rejects it, or marks it defective. At the end of the shift, a ledger records what happened to every part.

The analogy holds in three concrete ways:

| Factory concept | DeepZero equivalent | Where in the code |
|---|---|---|
| Part on the line | `SampleState` with per-stage `history` | `src/deepzero/engine/state.py:56-131` |
| Workstation decision | `Verdict.CONTINUE` or `Verdict.FILTER` | `src/deepzero/engine/state.py:82-97` |
| End-of-shift ledger | `run_manifest.json` written after every stage | `src/deepzero/engine/state.py:244-269` |

The analogy breaks down at one point: a real factory cannot time-travel. DeepZero can. If the power goes out, the ledger is already on disk and the line restarts at the first incomplete workstation.

---

## Three Things That Make DeepZero Different From a Script

### 1. Atomic Per-Sample State (Resumability)

A script runs start-to-finish. If it dies on sample 4,000 of 10,000, you lose everything and restart.

DeepZero persists each sample's state by writing it to a `.tmp` file and then atomically renaming it over the real file with `os.replace()`, so the real state file is never left half-written. When `deepzero run` is called again, `_resume_or_ingest` checks the `samples/` directory first. If sample state files already exist, the expensive ingest step is skipped entirely, and only the samples that did not complete a given stage are fed back into the thread pool.

```python
# src/deepzero/engine/runner.py:275-285
existing_states = self.state_store.list_samples()
if existing_states:
    log.info(
        "resume: found %d existing samples, skipping ingest",
        len(existing_states),
    )
    sample_states: dict[str, SampleState] = {s.sample_id: s for s in existing_states}
```
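The resume rule can be made concrete with a minimal sketch of how a runner might decide which samples still need a given stage. It assumes a simplified state shape: a hypothetical `SketchSample` dataclass whose `history` maps a stage name to an outcome string. The real `SampleState` and `StageOutput` in `state.py` carry more fields, and this sketch treats any recorded outcome (including a failure) as "done", which may differ from how the runner actually treats failed samples.

```python
from dataclasses import dataclass, field


@dataclass
class SketchSample:
    """Hypothetical stand-in for SampleState with only the fields used here."""
    sample_id: str
    # stage name -> recorded outcome: "continue", "filter", or "failed"
    history: dict[str, str] = field(default_factory=dict)


def pending_for_stage(samples: list[SketchSample], stages: list[str], stage: str) -> list[str]:
    """IDs of samples that must (re)run `stage`: no outcome recorded for it yet,
    and a "continue" outcome recorded for every earlier stage."""
    earlier = stages[: stages.index(stage)]
    pending = []
    for s in samples:
        if stage in s.history:
            continue  # this stage already finished for this sample
        if all(s.history.get(prev) == "continue" for prev in earlier):
            pending.append(s.sample_id)
    return pending


stages = ["decompile", "scan", "assess"]
samples = [
    SketchSample("a", {"decompile": "continue", "scan": "continue"}),
    SketchSample("b", {"decompile": "filter"}),
    SketchSample("c", {"decompile": "continue"}),  # interrupted before "scan"
]
print(pending_for_stage(samples, stages, "scan"))    # ['c']
print(pending_for_stage(samples, stages, "assess"))  # ['a']
```

Because the decision is computed from what is on disk rather than from in-memory progress, the same function gives the right answer whether the previous run finished cleanly or was killed halfway through a stage.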

The write itself lives in `StateStore._atomic_write` (write to `.tmp`, then replace). The rename step is handled by the module-level helper `atomic_replace`, which adds a Windows-specific retry loop for files briefly locked by a Defender scan:

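```python
# src/deepzero/engine/state.py:32-42
import os
import time
from pathlib import Path


def atomic_replace(src: Path, dst: Path, retries: int = 5) -> None:
    # os.replace swaps dst for src in one step, so readers see the old
    # file or the new one, never a half-written mix.
    for i in range(retries):
        try:
            os.replace(src, dst)
            return
        except PermissionError:
            # On Windows a scanner such as Defender can briefly lock the file;
            # elsewhere, or on the final attempt, the error is re-raised.
            if i == retries - 1 or os.name != "nt":
                raise
            time.sleep(0.05)
```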
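Putting both halves together, the write-then-replace pattern can be sketched as below. The helper name `atomic_write_json`, the `indent=2` formatting, and the explicit `fsync` are assumptions for illustration, not a transcription of `StateStore._atomic_write`; the retry helper repeats the excerpt above so the block runs on its own.

```python
import json
import os
import tempfile
import time
from pathlib import Path


def atomic_replace(src: Path, dst: Path, retries: int = 5) -> None:
    # Same retry idea as the excerpt: on Windows, retry briefly if another
    # process (e.g. an antivirus scanner) holds the file open.
    for i in range(retries):
        try:
            os.replace(src, dst)
            return
        except PermissionError:
            if i == retries - 1 or os.name != "nt":
                raise
            time.sleep(0.05)


def atomic_write_json(path: Path, data: dict) -> None:
    """Hypothetical helper: write JSON so readers never see a partial file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    # Write the full payload to a sibling temp file first...
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.flush()
        os.fsync(f.fileno())  # push the bytes to disk before the rename
    # ...then swap it into place in a single step.
    atomic_replace(tmp, path)


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "samples" / "abc123.json"
    atomic_write_json(target, {"sample_id": "abc123", "history": {}})
    print(json.loads(target.read_text(encoding="utf-8"))["sample_id"])  # abc123
    print(sorted(p.name for p in target.parent.iterdir()))  # ['abc123.json']
```

The temp file sits in the same directory as its target on purpose: `os.replace` is only atomic when source and destination are on the same filesystem.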

Sources: [src/deepzero/engine/runner.py:275-285](), [src/deepzero/engine/state.py:32-42]()

---

### 2. Typed Processor Contracts (Extensibility Without Chaos)

A script accumulates `if/elif` blocks as requirements grow. DeepZero instead enforces a typed processor taxonomy at load time:

| Type | Shape | Example |
|---|---|---|
| `IngestProcessor` | `process(ctx, target_dir) → list[Sample]` | `pe_ingest.py` — discovers `.sys` files |
| `MapProcessor` | `process(ctx, entry) → ProcessorResult` | `ghidra_decompile.py` — one file, one result |
| `BulkMapProcessor` | `process(ctx, entries) → list[ProcessorResult]` | `semgrep_scanner.py` — batch scan |
| `ReduceProcessor` | `process(ctx, entries) → set[sample_id]` | `top_k` — returns the IDs to keep |
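To see how the four shapes differ in practice, here is a self-contained sketch with hypothetical stand-in base classes that follow the signatures in the table. `Sample`, `ProcessorResult`, and `Verdict` below are simplified assumptions, not the real classes in `src/deepzero/engine/stage.py`; in DeepZero, map and reduce processors receive `ProcessorEntry` objects, while this sketch passes `Sample` directly and represents sample IDs as plain strings.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from pathlib import Path


class Verdict(Enum):  # simplified stand-in
    CONTINUE = "continue"
    FILTER = "filter"


@dataclass
class Sample:  # hypothetical: what ingest hands to the pipeline
    sample_id: str
    path: Path


@dataclass
class ProcessorResult:  # hypothetical: one sample's outcome for one stage
    verdict: Verdict
    data: dict


class IngestProcessor(ABC):
    @abstractmethod
    def process(self, ctx, target_dir: Path) -> list[Sample]: ...


class MapProcessor(ABC):
    @abstractmethod
    def process(self, ctx, entry: Sample) -> ProcessorResult: ...


class BulkMapProcessor(ABC):
    @abstractmethod
    def process(self, ctx, entries: list[Sample]) -> list[ProcessorResult]: ...


class ReduceProcessor(ABC):
    @abstractmethod
    def process(self, ctx, entries: list[Sample]) -> set[str]: ...


class SysIngest(IngestProcessor):
    """Toy ingest: every *.sys file under target_dir becomes a sample."""
    def process(self, ctx, target_dir: Path) -> list[Sample]:
        return [Sample(p.stem, p) for p in sorted(target_dir.rglob("*.sys"))]


class MinSizeFilter(MapProcessor):
    """Toy map: filter out files smaller than min_bytes."""
    def __init__(self, min_bytes: int) -> None:
        self.min_bytes = min_bytes

    def process(self, ctx, entry: Sample) -> ProcessorResult:
        size = entry.path.stat().st_size
        verdict = Verdict.CONTINUE if size >= self.min_bytes else Verdict.FILTER
        return ProcessorResult(verdict, {"size": size})


class KeepFirstN(ReduceProcessor):
    """Toy reduce: keep the first n sample IDs in sorted order."""
    def __init__(self, n: int) -> None:
        self.n = n

    def process(self, ctx, entries: list[Sample]) -> set[str]:
        return {e.sample_id for e in sorted(entries, key=lambda e: e.sample_id)[: self.n]}
```

The key design point survives the simplification: a map processor only ever sees one sample, so the runner is free to parallelize it, while a reduce processor sees the whole surviving set and can make relative decisions such as "keep the top ten".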

The pipeline loader requires stage 0 to be an `IngestProcessor` and every later stage to be one of the other three types. Any violation raises a `ValueError` before execution begins:

```python
# src/deepzero/engine/pipeline.py:159-177
if i == 0:
    if not isinstance(instance, IngestProcessor):
        raise ValueError(
            f"first stage '{spec.name}' ... must be an IngestProcessor."
        )
    pipeline.ingest_processor = instance
else:
    if isinstance(instance, IngestProcessor):
        raise ValueError(...)
    if not isinstance(instance, (MapProcessor, ReduceProcessor, BulkMapProcessor)):
        raise ValueError(...)
    pipeline.stages.append((spec, instance))
```

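Custom processors can be referenced by a Python file path in the YAML (`processor: ghidra_decompile/ghidra_decompile.py`); the registry also accepts bare built-in names and dotted import paths. The loader resolves every processor at startup, so a bad path or a processor of the wrong type fails before any sample is processed.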
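Loading a class from a file path is something the standard library's `importlib.util` supports directly. The sketch below illustrates that general technique and is not DeepZero's registry code; the function name `load_class_from_file` and the rule "the file must define exactly one subclass of the expected base" are hypothetical.

```python
import importlib.util
import inspect
from pathlib import Path


def load_class_from_file(path: Path, base: type) -> type:
    """Hypothetical: import a .py file and return its single subclass of `base`."""
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ValueError(f"cannot import {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    # Keep only classes defined in this file (not ones it imported) that
    # subclass the expected base.
    candidates = [
        obj
        for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, base) and obj is not base and obj.__module__ == module.__name__
    ]
    if len(candidates) != 1:
        raise ValueError(
            f"{path}: expected exactly one {base.__name__} subclass, found {len(candidates)}"
        )
    return candidates[0]
```

Because `exec_module` runs at load time, syntax errors and missing imports in a plugin surface at this point, which is what lets a loader of this kind fail before any sample is touched.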

Sources: [src/deepzero/engine/pipeline.py:154-184]()

---

### 3. Breadth-First Execution With Sync Barriers (Observability and Safety)

A script processes files serially or with ad-hoc threads. DeepZero uses a **breadth-first** model: all surviving samples pass through stage N before any sample enters stage N+1. After every stage, the runner writes the manifest to disk and regenerates per-sample `context.md` files before proceeding.

```python
# src/deepzero/engine/runner.py:167-256 (key excerpt)
# breadth-first stage execution
for spec, processor in self.stages:
    if self._shutdown_event.is_set():
        break
    ...
    # sync barrier
    self.state_store.save_manifest(list(sample_states.values()))
    self._generate_context_files(sample_states)
```

This means:
- You can inspect `work/<pipeline>/run_manifest.json` after any stage to see exactly how many samples passed, were filtered, or failed.
- Ctrl+C is handled gracefully: a `threading.Event` (`_shutdown_event`) is set on the first SIGINT, and the runner saves state before exiting. A second SIGINT forces `os._exit(1)`.
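- The REST API (`deepzero serve`) can read the same state files while a run is in progress: because each file is replaced atomically, a reader sees either the previous version or the new one, never a partial write.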
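The two-step Ctrl+C behavior can be sketched with the standard `signal` and `threading` modules. The class name `ShutdownHandler` and the injectable `force_exit` parameter are hypothetical (the parameter exists only so the sketch can be tested without terminating the process); the behavior itself (set an event on the first SIGINT, call `os._exit(1)` on the second) follows the description in the list above.

```python
import os
import signal
import threading
from typing import Callable


class ShutdownHandler:
    """Hypothetical sketch: first SIGINT requests a graceful stop, second forces exit."""

    def __init__(self, force_exit: Callable[[int], None] = os._exit) -> None:
        self.shutdown_event = threading.Event()
        self._force_exit = force_exit

    def __call__(self, signum, frame) -> None:
        if self.shutdown_event.is_set():
            # Second Ctrl+C: skip all cleanup and terminate immediately.
            self._force_exit(1)
            return
        # First Ctrl+C: ask the stage loop to stop at its next check.
        self.shutdown_event.set()

    def install(self) -> None:
        # signal.signal must be called from the main thread.
        signal.signal(signal.SIGINT, self)
```

A stage loop would call `install()` once, then check `shutdown_event.is_set()` before starting each stage, as the runner excerpt above does with `_shutdown_event`. Using an event instead of raising `KeyboardInterrupt` means worker threads are never interrupted mid-write.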

Sources: [src/deepzero/engine/runner.py:167-256]()

---

## The Structural Flow, Visualized

```text
deepzero run <target> -p pipeline.yaml
        │
        ▼
  load_pipeline()          ← parse YAML, resolve processor classes, validate graph
        │                    src/deepzero/engine/pipeline.py
        ▼
  PipelineRunner.run()     ← install SIGINT handler, create RunState
        │
        ├── _resume_or_ingest()   ← skip ingest if samples/ already on disk
        │        │
        │        ▼ list[SampleState]
        │
        └── breadth-first loop over stages
               │
               ├── MapProcessor    → ThreadPoolExecutor, 1 future per sample
               ├── BulkMapProcessor → one call, list of results
               └── ReduceProcessor → one call, returns kept sample IDs
                       │
                       ▼ (after each stage)
               StateStore.save_manifest()    ← atomic write
               _generate_context_files()     ← per-sample context.md
```
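The dispatch in the diagram can be sketched in a few dozen lines. Everything here is a simplified assumption: stages are `(name, kind, fn)` tuples instead of processor objects, samples are plain IDs, a map or bulk-map result is just a keep/drop boolean, filtered samples are dropped from the active list (DeepZero instead records the verdict in each sample's state), and `save_manifest` is a callback standing in for `StateStore.save_manifest()`. What the sketch does show is the shape: map stages fan out one future per sample, bulk-map and reduce stages make one call, and nothing from stage N+1 starts until every sample has finished stage N.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def run_breadth_first(
    sample_ids: list[str],
    stages: list[tuple[str, str, Callable]],
    save_manifest: Callable[[str, list[str]], None],
    workers: int = 4,
) -> list[str]:
    """Hypothetical sketch of breadth-first stage execution with a sync barrier."""
    active = list(sample_ids)
    for name, kind, fn in stages:
        if kind == "map":
            # One future per sample; fn(sample_id) returns True to keep it.
            # list(pool.map(...)) blocks until every future has finished.
            with ThreadPoolExecutor(max_workers=workers) as pool:
                keep = list(pool.map(fn, active))
            active = [s for s, k in zip(active, keep) if k]
        elif kind == "bulk_map":
            # One call with all active samples; fn returns one bool per sample.
            keep = fn(active)
            active = [s for s, k in zip(active, keep) if k]
        elif kind == "reduce":
            # One call; fn returns the set of sample IDs to keep.
            kept = fn(active)
            active = [s for s in active if s in kept]
        else:
            raise ValueError(f"unknown stage kind {kind!r}")
        # Sync barrier: the whole stage is done, so persist before moving on.
        save_manifest(name, active)
    return active
```

Doing the barrier this way trades some pipelining (a fast sample cannot race ahead to the next stage) for a simple invariant: after each stage, the manifest on disk describes a complete, consistent snapshot.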

---

## What to Read Next

Here is a concrete reading order, from high-level to low-level:

### Start with the real pipeline definition

[`pipelines/loldrivers/pipeline.yaml`](pipelines/loldrivers/pipeline.yaml) is the only complete working pipeline in the repo. Its comments explain all four processor types, the `${ENV_VAR:-default}` expansion syntax, `parallel:`, `timeout:`, and `on_failure:` in context. Read this before any code.

### Then follow the load path

[`src/deepzero/engine/pipeline.py`](src/deepzero/engine/pipeline.py) — `load_pipeline()` parses the YAML (line 76), expands env vars (line 256), builds `StageSpec` objects (lines 103–135), and calls `_resolve_processors()` (line 154) to validate the stage graph. This is where YAML becomes runnable objects.
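The `${ENV_VAR:-default}` syntax that `load_pipeline()` expands follows the familiar shell convention. A minimal regex-based expander is sketched below. It is an assumption about the behavior, not a transcription of the code at line 256: in particular, treating an empty value like an unset one (as the shell's `:-` does) and expanding an unset variable with no default to an empty string are choices made for this sketch.

```python
import os
import re
from typing import Mapping, Optional

# Matches ${NAME} or ${NAME:-default}; NAME follows shell identifier rules.
_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand_env(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical sketch of shell-style ${VAR} / ${VAR:-default} expansion."""
    env = os.environ if env is None else env

    def substitute(m: re.Match) -> str:
        name, default = m.group(1), m.group(2)
        value = env.get(name)
        if value:  # set and non-empty, as with the shell's :- operator
            return value
        return default if default is not None else ""

    return _ENV_PATTERN.sub(substitute, text)


env = {"GHIDRA_HOME": "/opt/ghidra", "EMPTY": ""}
print(expand_env("${GHIDRA_HOME}/support", env))  # /opt/ghidra/support
print(expand_env("${WORKERS:-8}", env))           # 8
print(expand_env("${EMPTY:-fallback}", env))      # fallback
```

The variable names above are illustrative only; which variables a real pipeline reads is defined by its `pipeline.yaml`.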

### Then the execution heart

[`src/deepzero/engine/runner.py`](src/deepzero/engine/runner.py) — `PipelineRunner._run_all_stages()` (line 156) contains the breadth-first loop, the three dispatch branches (`_run_map`, `_run_reduce`, `_run_batch`), and the sync barrier. `_process_one_map()` (line 529) shows how retries, timeouts, and `should_skip()` work at the per-sample level.

### Then the state model

[`src/deepzero/engine/state.py`](src/deepzero/engine/state.py) — `SampleState` (line 57) and `StageOutput` (line 44) are the two data structures that hold every sample's complete history. `StateStore` (line 163) is a thin file-based persistence layer with no database dependency.

### Then the processor contract

[`src/deepzero/engine/stage.py`](src/deepzero/engine/stage.py) — `MapProcessor`, `ReduceProcessor`, `BulkMapProcessor`, and `IngestProcessor` are the four abstract base classes you subclass to write custom processors. `ProcessorEntry` and `ProcessorContext` are the input and execution-context types passed to every `process()` call.

### Finally, the CLI surface

[`src/deepzero/cli.py`](src/deepzero/cli.py) — `run` (line 133), `status` (line 230), `validate` (line 289), `init` (line 336), and `serve` (line 442) are the five user-facing commands. The `run` command is the definitive wiring example: it loads the pipeline, builds the `PipelineRunner`, detects an existing run (resume) or initializes a fresh `RunState`, then calls `runner.run()`.

---

## Closing Summary

The single idea worth carrying forward is this: **every sample's fate is committed to disk atomically after each stage, so the pipeline is always safe to interrupt and always safe to resume.** The YAML schema, the typed processor hierarchy, and the breadth-first sync barriers all exist in service of that one guarantee. The `StateStore._atomic_write` method in [`src/deepzero/engine/state.py:174-177`](src/deepzero/engine/state.py), a few lines that write to a `.tmp` file and then `os.replace` it into place, is the physical foundation everything else rests on.

Sources: [src/deepzero/engine/state.py:174-177]()

---