# The Workers: Processors & the Registry

> The four processor types (ingest, map, reduce, bulk-map), how the registry resolves a bare name versus a file path versus a dotted import, and how to write a custom processor as a Python class. Covers the built-in stages `file_discovery`, `metadata_filter`, `hash_exclude`, `top_k`, `sort`, `generic_command`, and `generic_llm`.

- Repository: 416rehman/DeepZero
- GitHub: https://github.com/416rehman/DeepZero
- Human wiki: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324
- Complete Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/llms-full.txt

## Source Files

- `src/deepzero/engine/registry.py`
- `src/deepzero/engine/stage.py`
- `src/deepzero/stages/ingest.py`
- `src/deepzero/stages/filter.py`
- `src/deepzero/stages/top_k.py`
- `src/deepzero/stages/llm.py`
- `src/deepzero/engine/llm.py`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [src/deepzero/engine/registry.py](src/deepzero/engine/registry.py)
- [src/deepzero/engine/stage.py](src/deepzero/engine/stage.py)
- [src/deepzero/engine/llm.py](src/deepzero/engine/llm.py)
- [src/deepzero/stages/__init__.py](src/deepzero/stages/__init__.py)
- [src/deepzero/stages/ingest.py](src/deepzero/stages/ingest.py)
- [src/deepzero/stages/filter.py](src/deepzero/stages/filter.py)
- [src/deepzero/stages/hash_filter.py](src/deepzero/stages/hash_filter.py)
- [src/deepzero/stages/top_k.py](src/deepzero/stages/top_k.py)
- [src/deepzero/stages/sort.py](src/deepzero/stages/sort.py)
- [src/deepzero/stages/command.py](src/deepzero/stages/command.py)
- [src/deepzero/stages/llm.py](src/deepzero/stages/llm.py)
</details>

A DeepZero pipeline is a sequence of **processors** — Python classes that do the actual work on a stream of samples (files, binaries, documents). Each processor belongs to one of four types that describe *how* it relates to the stream: discovering samples, transforming them one-at-a-time, transforming them all at once, or running a global selection pass. The **registry** is the lookup table that maps a name you write in YAML to the class that runs at pipeline time.

This page explains the four processor types, the three ways the registry resolves a processor reference, and how to write and wire up a custom processor. It also documents every built-in stage — `file_discovery`, `metadata_filter`, `hash_exclude`, `top_k`, `sort`, `generic_command`, and `generic_llm`.

---

## The Four Processor Types

Every processor class inherits from one of four abstract base classes declared in `src/deepzero/engine/stage.py`. The type you choose determines what data your `process()` method receives and what it must return.

```text
ProcessorType enum (stage.py:50-54)
  INGEST    → one call, produces the sample list
  MAP       → one call per sample, parallel
  REDUCE    → one call with ALL samples, serial
  BULK_MAP  → one call with ALL samples, returns one result per sample
```

### IngestProcessor — discovering samples

`IngestProcessor` is always the first stage in a pipeline. It runs **once**, receives the target `Path`, and returns a flat list of `Sample` objects. Nothing upstream has run yet; this is where the sample stream begins.

```text
/target/path ──▶ [ IngestProcessor ] ──▶ sample_a, sample_b, sample_c ...
```

```python
# src/deepzero/engine/stage.py:277-288
class IngestProcessor(Processor):
    processor_type = ProcessorType.INGEST

    @abstractmethod
    def process(self, ctx: ProcessorContext, target: Path) -> list[Sample]: ...
```

Use cases: file discovery, binary scanning, API ingestion, manifest loading.

### MapProcessor — per-sample transformation

`MapProcessor` is the workhorse. The engine fans out calls across a `ThreadPoolExecutor` so samples are processed in parallel. Each call gets one `ProcessorEntry` (the sample's identity plus lazy-loaded history from all previous stages) and must return a `ProcessorResult`.

```text
sample_a ──▶ [ MapProcessor ] ──▶ result_a   (ok / filter / fail)
sample_b ──▶ [ MapProcessor ] ──▶ result_b   ← parallel
sample_c ──▶ [ MapProcessor ] ──▶ result_c
```

Because calls are parallel, `process()` must be **thread-safe**: no shared mutable state.

```python
# src/deepzero/engine/stage.py:291-310
class MapProcessor(Processor):
    processor_type = ProcessorType.MAP

    @abstractmethod
    def process(self, ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult: ...

    def should_skip(self, ctx: ProcessorContext, entry: ProcessorEntry) -> str | None:
        # return a reason string to skip already-processed samples (cached output)
        return None
```

Use cases: metadata extraction, LLM analysis, decompilation, format-specific filtering.
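
The fan-out described above can be pictured with a small standalone sketch. It does not use DeepZero's engine: `fake_process` and `run_map_stage` are hypothetical stand-ins, and the only real API involved is the standard library's `ThreadPoolExecutor`.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_process(sample_id: str) -> dict:
    # Hypothetical stand-in for MapProcessor.process(): it reads only its own
    # argument and touches no shared mutable state, so it is thread-safe.
    return {"sample_id": sample_id, "status": "completed", "length": len(sample_id)}


def run_map_stage(sample_ids: list[str], workers: int = 4) -> list[dict]:
    # pool.map preserves input order even though the calls run concurrently.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fake_process, sample_ids))


results = run_map_stage(["sample_a", "sample_b", "sample_c"])
```

Because each call depends only on its argument, the results are the same regardless of how the threads are scheduled, which is the property a real `process()` needs.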

### ReduceProcessor — global selection

`ReduceProcessor` receives **all active samples at once**. It is a synchronization barrier: the engine cannot parallelize it or split it across chunks. Return a list of `sample_id` strings to keep; every ID not returned is dropped.

```text
┌ sample_a ┐                      ┌ sample_c ┐
│ sample_b │ ──▶ [ Reduce ] ──▶   │ sample_a │  (reordered, sample_b filtered)
│ sample_c │                      └──────────┘
└──────────┘
```

```python
# src/deepzero/engine/stage.py:313-330
class ReduceProcessor(Processor):
    processor_type = ProcessorType.REDUCE

    @abstractmethod
    def process(self, ctx: ProcessorContext, entries: list[ProcessorEntry]) -> list[str]: ...
```

Use cases: top-k selection, sorting by priority score, global deduplication.
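
As an illustration of why a reduce step needs the whole sample set, here is a minimal sketch that keeps only samples scoring at or above the global mean. `FakeEntry` is a hypothetical stand-in for `ProcessorEntry`, and the assumption that the returned order is the order downstream stages see is taken from the diagram above, not verified against the engine.

```python
from dataclasses import dataclass


@dataclass
class FakeEntry:
    # Hypothetical stand-in for ProcessorEntry; only the fields used here.
    sample_id: str
    score: float


def keep_above_mean(entries: list[FakeEntry]) -> list[str]:
    # A reduce step sees every entry at once, so it can compute a global
    # statistic (the mean) that no per-sample map call could know.
    if not entries:
        return []
    mean = sum(e.score for e in entries) / len(entries)
    kept = [e for e in entries if e.score >= mean]
    # Highest score first; IDs that are not returned are dropped.
    kept.sort(key=lambda e: e.score, reverse=True)
    return [e.sample_id for e in kept]


entries = [FakeEntry("sample_a", 0.6), FakeEntry("sample_b", 0.1), FakeEntry("sample_c", 0.9)]
kept_ids = keep_above_mean(entries)
```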

### BulkMapProcessor — batched external invocation

`BulkMapProcessor` receives all active samples in a single call and must return a `ProcessorResult` for each one (matched by index). Unlike `ReduceProcessor`, it does not filter: it produces a result per entry. It is designed for tools with high process-startup cost (e.g., semgrep, bulk static analysis) where spawning one process per sample would be prohibitive.

```python
# src/deepzero/engine/stage.py:333-350
class BulkMapProcessor(Processor):
    processor_type = ProcessorType.BULK_MAP

    @abstractmethod
    def process(
        self, ctx: ProcessorContext, entries: list[ProcessorEntry]
    ) -> list[ProcessorResult]: ...
```

If the list returned is shorter than `entries`, the trailing entries are marked failed.
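
The index-matching rule can be sketched as follows. This is not the engine's code: `pad_results` is a hypothetical helper, and plain dicts stand in for `ProcessorResult`.

```python
def pad_results(entry_ids: list[str], results: list[dict]) -> list[dict]:
    # Results are matched to entries by index. Entries past the end of the
    # returned list have no result, so this sketch marks them failed.
    padded = list(results[: len(entry_ids)])
    for sample_id in entry_ids[len(padded):]:
        padded.append({"status": "failed", "error": f"no result returned for {sample_id}"})
    return padded


ids = ["sample_a", "sample_b", "sample_c"]
short = [{"status": "completed"}, {"status": "completed"}]
padded = pad_results(ids, short)
```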

---

## The Processor Registry

The registry is a module-level dict `_PROCESSOR_REGISTRY: dict[str, type]` in `src/deepzero/engine/registry.py`. Built-ins are registered at import time by `src/deepzero/stages/__init__.py`:

```python
# src/deepzero/stages/__init__.py:1-17
from deepzero.engine.stage import register_processor
from deepzero.stages.command import GenericCommand
# ...
register_processor("file_discovery", FileDiscovery)
register_processor("metadata_filter", MetadataFilter)
register_processor("hash_exclude", HashExclude)
register_processor("generic_llm", GenericLLM)
register_processor("generic_command", GenericCommand)
register_processor("top_k", TopKSelector)
register_processor("sort", Sort)
```

### Resolution order

When the engine sees a `processor:` value in a pipeline YAML, it calls `resolve_processor_class(processor_ref)`. The resolution logic has three distinct branches:

```
processor_ref contains "/" or "\"
    └─▶ _resolve_from_processors_dir()
           Load processors/<dir>/<file>.py
           optional ":ClassName" suffix to pick a specific class

processor_ref contains no "/"  AND  is in the registry
    └─▶ return _PROCESSOR_REGISTRY[processor_ref]
           (e.g. "metadata_filter", "top_k")

processor_ref contains ":" but no "/"
    └─▶ _resolve_from_dotted()
           importlib.import_module(module_path)
           getattr(module, class_name)
           (e.g. "mypackage.processors:MyProcessor")

otherwise → ValueError listing available built-ins
```

Sources: [src/deepzero/engine/registry.py:20-39]()
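
The branch order in the diagram can be expressed as a small dispatch function. This is a sketch under the assumption that the checks run in exactly the order shown; `classify_ref` and `fake_registry` are hypothetical names, and the function returns a label rather than loading anything.

```python
def classify_ref(processor_ref: str, registry: dict[str, type]) -> str:
    # Branch 1: anything that looks like a path goes to the processors/ directory.
    if "/" in processor_ref or "\\" in processor_ref:
        return "file_path"
    # Branch 2: a bare name that is a registry key.
    if processor_ref in registry:
        return "registry"
    # Branch 3: "module.path:ClassName" with no path separator.
    if ":" in processor_ref:
        return "dotted_import"
    raise ValueError(
        f"unknown processor {processor_ref!r}; built-ins: {sorted(registry)}"
    )


fake_registry = {"top_k": object, "sort": object}  # stand-in for _PROCESSOR_REGISTRY
```

Checking for a path separator first means `"ghidra/decompile.py:MyClass"` is treated as a file path even though it also contains a colon.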

#### Branch 1 — file path (relative to `processors/`)

```
"ghidra/decompile.py"           → processors/ghidra/decompile.py, first Processor subclass
"ghidra/decompile.py:MyClass"   → processors/ghidra/decompile.py, class MyClass specifically
```

The resolver searches `Path.cwd() / "processors" / <path_part>`, loads the file with `importlib.util.spec_from_file_location`, and scans module attributes for a concrete `Processor` subclass (skipping the abstract base classes themselves). The resolved class gets `_source_file` set so that `processor.processor_dir` points back to the containing folder — useful for co-located assets like rule files or templates.

Sources: [src/deepzero/engine/registry.py:54-86](), [src/deepzero/engine/registry.py:109-137]()
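
A rough sketch of the file-loading step, using only standard-library calls. `load_class_from_file` is a hypothetical helper, and `json.JSONEncoder` stands in for `Processor` purely so the example runs without DeepZero installed; the real resolver's scanning rules may differ.

```python
import importlib.util
import inspect
import json
import tempfile
from pathlib import Path
from typing import Optional


def load_class_from_file(path: Path, base: type, class_name: Optional[str] = None) -> type:
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    if class_name is not None:
        return getattr(module, class_name)
    # No explicit name: take the first subclass of `base`, skipping `base`
    # itself. inspect.getmembers returns members sorted by name.
    for _, obj in inspect.getmembers(module, inspect.isclass):
        if issubclass(obj, base) and obj is not base:
            return obj
    raise LookupError(f"no {base.__name__} subclass in {path}")


with tempfile.TemporaryDirectory() as tmp:
    plugin = Path(tmp) / "decompile.py"
    plugin.write_text("import json\n\nclass Decompile(json.JSONEncoder):\n    pass\n")
    found = load_class_from_file(plugin, json.JSONEncoder)
    found_name = found.__name__
```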

#### Branch 2 — bare name

Bare names such as `"top_k"` or `"metadata_filter"` match the `_PROCESSOR_REGISTRY` dict directly. No filesystem access occurs.

Sources: [src/deepzero/engine/registry.py:29-31]()

#### Branch 3 — dotted import with colon

```
"mypackage.submodule:MyProcessor"
```

Uses `importlib.import_module` for the left side of `:` and `getattr` for the right side. The resulting class must be a `Processor` subclass or a `TypeError` is raised.

Sources: [src/deepzero/engine/registry.py:140-151]()
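
The dotted-import branch amounts to a few lines of `importlib`. The sketch below assumes the reference always contains exactly one colon; `resolve_dotted` is a hypothetical name, and `dict` stands in for `Processor` so the example can use a standard-library class.

```python
import importlib


def resolve_dotted(ref: str, base: type) -> type:
    # Left of ":" is the module path, right of ":" is the class name.
    module_path, _, class_name = ref.partition(":")
    module = importlib.import_module(module_path)
    cls = getattr(module, class_name)
    if not (isinstance(cls, type) and issubclass(cls, base)):
        raise TypeError(f"{ref} is not a {base.__name__} subclass")
    return cls


# collections.OrderedDict subclasses dict, so it passes the type check.
resolved = resolve_dotted("collections:OrderedDict", dict)
```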

---

## Data Structures

Understanding the objects passed in and out of `process()` is essential before writing a custom processor.

### `Sample` — output of IngestProcessor

```python
# src/deepzero/engine/stage.py:67-76
@dataclass
class Sample:
    sample_id: str        # sha256[:16] prefix (unique per pipeline run)
    source_path: Path     # path to the original file
    filename: str         # display name
    data: dict[str, Any]  # initial metadata (e.g. sha256, size_bytes)
```

### `ProcessorEntry` — input to Map/Reduce/BulkMap processors

```python
# src/deepzero/engine/stage.py:79-113
@dataclass
class ProcessorEntry:
    sample_id: str
    source_path: Path
    filename: str
    sample_dir: Path | None   # work/<pipeline>/samples/<id>/

    def upstream(self, processor_name: str) -> StageOutput | None: ...
    def upstream_data(self, processor_name: str, key: str, default=None) -> Any: ...
```

Beyond the fields shown above, `entry.history` is a lazy-loaded dict of `{stage_name: StageOutput}`. It reads from disk only when accessed, keeping memory use proportional to what the processor actually needs.

### `ProcessorResult` — return value for Map/BulkMap processors

| Factory method | Status | Verdict | Meaning |
|---|---|---|---|
| `ProcessorResult.ok(data, artifacts)` | `completed` | `continue` | Success, pass downstream |
| `ProcessorResult.filter(reason, data)` | `completed` | `filter` | Intentionally excluded |
| `ProcessorResult.fail(error)` | `failed` | — | Processing error |

Sources: [src/deepzero/engine/stage.py:139-170]()

### `ProcessorContext` — runtime services

Every `process()` call receives a `ProcessorContext`:

```python
# src/deepzero/engine/stage.py:116-136
@dataclass
class ProcessorContext:
    pipeline_dir: Path
    global_config: GlobalConfig   # settings, knowledge, model
    llm: LLMProtocol | None
    log: logging.Logger
    progress: ProgressReporter
    shutdown_event: threading.Event | None
```

Helpers `ctx.get_setting(key)` and `ctx.get_knowledge(key)` read from the pipeline YAML's `settings:` and `knowledge:` blocks respectively.

### `StageSpec` — the YAML stage definition

```python
# src/deepzero/engine/stage.py:173-188
@dataclass
class StageSpec:
    name: str            # unique instance name in the pipeline
    processor: str       # registry key, file path, or dotted import
    config: dict         # per-stage config from YAML
    parallel: int = 0    # 0 = auto / max hardware
    on_failure: FailurePolicy = FailurePolicy.SKIP
    max_retries: int = 0
    timeout: int = 0     # seconds per sample, 0 = no limit
```

---

## Writing a Custom Processor

### Step 1 — Subclass the right base class

```python
# processors/my_tool/my_tool.py
import json
import shutil
import subprocess
from dataclasses import dataclass

from deepzero.engine.stage import MapProcessor, ProcessorContext, ProcessorEntry, ProcessorResult


class MyAnalyzer(MapProcessor):
    description = "runs my-tool and extracts a score"
    version = "1.0"

    @dataclass
    class Config:
        threshold: float = 0.5
        output_file: str = "my_result.json"

    def validate(self, ctx: ProcessorContext) -> list[str]:
        # Check dependencies before any sample is touched.
        if not shutil.which("my-tool"):
            return ["my-tool binary not found on PATH"]
        return []

    def process(self, ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult:
        # sample_dir is typed Path | None, so guard before writing into it.
        if entry.sample_dir is None:
            return ProcessorResult.fail("sample_dir is not set")

        result = subprocess.run(
            ["my-tool", str(entry.source_path)],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            return ProcessorResult.fail(result.stderr[:200])

        # Treat unparseable output as a failure instead of raising.
        try:
            score = float(result.stdout.strip())
        except ValueError:
            return ProcessorResult.fail(f"unexpected output: {result.stdout[:200]!r}")

        out = entry.sample_dir / self.config.output_file
        out.write_text(json.dumps({"score": score}))

        if score < self.config.threshold:
            return ProcessorResult.filter(f"score {score} < threshold {self.config.threshold}")

        return ProcessorResult.ok(
            data={"score": score},
            artifacts={"result": self.config.output_file},
        )
```

### Step 2 — Declare it in your pipeline YAML

```yaml
stages:
  - name: analyze
    processor: my_tool/my_tool.py   # relative to processors/
    config:
      threshold: 0.7
      output_file: analysis.json
```

For the dotted-import form (when the code lives in an installed package):

```yaml
processor: "mypackage.analyzers:MyAnalyzer"
```
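
The `config:` block from the YAML ends up as fields on the processor's `Config` dataclass. One plausible way to do that mapping is sketched below; `build_config` is a hypothetical helper, and rejecting unknown keys is this sketch's choice, not confirmed engine behavior.

```python
from dataclasses import dataclass, fields


@dataclass
class Config:
    threshold: float = 0.5
    output_file: str = "my_result.json"


def build_config(cls, raw: dict):
    # Reject unknown keys early so YAML typos surface before any sample runs.
    known = {f.name for f in fields(cls)}
    unknown = set(raw) - known
    if unknown:
        raise ValueError(f"unknown config keys: {sorted(unknown)}")
    # Keys missing from `raw` fall back to the dataclass defaults.
    return cls(**raw)


cfg = build_config(Config, {"threshold": 0.7, "output_file": "analysis.json"})
```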

### Key rules for custom processors

| Rule | Why |
|---|---|
| `MapProcessor.process()` must be thread-safe | Engine uses `ThreadPoolExecutor` |
| `ReduceProcessor.process()` returns `list[str]` of IDs to **keep** | Anything not returned is dropped |
| `BulkMapProcessor` returns one result **per index** | Entries with no matching result are marked failed |
| `validate()` should surface missing binaries / empty required fields | Runs before any sample is touched |
| Declare `Config` as a `@dataclass` to get typed config fields | Engine instantiates it from the YAML dict |
| Config fields can use `${VAR}` and `${VAR:-default}` syntax | Engine expands env vars before parsing |

Sources: [src/deepzero/engine/stage.py:203-275]()
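
The `${VAR}` and `${VAR:-default}` forms can be expanded with a single regular expression. This sketch follows shell conventions (an unset or empty variable falls back to the default, and to an empty string when no default is given); whether the engine treats unset variables the same way is an assumption. `expand_env` is a hypothetical name.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}; group 2 is None when no default is given.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand_env(value: str, env=None) -> str:
    env = os.environ if env is None else env

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        current = env.get(name, "")
        if current == "" and default is not None:
            return default
        return current

    return _VAR.sub(replace, value)


expanded = expand_env("${MODEL:-openai/gpt-4o} @ ${HOST}", {"HOST": "localhost"})
```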

---

## Built-in Stages

All seven built-ins are registered in `src/deepzero/stages/__init__.py`.

### `file_discovery` — IngestProcessor

Class: `FileDiscovery` (`src/deepzero/stages/ingest.py`)

Discovers files from a directory or accepts a single file. Computes a sha256 hash for each file and uses the first 16 hex characters as `sample_id`. Each `Sample` includes `sha256` and `size_bytes` in its `data` dict.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `extensions` | `list[str]` | `[]` (all) | File extensions to include, e.g. `[".exe", ".dll"]` |
| `recursive` | `bool` | `true` | Whether to recurse into subdirectories |

Sources: [src/deepzero/stages/ingest.py:9-78]()
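
A minimal sketch of the discovery behavior described above, with plain dicts standing in for `Sample`. The `discover` function is hypothetical; it reads each file fully into memory for brevity, where a production implementation would likely hash in chunks.

```python
import hashlib
import tempfile
from pathlib import Path


def discover(target: Path, extensions=(), recursive=True) -> list[dict]:
    if target.is_file():
        paths = [target]
    else:
        pattern = "**/*" if recursive else "*"
        paths = sorted(p for p in target.glob(pattern) if p.is_file())
    samples = []
    for path in paths:
        # An empty extension list means "accept everything".
        if extensions and path.suffix.lower() not in extensions:
            continue
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        samples.append({
            "sample_id": digest[:16],  # first 16 hex characters of the sha256
            "filename": path.name,
            "data": {"sha256": digest, "size_bytes": path.stat().st_size},
        })
    return samples


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.dll").write_bytes(b"hello")
    (root / "notes.txt").write_bytes(b"ignored")
    found_samples = discover(root, extensions=(".dll",))
```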

---

### `metadata_filter` — MapProcessor

Class: `MetadataFilter` (`src/deepzero/stages/filter.py`)

Checks conditions against a flattened view of all upstream `data` dicts. Any failing condition calls `ProcessorResult.filter()`.

| Config key | Type | Meaning |
|---|---|---|
| `require` | `dict` | Field must equal a specific value: `require: {arch: "x86"}` |
| `min_<field>` | numeric | Field must be ≥ this value: `min_size_bytes: 1024` |
| `max_<field>` | numeric | Field must be ≤ this value: `max_size_bytes: 10000000` |
| `dedup_field` | `str` | Filter samples with a previously seen value of this field |

The `dedup_field` state (`self._seen`) lives on the processor instance, so it is shared by every worker thread and every call within a run; it is not thread-local. This is a deliberate exception to the rule that `MapProcessor.process()` should avoid shared mutable state, because deduplication has to remember values seen by earlier calls.

Sources: [src/deepzero/stages/filter.py:14-67]()
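
The `require`, `min_<field>`, and `max_<field>` checks can be sketched as a single function over the flattened data dict. `check_conditions` is a hypothetical name, and treating a missing field as a failed bound is an assumption of this sketch.

```python
def check_conditions(data: dict, config: dict):
    # Returns a reason string when the sample should be filtered, else None.
    for field, expected in config.get("require", {}).items():
        if data.get(field) != expected:
            return f"{field}={data.get(field)!r}, required {expected!r}"
    for key, bound in config.items():
        if key.startswith("min_"):
            field = key[len("min_"):]
            if field not in data or data[field] < bound:
                return f"{field} below {bound}"
        elif key.startswith("max_"):
            field = key[len("max_"):]
            if field not in data or data[field] > bound:
                return f"{field} above {bound}"
    return None


filter_config = {"require": {"arch": "x86"}, "min_size_bytes": 1024, "max_size_bytes": 10_000_000}
```

A non-`None` return value would be the `reason` passed to `ProcessorResult.filter()`.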

---

### `hash_exclude` — MapProcessor

Class: `HashExclude` (`src/deepzero/stages/hash_filter.py`)

Maintains a set of known-bad hashes (loaded in `setup()` so the file is read once) and filters any sample whose hash field matches. Can also deduplicate by hash within a run.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `hash_field` | `str` | `"sha256"` | Which data key holds the hash to compare |
| `hashes` | `list[str]` | `[]` | Inline list of hashes to exclude |
| `hash_file` | `str` | `""` | Path to a file of hashes, one per line |
| `dedup` | `bool` | `false` | Also filter duplicate hashes within the run |

Sources: [src/deepzero/stages/hash_filter.py:15-69]()

---

### `top_k` — ReduceProcessor

Class: `TopKSelector` (`src/deepzero/stages/top_k.py`)

Sorts all active samples by a numeric metric from an upstream stage, keeps the top `k`, and discards the rest. `metric_path` uses `"stage_name.data_key"` notation.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `metric_path` | `str` | required | e.g. `"score_stage.score"` |
| `keep_top` | `int` | `10` | Number of samples to keep |
| `sort_order` | `"asc"` \| `"desc"` | `"desc"` | `"desc"` keeps highest values |

Sources: [src/deepzero/stages/top_k.py:8-54]()
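
The selection logic can be sketched over a plain nested dict in place of real `ProcessorEntry` history. `select_top_k` is a hypothetical function, and dropping samples that lack a numeric metric is an assumption of this sketch.

```python
def select_top_k(history_by_sample: dict, metric_path: str,
                 keep_top: int = 10, sort_order: str = "desc") -> list[str]:
    # "stage_name.data_key" splits on the first dot.
    stage_name, _, data_key = metric_path.partition(".")
    scored = []
    for sample_id, history in history_by_sample.items():
        value = history.get(stage_name, {}).get(data_key)
        if isinstance(value, (int, float)):
            scored.append((value, sample_id))
    scored.sort(key=lambda pair: pair[0], reverse=(sort_order == "desc"))
    return [sample_id for _, sample_id in scored[:keep_top]]


history = {
    "a": {"score_stage": {"score": 0.2}},
    "b": {"score_stage": {"score": 0.9}},
    "c": {"score_stage": {"score": 0.5}},
}
best_two = select_top_k(history, "score_stage.score", keep_top=2)
```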

---

### `sort` — ReduceProcessor

Class: `Sort` (`src/deepzero/stages/sort.py`)

Re-orders all active samples by an upstream metric without dropping any. Useful when downstream stages or report rendering should see samples in a specific order.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `by` | `str` | required | e.g. `"analysis_stage.complexity_score"` |
| `order` | `"asc"` \| `"desc"` | `"desc"` | Sort direction |

Sources: [src/deepzero/stages/sort.py:8-45]()

---

### `generic_command` — MapProcessor

Class: `GenericCommand` (`src/deepzero/stages/command.py`)

Runs any external command per sample. Template variables like `{sample_path}`, `{sample_dir}`, `{output_dir}`, `{filename}`, and `{sample_id}` are substituted before the command runs. Additional config keys become extra template variables automatically.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `run` | `str` | required | Command template, e.g. `"binwalk -e {sample_path}"` |
| `timeout` | `int` | `300` | Seconds before the process is killed |
| `stdout_to` | `str` | `""` | If set, write stdout to this relative path in `sample_dir` |
| `on_error` | `"fail"` \| `"skip"` | `"fail"` | `"skip"` filters the sample instead of failing it |
| `produces` | `list[str]` | `[]` | Relative paths of files the command creates, exposed as artifacts |

The command is executed asynchronously via `asyncio.create_subprocess_exec`, which passes arguments directly to the program without invoking a shell, so shell injection through template values is avoided. `stdout_to` writes are atomic (write to `.tmp`, then `os.replace`).

Sources: [src/deepzero/stages/command.py:16-117]()
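
The three mechanisms mentioned above (template substitution, shell-free execution, atomic writes) can be combined in a short sketch. All function names here are hypothetical, and splitting the template before substituting is this sketch's design choice, not necessarily what `GenericCommand` does.

```python
import asyncio
import os
import shlex
import sys
import tempfile
from pathlib import Path


def build_argv(template: str, variables: dict) -> list[str]:
    # Split first, then substitute, so a path containing spaces stays one argument.
    return [token.format(**variables) for token in shlex.split(template)]


async def run_command(argv: list[str], timeout: float) -> tuple[int, str]:
    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()  # enforce the timeout, then reap the process
        await proc.wait()
        raise
    return proc.returncode, stdout.decode()


def write_atomic(path: Path, text: str) -> None:
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(text)
    os.replace(tmp, path)  # readers never observe a half-written file


argv = build_argv(
    "{python} -c 'print(42)' {sample_path}",
    {"python": sys.executable, "sample_path": "/tmp/my file.bin"},
)
code, out = asyncio.run(run_command(argv, timeout=30))

with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "stdout.txt"
    write_atomic(target, out)
    saved = target.read_text()
```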

---

### `generic_llm` — MapProcessor

Class: `GenericLLM` (`src/deepzero/stages/llm.py`)

Sends a Jinja2-rendered prompt to the configured LLM backend (via `litellm`) and writes the response to the sample's working directory. Responses are cached by file: if the output file already exists, the LLM call is skipped.

| Config key | Type | Default | Meaning |
|---|---|---|---|
| `prompt` | `str` | required | Path to a `.j2` / `.txt` Jinja2 template |
| `output_file` | `str` | `"assessment.md"` | Filename to write the LLM response into |
| `classify_by` | `str` | `""` | Regex applied to the first 200 chars of response to extract a `classification` field |
| `max_retries` | `int` | `3` | Retry attempts on API errors |
| `backoff` | `dict` | `{initial: 2, max: 60, decay: 0.7}` | Backoff tuning for rate-limit waits |
| `max_context_tokens` | `int` | `900000` | Budget for injecting artifact file content into the template |

The Jinja2 template context includes `sample_name`, `sample_path`, `history` (a nested dict of all upstream stage data), all upstream data keys flattened for backward compatibility, and all artifact files from `sample_dir` (JSON parsed for `.json`, read as text for `.c/.h/.txt/.md/.py/.yaml`).

The LLM backend is the `LLMProvider` class (`src/deepzero/engine/llm.py`), which wraps `litellm.completion()` with adaptive backoff: rate limit errors double the wait up to `max_backoff`; successful calls decay the backoff by `backoff_decay`; context-window errors are not retried. The model string follows litellm's `"provider/model"` convention (e.g. `"openai/gpt-4o"`, `"anthropic/claude-3-5-sonnet-20241022"`) — the provider prefix is the only coupling point, so any litellm-supported backend works without code changes.

Sources: [src/deepzero/stages/llm.py:21-181](), [src/deepzero/engine/llm.py:26-127]()
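
The adaptive backoff rule (double on rate limit, decay on success) can be sketched as a small state machine. `AdaptiveBackoff` is a hypothetical class, not `LLMProvider` itself; its defaults mirror the `backoff` config table above, and flooring the wait at the initial value is an assumption.

```python
class AdaptiveBackoff:
    def __init__(self, initial: float = 2.0, maximum: float = 60.0, decay: float = 0.7):
        self.initial = initial
        self.maximum = maximum
        self.decay = decay
        self.wait = initial

    def on_rate_limit(self) -> float:
        # Return how long to sleep now, then double the wait for the next hit.
        delay = self.wait
        self.wait = min(self.wait * 2, self.maximum)
        return delay

    def on_success(self) -> None:
        # Shrink the wait after a success, but never below the initial value.
        self.wait = max(self.wait * self.decay, self.initial)


backoff = AdaptiveBackoff()
delays = [backoff.on_rate_limit() for _ in range(6)]
backoff.on_success()
```

Decaying gradually instead of resetting to the initial value keeps the client cautious for a while after a burst of rate-limit errors.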

---

## Summary

```text
Processor type summary
───────────────────────────────────────────────────────────────────────────
Type              Call pattern      Returns                Built-in example
───────────────────────────────────────────────────────────────────────────
IngestProcessor   once per run      list[Sample]           file_discovery
MapProcessor      once per sample   ProcessorResult        metadata_filter
                  (parallel)                               hash_exclude
                                                           generic_command
                                                           generic_llm
ReduceProcessor   once, all         list[str] (IDs kept)   top_k
                  samples                                  sort
BulkMapProcessor  once, all         list[ProcessorResult]  (custom only)
                  samples           (one per entry)
───────────────────────────────────────────────────────────────────────────
```

The registry resolves a bare name (`"top_k"`) from the in-memory dict, a path string (`"my_tool/my_tool.py"`) from the `processors/` directory relative to `cwd`, and a dotted string (`"pkg.mod:Class"`) via `importlib`. A custom processor only needs to subclass the right base class, optionally declare a `Config` dataclass, and be reachable by one of those three forms. The file-path and dotted-import forms require no registration call. The full registry API is re-exported from `src/deepzero/engine/stage.py` for import convenience.

Sources: [src/deepzero/engine/registry.py:20-39](), [src/deepzero/stages/__init__.py:1-17]()
