# The Recipe Card: Pipeline YAML

> How a pipeline.yaml file works — what each field means, how stages chain together, how environment variables are expanded, and the hard rule that the first stage must always be an ingest processor.

- Repository: 416rehman/DeepZero
- GitHub: https://github.com/416rehman/DeepZero
- Human wiki: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324
- Complete Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/llms-full.txt

## Source Files

- `pipelines/loldrivers/pipeline.yaml`
- `src/deepzero/engine/pipeline.py`
- `src/deepzero/engine/stage.py`
- `src/deepzero/engine/types.py`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [pipelines/loldrivers/pipeline.yaml](pipelines/loldrivers/pipeline.yaml)
- [src/deepzero/engine/pipeline.py](src/deepzero/engine/pipeline.py)
- [src/deepzero/engine/stage.py](src/deepzero/engine/stage.py)
- [src/deepzero/engine/types.py](src/deepzero/engine/types.py)
- [src/deepzero/engine/registry.py](src/deepzero/engine/registry.py)
- [processors/pe_ingest/pe_ingest.py](processors/pe_ingest/pe_ingest.py)
</details>

# The Recipe Card: Pipeline YAML

A `pipeline.yaml` file is the single source of truth for everything DeepZero needs to run a data-analysis pipeline: what to call it, which LLM to use, how much parallelism to allow, and — most importantly — the ordered list of processing stages that each sample travels through. Think of it as a recipe card: the ingredients are your files or data samples, and each stage is a cooking step that filters, transforms, or ranks those samples before the next step sees them.

This page explains every field in the schema, how stages chain together into a directed flow, how `${ENV_VAR}` placeholders are expanded at load time, and the hard architectural rule that the pipeline's first stage must always be an **ingest** processor.

---

## Top-Level Fields

A `pipeline.yaml` is a YAML mapping. The engine reads it with `yaml.safe_load` and validates the structure before a single sample is ever touched.

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `name` | string | no | Human-readable pipeline identifier. Defaults to the directory name. Becomes the subdirectory under `work_dir`. |
| `description` | string | no | Free-text description shown in listings. |
| `version` | string | no | Informational version tag. |
| `model` | string | no | LLM identifier passed to every stage that calls an LLM (e.g. `vertex_ai/gemini-2.5-pro`). Can be overridden on the CLI. |
| `settings` | mapping | no | Pipeline-wide settings. See below. |
| `knowledge` | mapping | no | Arbitrary key-value data available to every processor via `ctx.get_knowledge()`. |
| `stages` | list | **yes** | Ordered list of stage definitions. Must contain at least one entry. |

Sources: [src/deepzero/engine/pipeline.py:90-99]()

### The `settings` block

```yaml
settings:
  work_dir: work        # base directory for output (default: "work")
  max_workers: 4        # thread-pool ceiling (default: min(4, cpu_count))
```

`work_dir` is always resolved as `<work_dir>/<pipeline_name>/`, so each pipeline gets its own workspace. Relative paths are anchored to `cwd`.

Sources: [src/deepzero/engine/pipeline.py:53-62]()

---

## Stage Fields

Every entry under `stages:` is a mapping with these keys:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `name` | string | no | `stage_N` | Unique label for this stage. Used for logging, state storage keys, and `entry.upstream("name")` lookups. Duplicate names raise a `ValueError`. |
| `processor` | string | **yes** | — | Which processor class to run. See [Processor Resolution](#processor-resolution) below. |
| `config` | mapping | no | `{}` | Passed verbatim to the processor's typed `Config` dataclass (or as a raw dict if no `Config` is declared). |
| `parallel` | int | no | `4` | Number of concurrent threads for `MapProcessor` stages. `0` means "use the hardware maximum". |
| `on_failure` | string | no | `skip` | What to do when a sample fails: `skip` (mark failed, continue others), `retry` (re-attempt up to `max_retries`), or `abort` (halt the entire pipeline). |
| `max_retries` | int | no | `0` | Number of retries when `on_failure: retry`. |
| `timeout` | int | no | `0` | Per-sample timeout in seconds. `0` means no timeout. |

Sources: [src/deepzero/engine/stage.py:173-188](), [src/deepzero/engine/pipeline.py:118-134]()

### Stage example from the loldrivers pipeline

```yaml
# pipelines/loldrivers/pipeline.yaml (lines 56-68)
- name: decompile
  processor: ghidra_decompile/ghidra_decompile.py
  parallel: 0           # use every available core
  timeout: 600          # 10 minutes per sample
  config:
    strategy: extract_dispatch.py
    max_functions: 60
    max_depth: 4
    ghidra_install_dir: ${GHIDRA_INSTALL_DIR}
    java_home: ${JAVA_HOME:-}
```

---

## Processor Resolution

The `processor` field can take three forms. The engine's `resolve_processor_class()` function tries them in order:

```
bare name              → built-in registry  (e.g. "metadata_filter", "top_k", "generic_llm")
dir/file.py            → processors/<dir>/<file>.py, first Processor subclass found
dir/file.py:ClassName  → processors/<dir>/<file>.py, specific named class
```

External processors (the path forms) are loaded from the `processors/` directory relative to `cwd`. The file is imported via `importlib`, and `_source_file` is set so the processor can locate its own co-located assets.

Sources: [src/deepzero/engine/registry.py:20-39](), [src/deepzero/engine/registry.py:54-85]()

```text
Processor reference forms
─────────────────────────────────────────────────────────────
"metadata_filter"                 → built-in registry lookup
"pe_ingest/pe_ingest.py"          → processors/pe_ingest/pe_ingest.py
                                     (first Processor subclass)
"pe_ingest/pe_ingest.py:PEIngest" → processors/pe_ingest/pe_ingest.py
                                     (class named PEIngest)
```

---

## How Stages Chain Together

The pipeline is a strictly linear sequence. Each sample is an independent unit of work that flows through the stages one by one. The four processor types define how each stage interacts with the sample stream:

```
target path
    │
    ▼
┌─────────────────────────────────────┐
│  IngestProcessor  (stage 0 only)    │  discovers samples from a source
│  e.g. pe_ingest                     │  returns list[Sample]
└──────────────────┬──────────────────┘
                   │  sample_a, sample_b, sample_c …
    ┌──────────────┼──────────────────────────────┐
    ▼              ▼                              ▼
┌──────────┐ ┌──────────┐                 ┌──────────┐
│MapProc.  │ │MapProc.  │   (parallel)    │MapProc.  │  1:1 per sample
│ (filter) │ │ (llm)    │                 │ (decomp) │  ok / filter / fail
└────┬─────┘ └────┬─────┘                 └────┬─────┘
     │             │                            │
     └─────────────┴────────────────────────────┘
                   │  surviving samples
                   ▼
       ┌───────────────────────┐
       │  ReduceProcessor      │  sees ALL samples at once
       │  e.g. top_k           │  returns list[sample_id] to keep
       └───────────┬───────────┘
                   │  top N samples
                   ▼
       ┌───────────────────────┐
       │  BulkMapProcessor     │  one external invocation for all samples
       │  e.g. semgrep_scanner │  returns list[ProcessorResult]
       └───────────────────────┘
```

**Key rules enforced at load time:**

1. **Stage 0 must be `IngestProcessor`** — the engine raises a `ValueError` if the first processor is not an `IngestProcessor`.
2. **`IngestProcessor` may only appear at stage 0** — placing one at any other position raises a `ValueError`.
3. Stages 1+ must be `MapProcessor`, `ReduceProcessor`, or `BulkMapProcessor`.

Sources: [src/deepzero/engine/pipeline.py:159-177]()

---

## The Hard Rule: First Stage Must Be Ingest

This is not a style convention — it is enforced by `_resolve_processors()` in the engine:

```python
# src/deepzero/engine/pipeline.py:159-164
if i == 0:
    if not isinstance(instance, IngestProcessor):
        raise ValueError(
            f"first stage '{spec.name}' (processor='{spec.processor}') must be an IngestProcessor. "
            f"got {cls.__name__}. every pipeline must start with an ingest processor."
        )
```

The reason is architectural: `IngestProcessor.process()` receives a `target: Path` and returns a `list[Sample]`. Every other processor type receives individual `ProcessorEntry` objects, which only exist *after* the ingest stage creates them. There is no sample stream without an ingest stage to seed it.

Sources: [src/deepzero/engine/stage.py:277-288](), [src/deepzero/engine/pipeline.py:154-165]()

---

## Environment Variable Expansion

All string values in the YAML — including nested `config` fields — are expanded before any processor sees them. Expansion happens immediately after `yaml.safe_load()`, before validation.

**Two supported forms:**

| Syntax | Behavior |
|--------|----------|
| `${VAR}` | Replaced by the environment variable value. If `VAR` is unset, the literal `${VAR}` is kept unchanged. |
| `${VAR:-default}` | Replaced by the environment variable value, or `default` if unset. |

```python
# src/deepzero/engine/pipeline.py:269-275
def _expand_string(s: str) -> str:
    def _replace(match: re.Match) -> str:
        var = match.group(1)
        if ":-" in var:
            name, default = var.split(":-", 1)
            return os.environ.get(name, default)
        return os.environ.get(var, match.group(0))   # keeps original if unset
    return re.sub(r"\$\{([^}]+)\}", _replace, s)
```

In the loldrivers pipeline, the Ghidra stage uses both forms:

```yaml
# pipelines/loldrivers/pipeline.yaml:66-67
ghidra_install_dir: ${GHIDRA_INSTALL_DIR}   # required — kept literal if unset
java_home: ${JAVA_HOME:-}                   # optional — empty string if unset
```

The processor's own `validate()` method should check that required fields are non-empty after expansion. If `GHIDRA_INSTALL_DIR` is not set in the environment, the decompile stage will fail at validation time with a clear error rather than mid-run.

Sources: [src/deepzero/engine/pipeline.py:256-276]()

---

## Pipeline Load and Validation Sequence

When `load_pipeline(ref)` is called, six steps happen in order before any sample is touched:

1. **Path resolution** — `ref` is resolved against `cwd/pipelines/<ref>`, `~/.deepzero/pipelines/<ref>`, and the installed package's `pipelines/` directory.
2. **YAML parse** — `yaml.safe_load()` reads the file.
3. **Env-var expansion** — `_expand_env_vars()` recursively walks every string in the parsed dict.
4. **StageSpec construction** — each `stages:` entry becomes a `StageSpec` dataclass; duplicate names and invalid `on_failure` values are rejected.
5. **Processor resolution** — `_resolve_processors()` instantiates every processor class and enforces the ingest-first rule.
6. **Processor-level validation** — each processor's `validate(ctx)` hook runs; errors are collected and raised together.

Sources: [src/deepzero/engine/pipeline.py:76-151]()

---

## Complete Field Reference

```yaml
name: <string>            # pipeline identifier (default: directory name)
description: <string>     # human-readable description
version: <string>         # informational version tag

model: <string>           # LLM model identifier for generic_llm stages

settings:
  work_dir: <path>        # output root (default: "work")
  max_workers: <int>      # thread-pool ceiling (default: min(4, cpu_count))

knowledge:                # arbitrary key-value data for processors
  <key>: <value>

stages:
  - name: <string>        # unique stage label
    processor: <ref>      # bare name | dir/file.py | dir/file.py:Class
    config:               # processor-specific settings; ${VAR} expanded
      <key>: <value>
    parallel: <int>       # concurrency level (default: 4, 0 = max)
    on_failure: skip      # skip | retry | abort (default: skip)
    max_retries: <int>    # retries when on_failure=retry (default: 0)
    timeout: <int>        # per-sample seconds, 0 = none (default: 0)
```

---

## Summary

A `pipeline.yaml` is a compact declaration that the DeepZero engine turns into an executable, stateful data-processing workflow. The file is parsed and fully validated — including environment variable expansion and per-processor health checks — before any sample is ever touched. The strictest invariant in the whole system is that **position 0 must always be an `IngestProcessor`**: it seeds the sample stream that every subsequent `map`, `reduce`, and `bulk_map` stage depends on. This is enforced in code, not convention, at `src/deepzero/engine/pipeline.py:159-164`.
