# A Real Run: The loldrivers Vulnerability Pipeline

> Step-by-step walkthrough of the shipped example pipeline — from PE discovery through IOCTL filtering, loldrivers.io dedup, Ghidra decompilation, Semgrep batch scan, top-10 selection, to LLM assessment with a Jinja2 prompt template. Shows what each external processor does and what data it passes to the next stage.

- Repository: 416rehman/DeepZero
- GitHub: https://github.com/416rehman/DeepZero
- Human wiki: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324
- Complete Markdown: https://grok-wiki.com/public/wiki/416rehman-deepzero-841693239324/llms-full.txt

## Source Files

- `pipelines/loldrivers/pipeline.yaml`
- `pipelines/loldrivers/assessment.j2`
- `processors/pe_ingest/pe_ingest.py`
- `processors/loldrivers_filter/loldrivers_filter.py`
- `processors/ghidra_decompile/ghidra_decompile.py`
- `processors/semgrep_scanner/semgrep_scanner.py`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [pipelines/loldrivers/pipeline.yaml](pipelines/loldrivers/pipeline.yaml)
- [pipelines/loldrivers/assessment.j2](pipelines/loldrivers/assessment.j2)
- [processors/pe_ingest/pe_ingest.py](processors/pe_ingest/pe_ingest.py)
- [processors/loldrivers_filter/loldrivers_filter.py](processors/loldrivers_filter/loldrivers_filter.py)
- [processors/ghidra_decompile/ghidra_decompile.py](processors/ghidra_decompile/ghidra_decompile.py)
- [processors/ghidra_decompile/scripts/extract_dispatch.py](processors/ghidra_decompile/scripts/extract_dispatch.py)
- [processors/semgrep_scanner/semgrep_scanner.py](processors/semgrep_scanner/semgrep_scanner.py)
- [pipelines/loldrivers/rules/arbitrary_rw.yaml](pipelines/loldrivers/rules/arbitrary_rw.yaml)
</details>

# A Real Run: The loldrivers Vulnerability Pipeline

The `loldrivers` pipeline is DeepZero's shipping example for Windows kernel driver vulnerability research. It starts with a directory full of raw `.sys` files, narrows the field down to the most interesting candidates using progressively more expensive checks, and ends with an LLM writing a structured vulnerability assessment for the top 10 survivors. This page walks every stage in order — what each processor ingests, what it decides, what it writes to disk, and what it hands to the next stage.

This walkthrough matters because the pipeline demonstrates all four of DeepZero's stage types (`ingest`, `map`, `reduce`, `batch`) working together in sequence. Reading it gives you a concrete mental model of how data flows through any pipeline you build.

---

## Pipeline overview

The pipeline is declared in a single YAML file. Seven stages run in order:

```
discover → kernel_filter → loldrivers_filter → decompile → semgrep_scanner → pick_top_10 → assess
```

```yaml
# pipelines/loldrivers/pipeline.yaml
name: loldrivers
description: windows kernel driver vulnerability research pipeline
version: "2.0"
model: vertex_ai/gemini-2.5-pro
settings:
  work_dir: work
  max_workers: 4
```

The `model` key chooses the LLM for the final `assess` stage. The `work_dir` is where each sample gets a per-sample subdirectory for all artifacts produced along the way.

Sources: [pipelines/loldrivers/pipeline.yaml:1-20]()

---

## Stage 1 — `discover`: PE file ingestion

**Processor:** `pe_ingest/pe_ingest.py` (type: `ingest`)

The very first job is to find `.sys` files and turn each one into a `Sample` — the core data envelope that flows through the rest of the pipeline.

The pipeline configures twelve driver pack subdirectory patterns (`DP_Chipset`, `DP_LAN_Intel`, `DP_WLAN`, etc.). The processor scans only those matching subdirectories rather than the whole target, which keeps the file count manageable for large driver packs.

```python
# processors/pe_ingest/pe_ingest.py:39-59
def _ingest_filtered(self, ctx, root, subdirs, extensions):
    all_dirs = sorted(d for d in root.iterdir() if d.is_dir())
    matching = [d for d in all_dirs if any(p.lower() in d.name.lower() for p in subdirs)]
    ...
    files = sorted(set(files))
    return self._analyze_files(ctx, files)
```

For each file, two things happen in parallel using a `ThreadPoolExecutor`:

1. **I/O worker** (`_io_worker`): reads the raw bytes, computes SHA-256 and MD5.
2. **PE parser** (`_parse_pe`): runs [lief](https://github.com/lief-project/LIEF) on the bytes — still in the main thread to avoid GIL thrashing — and extracts:
   - `subsystem` (e.g. `NATIVE` = kernel driver)
   - `is_kernel_driver`, `machine_type`
   - `imported_dlls` and `imported_functions`
   - `has_ioctl_surface` — true if any of 16 IOCTL-related WDM/WDF/NDIS/StorPort/HID API names are present in the import table
   - `dangerous_imports` — intersection with a list of ~20 high-value APIs: `MmMapIoSpace`, `MmCopyVirtualMemory`, `__readmsr`, `PsLookupProcessByProcessId`, `ZwLoadDriver`, etc.
   - `priority_score` — a float 0–10 computed from dangerous API presence (physical-memory = +3, process-manipulation = +2, MSR/HAL = +2, OS version ≥10 = +3, small section count = +1)

Each `.sys` file becomes one `Sample` whose `data` dict carries all of the above. Only files whose first two bytes are `MZ` are parsed as PE.

Sources: [processors/pe_ingest/pe_ingest.py:76-132](), [processors/pe_ingest/pe_ingest.py:152-289]()

---

## Stage 2 — `kernel_filter`: IOCTL surface gate

**Processor:** `metadata_filter` (built-in, type: `map`)

This stage is a simple declarative filter with two required fields:

```yaml
# pipelines/loldrivers/pipeline.yaml:47-52
processor: metadata_filter
config:
  require:
    is_kernel_driver: true
    has_ioctl_surface: true
  dedup_field: sha256
```

Any sample that is not a `NATIVE`-subsystem binary, or that lacks IOCTL-related imports, is dropped here. The `dedup_field: sha256` means that if two `.sys` files in different subdirectories are byte-for-byte identical (same hash), only one copy advances. This cuts duplicate vendor redistributions before the expensive stages.

Sources: [pipelines/loldrivers/pipeline.yaml:44-52]()

---

## Stage 3 — `loldrivers_filter`: known-vulnerable dedup

**Processor:** `loldrivers_filter/loldrivers_filter.py` (type: `map`)

The idea here is: if a driver is already in the public [loldrivers.io](https://www.loldrivers.io) database, researchers already know about it. Skip it and focus on unknowns.

On first run the processor fetches `https://www.loldrivers.io/api/drivers.json` (HTTPS-only; non-HTTPS URLs are refused). It caches the result for 7 days in the pipeline's cache directory, then reloads from cache on subsequent runs.

```python
# processors/loldrivers_filter/loldrivers_filter.py:94-105
def _load_db(self, path):
    data = json.loads(path.read_text(encoding="utf-8"))
    count = 0
    if isinstance(data, list):
        for entry in data:
            for sample in entry.get("KnownVulnerableSamples", []):
                sha = sample.get("SHA256", "")
                if sha:
                    self._known_hashes.add(sha.lower())
                    count += 1
    self.log.info("loaded %d known hashes from %s", count, path.name)
```

At `process()` time, the sample's SHA-256 (carried from stage 1) is checked against the loaded hash set. If it matches, the sample is filtered with the reason `"already in loldrivers.io database"`.

Sources: [processors/loldrivers_filter/loldrivers_filter.py:19-122]()

---

## Stage 4 — `decompile`: Ghidra headless analysis

**Processor:** `ghidra_decompile/ghidra_decompile.py` (type: `map`)  
**Post-script:** `processors/ghidra_decompile/scripts/extract_dispatch.py`

This is the most expensive stage. The processor spawns Ghidra's `analyzeHeadless` binary as a subprocess for each sample, with a hard 600-second timeout. It is configured with `parallel: 0`, meaning the engine uses the global `max_workers` setting (4) for concurrency.

```yaml
# pipelines/loldrivers/pipeline.yaml:57-68
processor: ghidra_decompile/ghidra_decompile.py
parallel: 0
timeout: 600
config:
  strategy: extract_dispatch.py
  max_functions: 60
  max_depth: 4
  ghidra_install_dir: ${GHIDRA_INSTALL_DIR}
  java_home: ${JAVA_HOME:-}
```

The `${GHIDRA_INSTALL_DIR}` is expanded from the environment at pipeline load time.

### What the post-script does

`extract_dispatch.py` runs inside Ghidra's Jython environment. It follows this sequence:

1. **Locate `DriverEntry`** — by exported symbol name, falling back to the entry-point iterator.
2. **Decompile `DriverEntry`** — write `driver_entry.c` to the sample's `decompiled/` subdirectory.
3. **Extract device name and symbolic link** — scanned from all defined data items; strings matching `\Device\` or `\DosDevices\` are stored in the result.
4. **Find the `IRP_MJ_DEVICE_CONTROL` handler** — searches decompiled C for eight regex patterns matching dispatch table assignment at offset `0xe0` (x64) or `0x70` (x86):
   - `*(ptr + 0xe0) = &FunctionName`
   - `[0x1c] = &FunctionName`
   - `MajorFunction[0xe] = &FunctionName`
   
   It first searches `DriverEntry` and its two levels of callees, then falls back to a full function scan if needed.
5. **Decompile the dispatch handler** — then recursively decompiles all internal subfunctions (up to `max_depth=4`, `max_functions=60`), skipping Windows API prefixes (`Io`, `Mm`, `Ke`, `Zw`, `Wdf`, etc.).
6. **Extract IOCTL codes** — regex-scanned from the dispatch C for `== 0xXXXXXXXX` or `case` values whose upper 16 bits are nonzero (valid device type range).
7. **Write artifacts**:
   - `decompiled/ghidra_result.json` — structured result with `device_name`, `symbolic_link`, `dispatch_name`, `ioctl_handlers`, decompiled C strings
   - `decompiled/dispatch_ioctl.c` — full dispatch + subfunctions concatenated, ready for static analysis
   - `decompiled/ioctls/0xXXXXXXXX.c` — one file per extracted IOCTL code

The result JSON is cached: if it already exists and is valid JSON, the stage is skipped entirely (`should_skip` returns `"decompilation already cached"`).

Sources: [processors/ghidra_decompile/ghidra_decompile.py:52-107](), [processors/ghidra_decompile/scripts/extract_dispatch.py:122-370]()

---

## Stage 5 — `semgrep_scanner`: batch vulnerability scan

**Processor:** `semgrep_scanner/semgrep_scanner.py` (type: `batch`)

Instead of invoking `semgrep` once per driver, this stage collects all active samples' `decompiled/` directories, hard-links (or copies) all `.c`/`.h`/`.cpp` files into a single temporary bulk directory with prefixed filenames, and runs `semgrep scan` once across all of them. This is the key efficiency trick: one semgrep process, one rule-parse cost.

```python
# processors/semgrep_scanner/semgrep_scanner.py:137-148
cmd = [
    "semgrep", "scan",
    "--config", str(rules_path),
    "--json",
    "--no-git-ignore",
    "--quiet",
    "--metrics=off",
    "--disable-version-check",
    str(bulk_dir),
]
```

The rules live in `pipelines/loldrivers/rules/` and cover four vulnerability classes:

| Rule file | Patterns |
|-----------|----------|
| `arbitrary_rw.yaml` | `MmMapIoSpace`, `ZwMapViewOfSection`, `memcpy`/`RtlCopyMemory`, IRP buffer-derived writes, `MmCopyVirtualMemory`, `MmCopyMemory`, `PsLookupProcessByProcessId + KeStackAttachProcess` |
| `buffer_overflow.yaml` | Stack/heap overflow patterns in decompiled C |
| `method_neither.yaml` | `METHOD_NEITHER` IOCTL transfer type (raw user pointer access) |
| `msr_access.yaml` | `__readmsr`/`__writemsr` and HAL bus data access |

After the single semgrep run completes, findings are distributed back to individual samples by matching the `{sample_id}_` prefix on each filename. Each sample gets a `findings.json` written atomically via a temp file + rename. Findings include `rule_id`, `severity` (normalized from `ERROR`→`HIGH`, `WARNING`→`MEDIUM`, `INFO`→`LOW`), `message`, `file`, `line_start`, `line_end`, and `matched_code`.

Samples with fewer than `min_findings: 1` finding are **filtered out** here.

Sources: [processors/semgrep_scanner/semgrep_scanner.py:39-98](), [processors/semgrep_scanner/semgrep_scanner.py:127-254](), [pipelines/loldrivers/rules/arbitrary_rw.yaml:1-103]()

---

## Stage 6 — `pick_top_10`: ranking reduction

**Processor:** `top_k` (built-in, type: `reduce`)

```yaml
# pipelines/loldrivers/pipeline.yaml:78-82
processor: top_k
config:
  metric_path: "semgrep_scanner.finding_count"
  keep_top: 10
  sort_order: desc
```

This is a synchronization barrier: it waits for every active sample to finish `semgrep_scanner`, then sorts by `finding_count` descending and keeps the top 10. The `metric_path` notation `"semgrep_scanner.finding_count"` tells the engine to read the `finding_count` field from the output data that the `semgrep_scanner` stage produced.

After this stage, only 10 samples remain in the active set. All subsequent stages operate on exactly those 10.

Sources: [pipelines/loldrivers/pipeline.yaml:77-82]()

---

## Stage 7 — `assess`: LLM deep analysis

**Processor:** `generic_llm` (built-in, type: `map`)

```yaml
# pipelines/loldrivers/pipeline.yaml:84-95
processor: generic_llm
parallel: 2
on_failure: skip
config:
  prompt: pipelines/loldrivers/assessment.j2
  output_file: assessment.md
  classify_by: "\\[VULNERABLE\\]|\\[SAFE\\]"
  max_context_tokens: 900000
  max_retries: 3
```

Two drivers are assessed in parallel (`parallel: 2`). If the LLM call fails, the sample is skipped rather than halting the run (`on_failure: skip`).

### The Jinja2 prompt template

The template at `pipelines/loldrivers/assessment.j2` provides a carefully structured system prompt plus the per-driver payload. Key elements:

**System rules injected into the prompt** (abridged):

1. **Prove user controllability** — every dangerous value must be traced back to the IOCTL input buffer (`SystemBuffer` at `param_2+0x18`, or `UserBuffer` at `param_2+0x60`). Values from device extension fields, globals, or hardware registers are not user-controlled unless a separate IOCTL sets them.

2. **Reject false positive patterns** — the template explicitly names:
   - Internal driver buffer writes (driver-allocated buffers)
   - HID feature report buffers (normal HID protocol)
   - Hardware-gated code paths (behind MMIO reads or SMI results)
   - PnP-dependent devices (device object only exists when hardware is plugged in)
   - Ghidra decompilation artifacts (`unaff_ESI`, `unaff_EDI` — unresolved registers, not user input)
   - Speculative/assumed state (don't assume convenient initial device extension values)

3. **Exploitability requirements** — all four must hold:
   - Device openable from user mode (`\DosDevices\Name` exists unconditionally)
   - Specific IOCTL code identified
   - Dangerous operation uses IOCTL input buffer data
   - No hardware, firmware, or PnP init required to reach the code path

**Output format enforced by `classify_by`:**

```
[VULNERABLE]   — or —   [SAFE]
```

The `generic_llm` processor regex-matches `classify_by: "\\[VULNERABLE\\]|\\[SAFE\\]"` against the first line of the response to extract the verdict.

**Template variables injected per sample:**

```jinja2
{% if device_name is defined %}Device: {{ device_name }}{% endif %}
{% if symbolic_link is defined %}Symbolic Link: {{ symbolic_link }}{% endif %}
{% if dispatch_handler is defined %}Dispatch Handler: {{ dispatch_handler }}{% endif %}
{% if findings is defined and findings|length > 0 %}
Semgrep Findings ({{ finding_count }}):
{% for f in findings[:20] %}
  - [{{ f.severity }}] {{ f.rule_id }}: {{ f.message[:200] }}
    {{ f.file }}:{{ f.line_start }}
{% endfor %}
{% endif %}

Payload:
{{ dispatch_code }}
```

`dispatch_code` is the full decompiled C from stage 4 — dispatch function plus subfunctions, potentially hundreds of lines. The `max_context_tokens: 900000` budget accommodates large decompilation payloads.

The output is written to `{sample_dir}/assessment.md`.

Sources: [pipelines/loldrivers/assessment.j2:1-66]()

---

## Data flow summary

```text
TARGET DIRECTORY (.sys files)
        │
        ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 1: pe_ingest                                              │
│  • recurse DP_* subdirs, hash bytes, parse PE headers           │
│  • output per sample: sha256, md5, is_kernel_driver,            │
│    has_ioctl_surface, dangerous_imports, priority_score         │
└──────────────────────┬──────────────────────────────────────────┘
                       │ all .sys Samples
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 2: metadata_filter (built-in)                             │
│  • require is_kernel_driver=true AND has_ioctl_surface=true     │
│  • dedup by sha256                                              │
└──────────────────────┬──────────────────────────────────────────┘
                       │ kernel drivers with IOCTL surface
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 3: loldrivers_filter                                      │
│  • fetch/cache loldrivers.io/api/drivers.json (7-day TTL)       │
│  • drop samples whose sha256 is already known                   │
└──────────────────────┬──────────────────────────────────────────┘
                       │ unknown (novel) drivers
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 4: ghidra_decompile (parallel: 4, timeout: 600s)          │
│  • analyzeHeadless + extract_dispatch.py post-script            │
│  • output: decompiled/dispatch_ioctl.c, ghidra_result.json,     │
│    ioctls/0xXXXXXXXX.c per IOCTL code                          │
└──────────────────────┬──────────────────────────────────────────┘
                       │ samples with decompiled C
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 5: semgrep_scanner (batch — one semgrep invocation)       │
│  • bulk-dir hard-links all decompiled .c files                  │
│  • scans with 4 rule files (arbitrary_rw, buffer_overflow,      │
│    method_neither, msr_access)                                  │
│  • output: findings.json per sample; drops samples with 0 hits  │
└──────────────────────┬──────────────────────────────────────────┘
                       │ samples with ≥1 semgrep finding
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 6: top_k (reduce — synchronization barrier)               │
│  • sort descending by semgrep_scanner.finding_count             │
│  • keep top 10                                                  │
└──────────────────────┬──────────────────────────────────────────┘
                       │ 10 best candidates
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│ Stage 7: generic_llm (parallel: 2, on_failure: skip)            │
│  • render assessment.j2 with device_name, symbolic_link,        │
│    semgrep findings[:20], dispatch_code                         │
│  • classify_by: [VULNERABLE] or [SAFE]                          │
│  • output: assessment.md per sample                             │
└─────────────────────────────────────────────────────────────────┘
```

---

## Key implementation details worth knowing

### Caching at every expensive stage

Both Ghidra and semgrep results are cached to disk. If `decompiled/ghidra_result.json` exists and is valid JSON, Ghidra is skipped. If `findings.json` exists, the semgrep scan is skipped for that sample. This means re-runs after partial failures are cheap.

Sources: [processors/ghidra_decompile/ghidra_decompile.py:52-61](), [processors/semgrep_scanner/semgrep_scanner.py:54-62]()

### Environment variable expansion

`${GHIDRA_INSTALL_DIR}` and `${JAVA_HOME:-}` in the YAML are expanded at pipeline load time. The `:-` form provides an empty-string default, so `JAVA_HOME` is optional.

Sources: [pipelines/loldrivers/pipeline.yaml:66-67]()

### Semgrep bulk-scan is a single process for N samples

The `BulkMapProcessor` base class exposes a `process(ctx, entries: list[...])` signature. `SemgrepScanner.process` receives all active samples at once, assembles one bulk directory, runs one `semgrep` subprocess, then distributes findings back. The filename prefix `{sample_id}_dispatch_ioctl.c` is the routing key.

Sources: [processors/semgrep_scanner/semgrep_scanner.py:100-125]()

### The LLM receives up to 20 semgrep findings, not all

The Jinja2 template caps the findings list at 20 entries (`findings[:20]`) to keep the prompt bounded even when a driver has many hits.

Sources: [pipelines/loldrivers/assessment.j2:57-61]()

---

## Summary

The `loldrivers` pipeline narrows thousands of raw `.sys` files down to the top 10 novel kernel driver candidates through four successive stages of increasing cost: header parsing (cheap, parallel), loldrivers.io hash lookup (network, once), Ghidra headless decompilation (expensive, 4-way parallel, cached), and bulk semgrep scanning (one process for all). The final LLM stage uses a tightly constrained Jinja2 prompt that explicitly enumerates false positive patterns and enforces a `[VULNERABLE]`/`[SAFE]` first-line verdict — making the output machine-parseable for downstream triage. The full pipeline specification lives in `pipelines/loldrivers/pipeline.yaml:1-95`.
