# Research pipeline

> v3 orchestration: planner sub-queries, parallel source fan-out, fusion, clustering, dedupe, rerank, and depth profiles (--quick / default / --deep).

- Repository: mvanhorn/last30days-skill
- GitHub: https://github.com/mvanhorn/last30days-skill
- Human docs: https://grok-wiki.com/public/docs/mvanhorn-last30days-skill-50b5421a8cca
- Complete Markdown: https://grok-wiki.com/public/docs/mvanhorn-last30days-skill-50b5421a8cca/llms-full.txt

## Source Files

- `skills/last30days/scripts/lib/pipeline.py`
- `skills/last30days/scripts/lib/schema.py`
- `skills/last30days/scripts/lib/fusion.py`
- `skills/last30days/scripts/lib/cluster.py`
- `skills/last30days/scripts/lib/dedupe.py`
- `skills/last30days/scripts/lib/rerank.py`
- `docs/how-search-works.md`

---


The v3 research engine in `skills/last30days/scripts/lib/pipeline.py` turns a topic string into a `schema.Report`: a `QueryPlan`, per-source evidence, fused `Candidate` rows, optional `Cluster` groups, and warnings. `last30days.py` resolves depth from `--quick` / `--deep`, calls `pipeline.run()`, then hands the report to renderers (`--emit=compact`, `json`, `context`, `md`, `html`). Comparison and vs-topic flows invoke the same pipeline once per entity via `fanout.run_competitor_fanout`.

## Pipeline overview

```mermaid
flowchart TB
  subgraph entry [CLI entry]
    CLI[last30days.py]
  end
  subgraph plan [Planning]
    PL[planner.plan_query / --plan]
    RT[providers.resolve_runtime]
  end
  subgraph phase1 [Phase 1 — parallel retrieval]
    TP[ThreadPoolExecutor]
    RS[_retrieve_stream per subquery × source]
    NSD[_normalize_score_dedupe]
  end
  subgraph phase2 [Phase 2 — recovery]
    SUP[_run_supplemental_searches]
    RET[_retry_thin_sources]
  end
  subgraph rank [Global ranking]
    FUS[weighted_rrf]
    RR[rerank.rerank_candidates]
    FUN[rerank.score_fun]
    GH[github.enrich_candidates_with_stars]
    CL[cluster_candidates]
  end
  CLI --> PL
  CLI --> RT
  PL --> TP
  RT --> PL
  TP --> RS --> NSD
  NSD --> SUP
  SUP --> RET
  RET --> FUS --> RR --> FUN --> GH --> CL
  CL --> REP[schema.Report]
```

| Stage | Module | Output |
| --- | --- | --- |
| Plan | `planner.py` | `QueryPlan` with 1–5 `SubQuery` rows |
| Fan-out | `pipeline.py` | `RetrievalBundle` keyed by `(subquery_label, source)` |
| Per-stream cleanup | `normalize`, `signals`, `dedupe`, `snippet` | `SourceItem` lists capped by depth |
| Fusion | `fusion.py` | `Candidate` pool (RRF + diversity) |
| Rerank | `rerank.py` | `rerank_score`, `final_score`, `fun_score` |
| Cluster | `cluster.py` | `Cluster` list with MMR representatives |

## Depth profiles

Depth is a single string: `"quick"`, `"default"`, or `"deep"`. The CLI resolves it to `deep` when `--deep` is passed, otherwise to `quick` when `--quick` is passed, otherwise to `default`.

### Global pool limits (`DEPTH_SETTINGS`)

These caps apply after each retrieval stream is normalized; they do not replace per-source `DEPTH_CONFIG` inside Reddit, X, YouTube, and similar adapters.

| Depth | `per_stream_limit` | `pool_limit` (fusion) | `rerank_limit` (LLM shortlist) |
| --- | ---: | ---: | ---: |
| `quick` | 6 | 15 | 12 |
| `default` | 12 | 40 | 40 |
| `deep` | 20 | 60 | 60 |
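
A minimal sketch of how the depth string and the caps above could fit together. The numbers are copied from the table; the dict layout and the helper names `resolve_depth` and `cap_stream` are assumptions for illustration, not the actual contents of `pipeline.py`.

```python
# Values copied from the table above; the dict layout itself is an assumption.
DEPTH_SETTINGS = {
    "quick":   {"per_stream_limit": 6,  "pool_limit": 15, "rerank_limit": 12},
    "default": {"per_stream_limit": 12, "pool_limit": 40, "rerank_limit": 40},
    "deep":    {"per_stream_limit": 20, "pool_limit": 60, "rerank_limit": 60},
}


def resolve_depth(quick=False, deep=False):
    """Hypothetical helper: --deep takes precedence over --quick."""
    if deep:
        return "deep"
    if quick:
        return "quick"
    return "default"


def cap_stream(items, depth):
    """Hypothetical helper: truncate one normalized stream to its per-stream cap."""
    return items[: DEPTH_SETTINGS[depth]["per_stream_limit"]]


depth = resolve_depth(quick=True)
print(depth, len(cap_stream(list(range(50)), depth)))
```

Keeping all three limits in one table keyed by depth means a single string selects the whole profile downstream.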

### Planner behavior by depth

| Depth | Subqueries | Sources per subquery |
| --- | --- | --- |
| `quick` | At most **1** after sanitization | Up to **2** per intent via `SOURCE_LIMITS` and `QUICK_SOURCE_PRIORITY` |
| `default` | LLM or fallback (typically 2–5) | **All** eligible sources expanded per intent (`_trim_subqueries_for_depth` overrides narrow LLM source lists) |
| `deep` | Same as default | Same as default; higher retrieval caps downstream |

### Phases skipped at `quick`

- **Phase 2 supplemental** (`_run_supplemental_searches`): entity-derived X handle searches built from Phase 1 Reddit/X text. Skipped when `depth == "quick"` or when the run is in mock mode (`--mock`).
- **Phase 2b thin retry** (`_retry_thin_sources`): simplified core-subject re-query for sources with fewer than three items. Skipped when `depth == "quick"`.

### Reasoning models

`providers.resolve_runtime(config, depth)` picks the planner and rerank models. With `LAST30DAYS_REASONING_PROVIDER=auto`, the first provider with a configured key wins, checked in the order Gemini → OpenAI → xAI → OpenRouter. If no key is configured, `reasoning_provider="local"` and both planner and rerank fall back to their deterministic paths.
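
The `auto` resolution order can be sketched as a first-match scan. This is an illustration only: `pick_reasoning_provider` and its `configured` argument are hypothetical, and the real `resolve_runtime` reads its credentials from config rather than from a set of names.

```python
# Order taken from the paragraph above.
PROVIDER_ORDER = ("gemini", "openai", "xai", "openrouter")


def pick_reasoning_provider(configured):
    """Return the first provider in PROVIDER_ORDER that has a key.

    `configured` is a hypothetical set of provider names with credentials.
    Falls back to "local" (deterministic planner and rerank) when empty.
    """
    for provider in PROVIDER_ORDER:
        if provider in configured:
            return provider
    return "local"


print(pick_reasoning_provider({"openrouter", "openai"}))
print(pick_reasoning_provider(set()))
```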

<ParamField body="--quick" type="flag">
Lower-latency profile: one planner subquery, tight source budget, no supplemental or thin-source retry, smaller fusion/rerank pools.
</ParamField>

<ParamField body="--deep" type="flag">
Higher-recall profile: larger per-stream and pool limits. When the reasoning provider is Gemini, default rerank model upgrades to Gemini Pro (`providers._resolve_model_pins`).
</ParamField>

<ParamField body="LAST30DAYS_PLANNER_MODEL" type="string">
Overrides default planner model for the resolved reasoning provider.
</ParamField>

<ParamField body="LAST30DAYS_RERANK_MODEL" type="string">
Overrides default rerank model for the resolved reasoning provider.
</ParamField>

## Query planning

`planner.plan_query()` returns a `QueryPlan`:

- **`intent`**: `factual`, `product`, `concept`, `opinion`, `how_to`, `comparison`, `breaking_news`, or `prediction`
- **`freshness_mode`**: `strict_recent`, `balanced_recent`, or `evergreen_ok`
- **`cluster_mode`**: `none`, `story`, `workflow`, `market`, or `debate` (drives downstream clustering)
- **`subqueries`**: each `SubQuery` has `label`, `search_query`, `ranking_query`, `sources`, and `weight`
- **`source_weights`**: per-source multipliers applied during fusion
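
For orientation, the fields listed above map onto a shape like the following. This is a sketch, not the definitions in `schema.py`: the field names come from the list, while the types, the example values, and the absence of any other fields are assumptions. It is also not a statement of the `--plan` JSON schema.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class SubQuery:
    label: str
    search_query: str   # what is sent to the source adapters
    ranking_query: str  # what relevance is judged against
    sources: list
    weight: float


@dataclass
class QueryPlan:
    intent: str
    freshness_mode: str
    cluster_mode: str
    subqueries: list = field(default_factory=list)
    source_weights: dict = field(default_factory=dict)


# Illustrative values only.
plan = QueryPlan(
    intent="comparison",
    freshness_mode="balanced_recent",
    cluster_mode="debate",
    subqueries=[
        SubQuery("primary", "tool A vs tool B", "tool A compared with tool B",
                 ["reddit", "hackernews"], 1.0),
    ],
    source_weights={"reddit": 1.0, "hackernews": 0.8},
)
print(asdict(plan)["subqueries"][0]["label"])
```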

### Plan sources (priority order)

1. **`--plan` JSON** (hosting agent or file path): sanitized through `_sanitize_plan`; `plan_source="external"`.
2. **LLM planner** when a reasoning provider and `planner_model` are configured: `provider.generate_json`; `plan_source="llm"`.
3. **Deterministic fallback** when LLM fails or no provider keys exist: comparison entities, intent heuristics, or generic primary subquery; `plan_source="deterministic"`.
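
The priority order above amounts to a three-step fallback chain. The sketch below assumes the LLM and deterministic planners are plain callables; `choose_plan` and its parameters are hypothetical names, and the sanitization step (`_sanitize_plan`) is omitted.

```python
def choose_plan(topic, external_plan=None, llm_planner=None, deterministic_planner=None):
    """Return (plan, plan_source) following the documented priority order."""
    if external_plan is not None:
        return external_plan, "external"
    if llm_planner is not None:
        try:
            return llm_planner(topic), "llm"
        except Exception:
            pass  # any LLM failure falls through to the deterministic path
    return deterministic_planner(topic), "deterministic"


def failing_llm(topic):
    raise RuntimeError("provider unavailable")


plan, source = choose_plan(
    "Topic",
    llm_planner=failing_llm,
    deterministic_planner=lambda t: {"subqueries": [t]},
)
print(source)
```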

<Warning>
When no `--plan` is passed and the engine has no internal reasoning credentials, stderr emits a LAW 7 reminder: the **hosting** agent (Claude Code, Codex, Hermes, etc.) should generate the plan JSON and pass `--plan`. The deterministic fallback is intended for headless/cron runs.
</Warning>

Planner traces always go to stderr (not user stdout):

```
[Planner] Plan: intent=..., freshness=..., cluster_mode=..., subqueries=N, source=llm|deterministic|external
[Planner]   sq1 label=... search="..." sources=[reddit,x,...]
```

If `grounding` is available and `--web-backend` is not `none`, every subquery gets `grounding` appended as a safety net even when the planner omits it.

## Phase 1: parallel source fan-out

For each `(subquery, source)` in the plan where `source` is in `available_sources(config)`:

1. Submit `_retrieve_stream` to a `ThreadPoolExecutor` (`max_workers` between 4 and 16, scaled to stream count).
2. Enforce **`MAX_SOURCE_FETCHES`**: X is capped at **2** submissions per run to limit 429 cascades.
3. On **429**, mark the source rate-limited (thread-safe set) so sibling futures skip it.
4. On **5xx**, sleep 3s and retry the stream once.
5. Run **`_normalize_score_dedupe`** on raw dicts, then truncate to `per_stream_limit`.
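
The retrieval steps above can be sketched with the standard library. Several details are assumptions: `HttpError`, the `fetch` callable, the dict shape of `MAX_SOURCE_FETCHES`, and the exact worker-count formula are hypothetical stand-ins, and normalization (step 5) is left out.

```python
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_SOURCE_FETCHES = {"x": 2}  # assumed shape; the cap of 2 for X is from the text


class HttpError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def fan_out(streams, fetch, retry_delay=3.0):
    """streams: list of (label, source); fetch(label, source) -> list of raw items."""
    rate_limited = set()
    lock = threading.Lock()

    def run_stream(label, source):
        with lock:
            if source in rate_limited:
                return []  # a sibling stream already hit 429 for this source
        for attempt in range(2):  # first try plus one retry
            try:
                return fetch(label, source)
            except HttpError as exc:
                if exc.status == 429:
                    with lock:
                        rate_limited.add(source)
                    return []
                if 500 <= exc.status < 600 and attempt == 0:
                    time.sleep(retry_delay)
                    continue
                raise
        return []

    workers = max(4, min(16, len(streams)))  # assumed scaling rule
    submitted = defaultdict(int)
    results = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {}
        for label, source in streams:
            cap = MAX_SOURCE_FETCHES.get(source)
            if cap is not None and submitted[source] >= cap:
                continue  # over the per-run submission cap for this source
            submitted[source] += 1
            futures[pool.submit(run_stream, label, source)] = (label, source)
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results
```

The rate-limit set is shared across worker threads, so it is guarded by a lock. Streams that were already in flight when the 429 arrived still run to completion.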

### GitHub pre-pass

Before the main loop, optional **`--github-repo`** (project mode) or **`--github-user`** (person mode) runs a dedicated GitHub search and injects results under the primary subquery label, skipping redundant keyword GitHub streams later.

### `available_sources`

Sources are included only when credentials or binaries exist (for example `reddit` always, `x` when Bird/xAI/xurl is configured, `grounding` when Brave/Exa/Serper/Parallel keys exist). `EXCLUDE_SOURCES` and `INCLUDE_SOURCES` further filter the set. `pipeline.diagnose()` reports the resolved list for `--diagnose`.

### Per-stream normalization (`_normalize_score_dedupe`)

| Step | Module | Role |
| --- | --- | --- |
| Normalize | `normalize.normalize_source_items` | Dates, URLs, engagement fields |
| Annotate | `signals.annotate_stream` | `local_relevance`, freshness, engagement, `local_rank_score` |
| Prune | `signals.prune_low_relevance` | Drops items below relevance floor (default 0.15) |
| Dedupe | `dedupe.dedupe_items` | Hybrid n-gram + token Jaccard (threshold 0.7) |
| Snippet | `snippet.extract_best_snippet` | Best excerpt for rerank prompts |
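
To make the dedupe row concrete, here is a small sketch of near-duplicate removal with a 0.7 threshold. How `dedupe.dedupe_items` actually combines the two measures is not stated here; taking the maximum of character-trigram Jaccard and token Jaccard is an assumption, as are the helper names.

```python
def char_ngrams(text, n=3):
    """Character n-grams over lowercased, whitespace-collapsed text."""
    text = " ".join(text.lower().split())
    return {text[i:i + n] for i in range(max(len(text) - n + 1, 1))}


def tokens(text):
    return set(text.lower().split())


def jaccard(a, b):
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def similarity(a, b):
    # Assumed hybrid rule: the stronger of the two signals wins.
    return max(jaccard(char_ngrams(a), char_ngrams(b)),
               jaccard(tokens(a), tokens(b)))


def dedupe_titles(titles, threshold=0.7):
    """Keep the first of any group of titles at or above the threshold."""
    kept = []
    for title in titles:
        if all(similarity(title, earlier) < threshold for earlier in kept):
            kept.append(title)
    return kept


titles = [
    "OpenAI releases new model today",
    "OpenAI releases new model today!",
    "Rust roadmap discussion",
]
print(dedupe_titles(titles))
```

In this example the token measure alone scores the first two titles below 0.7, because `today` and `today!` are different tokens. The character n-gram measure catches the pair.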

Reddit and YouTube often use **`raw_topic`** (original user string) inside `_retrieve_stream` so query expansion helpers see the full topic, not only the planner’s narrowed `search_query`.

## Phase 2: supplemental and thin-source retry

**Supplemental** (default/deep only): `entity_extract` pulls X handles and subreddits from Phase 1 Reddit/X payloads (plus `--x-handle` / `--x-related`). With Bird as the X backend, `bird_x.search_handles` adds targeted posts; related handles use label `supplemental-related` at weight **0.3**.

**Thin retry**: Sources with fewer than three items (and no error) get a second fetch using `query.extract_core_subject(topic, max_words=3)` as `search_query`, weight **0.3**, merged without duplicate URLs. GitHub is skipped when project/person mode already ran.
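
A sketch of the thin-retry merge, assuming items are dicts with a `url` key. The `refetch` callable is a hypothetical stand-in for a second adapter call, `core_subject` is passed in rather than computed, and the 0.3 weighting is not modeled.

```python
def retry_thin_sources(items_by_source, errors_by_source, refetch, core_subject,
                       min_items=3):
    """Re-query sources with fewer than `min_items` items and no recorded error.

    New items are appended only when their URL has not been seen for that source.
    """
    for source, items in items_by_source.items():
        if len(items) >= min_items or source in errors_by_source:
            continue
        seen = {item["url"] for item in items}
        for item in refetch(source, core_subject):
            if item["url"] not in seen:
                seen.add(item["url"])
                items.append(item)
    return items_by_source


items = {
    "reddit": [{"url": "https://example.com/a"}],
    "youtube": [{"url": "https://example.com/v%d" % i} for i in range(3)],
    "x": [],
}
errors = {"x": "429"}


def fake_refetch(source, query):
    return [{"url": "https://example.com/a"}, {"url": "https://example.com/b"}]


merged = retry_thin_sources(items, errors, fake_refetch, "core subject")
print({source: len(found) for source, found in merged.items()})
```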

## Fusion (`weighted_rrf`)

`fusion.weighted_rrf` merges all `(label, source)` streams:

- **Score**: `subquery.weight × plan.source_weights[source] / (RRF_K + rank)` with `RRF_K = 60`
- **Dedup key**: normalized URL, or `source:item_id`
- **Merge**: accumulates `rrf_score`, max relevance/freshness/engagement, provenance metadata
- **Per-author cap**: at most **3** items per author/handle in the fused list
- **Diversity pool**: reserves up to **2** slots per source whose best item has `local_relevance ≥ 0.25`, then fills to `pool_limit`
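
The scoring, merge, and per-author rules above can be sketched as follows. This is not the code in `fusion.py`: it assumes 1-based ranks and dict items with `url`, `id`, and `author` keys, and it omits the diversity-pool reservation and the max-merging of relevance fields.

```python
from collections import defaultdict

RRF_K = 60


def weighted_rrf(streams, subquery_weights, source_weights, pool_limit,
                 per_author_cap=3):
    """streams: {(label, source): [item, ...]} with each list ordered best first.

    Returns a list of (dedup_key, rrf_score) tuples, highest score first.
    """
    scores = defaultdict(float)
    first_seen = {}
    for (label, source), items in streams.items():
        weight = subquery_weights.get(label, 1.0) * source_weights.get(source, 1.0)
        for rank, item in enumerate(items, start=1):
            key = item.get("url") or "%s:%s" % (source, item["id"])
            scores[key] += weight / (RRF_K + rank)  # accumulate across streams
            first_seen.setdefault(key, item)

    fused = []
    per_author = defaultdict(int)
    for key in sorted(scores, key=scores.get, reverse=True):
        if len(fused) >= pool_limit:
            break
        author = first_seen[key].get("author")
        if author:
            if per_author[author] >= per_author_cap:
                continue  # this author already holds the maximum number of slots
            per_author[author] += 1
        fused.append((key, scores[key]))
    return fused


streams = {
    ("primary", "reddit"): [
        {"url": "https://example.com/a", "author": "u1"},
        {"url": "https://example.com/b", "author": "u2"},
    ],
    ("primary", "hackernews"): [
        {"url": "https://example.com/b", "author": "u2"},
    ],
}
fused = weighted_rrf(streams, {"primary": 1.0}, {}, pool_limit=40)
print(fused[0][0])
```

With all weights equal, the item found by both streams accumulates 1/62 + 1/61 and outranks the item that placed first in only one stream (1/61).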

## Rerank and fun scoring

`rerank.rerank_candidates` scores the fused shortlist (size `rerank_limit`):

- **LLM path**: JSON relevance 0–100 per `candidate_id`, with intent-specific hints and entity-grounding rules for the primary entity extracted from the topic
- **Fallback**: weighted mix of `local_relevance`, freshness, and `source_quality`; **−25** `ENTITY_MISS_PENALTY` when the primary entity is absent from title/snippet/transcript/comments
- **Tail**: candidates beyond the shortlist get fallback scores only
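
A hedged sketch of the fallback path. The 25-point penalty and the fields searched for the entity come from the list above. The mixing weights, the 0 to 100 scaling, the floor at zero, and the assumption that the text fields are plain strings are hypothetical choices made for this example.

```python
ENTITY_MISS_PENALTY = 25.0

# Hypothetical weights; the actual mix in rerank.py is not documented here.
WEIGHTS = {"local_relevance": 0.6, "freshness": 0.25, "source_quality": 0.15}

TEXT_FIELDS = ("title", "snippet", "transcript", "comments")


def fallback_score(candidate, primary_entity=None):
    """Deterministic score on an assumed 0-100 scale, inputs assumed in [0, 1]."""
    score = 100.0 * sum(w * candidate.get(name, 0.0) for name, w in WEIGHTS.items())
    if primary_entity:
        haystack = " ".join(candidate.get(f, "") for f in TEXT_FIELDS).lower()
        if primary_entity.lower() not in haystack:
            score -= ENTITY_MISS_PENALTY
    return max(0.0, score)  # floor at zero is an assumption


hit = {"title": "Hermes agent workflows in practice",
       "local_relevance": 0.8, "freshness": 0.6, "source_quality": 0.5}
miss = dict(hit, title="Agent workflows in practice")
print(fallback_score(hit, "Hermes") - fallback_score(miss, "Hermes"))
```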

`rerank.score_fun` adds optional `fun_score` / `fun_explanation` (humor/virality judge) on up to 60 candidates.

Post-rerank, **`github.enrich_candidates_with_stars`** fills star counts for GitHub survivors when not in mock mode.

## Clustering (`cluster_candidates`)

| `plan.intent` / `cluster_mode` | Behavior |
| --- | --- |
| Intent not in `CLUSTERABLE_INTENTS` or `cluster_mode == "none"` | One cluster per candidate (no merging) |
| `breaking_news`, `opinion`, `comparison`, `prediction` + story/workflow/market/debate | Greedy text-similarity groups (threshold 0.42 news / 0.48 otherwise), then **entity overlap merge** across sources for small clusters |
| Representatives | MMR (`diversity_lambda=0.75`), up to 3 per cluster |

Uncertainty flags: `single-source` or `thin-evidence` when group scores or source diversity are weak.
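
Representative selection by MMR (maximal marginal relevance) can be sketched as a greedy loop. The `diversity_lambda=0.75` default and the limit of three come from the table. The `(id, relevance)` tuples and the `similarity` callable are assumptions for illustration, not the signature used in `cluster.py`.

```python
def mmr_select(candidates, similarity, k=3, diversity_lambda=0.75):
    """Greedy MMR over (candidate_id, relevance) tuples.

    Each step picks the candidate maximizing
    lambda * relevance - (1 - lambda) * max similarity to anything already picked.
    """
    remaining = list(candidates)
    selected = []
    while remaining and len(selected) < k:
        def mmr(entry):
            cid, relevance = entry
            redundancy = max((similarity(cid, s) for s, _ in selected), default=0.0)
            return diversity_lambda * relevance - (1 - diversity_lambda) * redundancy

        best = max(remaining, key=mmr)
        selected.append(best)
        remaining.remove(best)
    return [cid for cid, _ in selected]


def sim(x, y):
    # Toy similarity: "a" and "b" are near-duplicates, everything else is unrelated.
    return 1.0 if {x, y} == {"a", "b"} else 0.0


reps = mmr_select([("a", 0.9), ("b", 0.88), ("c", 0.6)], sim, k=2)
print(reps)
```

Here `b` is more relevant than `c`, but its full overlap with the already selected `a` costs it 0.25, so `c` takes the second slot.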

## Final report and warnings

`pipeline.run` returns `schema.Report` with:

- `query_plan`, `ranked_candidates`, `clusters`, `items_by_source`
- `errors_by_source` (cleared when that source still returned items)
- `warnings` (thin evidence, single-source concentration, partial source failures)
- `artifacts` (grounding bundles, `plan_source` for degraded-run banners)

`_finalize_items_by_source` re-sorts, re-dedupes per source, applies Polymarket topic/keyword filters, and runs Digg X-post enrichment on survivors only.
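
As an illustration of how the error and warning fields relate, the sketch below clears errors for sources that still returned items and derives two of the warnings. The five-candidate threshold is taken from the Operational signals table. The function name and the message text after each prefix are assumptions.

```python
def build_warnings(ranked_candidates, items_by_source, errors_by_source):
    """Return (errors, warnings) for a report (hypothetical helper)."""
    # An error is dropped when the same source still produced items.
    errors = {
        source: message
        for source, message in errors_by_source.items()
        if not items_by_source.get(source)
    }
    warnings = []
    if len(ranked_candidates) < 5:
        warnings.append("Evidence is thin")
    if errors:
        warnings.append("Some sources failed: " + ", ".join(sorted(errors)))
    return errors, warnings


errors, warnings = build_warnings(
    ranked_candidates=["c1", "c2"],
    items_by_source={"reddit": ["r1"], "x": []},
    errors_by_source={"reddit": "timeout on page 2", "x": "429"},
)
print(errors, warnings)
```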

## Invocation examples

<RequestExample>
```bash
# Default depth — full planner expansion, supplemental + thin retry
python3 skills/last30days/scripts/last30days.py "Hermes agent workflows" --emit=compact
```
</RequestExample>

<RequestExample>
```bash
# Quick profile — latency-biased
python3 skills/last30days/scripts/last30days.py "OpenClaw plugins" --quick --search=reddit,hackernews --emit=json
```
</RequestExample>

<RequestExample>
```bash
# Hosting agent supplies the plan (LLM-quality planning without engine API keys)
python3 skills/last30days/scripts/last30days.py "Topic" --plan /path/to/plan.json --emit=context
```
</RequestExample>

<RequestExample>
```bash
# Mock pipeline (no live credentials) — used in tests
python3 skills/last30days/scripts/last30days.py "test" --mock --quick --emit=compact
```
</RequestExample>

## Operational signals

| Signal | Where | Meaning |
| --- | --- | --- |
| `[Planner] Plan: ... source=deterministic` | stderr | No LLM plan; consider passing `--plan` from the hosting agent |
| `DEGRADED RUN` banner | compact output | `plan_source=deterministic` on named-entity topics without pre-research flags |
| `Some sources failed: ...` | `Report.warnings` | Partial retrieval; check keys via `--diagnose` |
| `Evidence is thin` | `Report.warnings` | Fewer than five ranked candidates |

<Note>
Slash-command invocations (`/last30days topic`) do not pass shell flags. The hosting model maps user intent (quick vs deep, source subset, comparison) into engine flags or a `--plan` file before calling `last30days.py`.
</Note>

## Key modules

| File | Responsibility |
| --- | --- |
| `lib/pipeline.py` | Orchestration, depth settings, phases, `run()` |
| `lib/planner.py` | LLM/deterministic `QueryPlan` |
| `lib/schema.py` | `SubQuery`, `QueryPlan`, `SourceItem`, `Candidate`, `Cluster`, `Report` |
| `lib/fusion.py` | Weighted RRF and pool diversification |
| `lib/dedupe.py` | Within-stream near-duplicate removal |
| `lib/rerank.py` | LLM relevance + fun judge + entity demotion |
| `lib/cluster.py` | Story grouping and cross-source entity merge |
| `lib/providers.py` | Reasoning provider and model pins |

Per-platform retrieval limits and auth live in each source module; see the data-sources page for adapter-specific `DEPTH_CONFIG` tables.

## Related pages

<CardGroup>
  <Card title="Skill, engine, and harness" href="/skill-engine-harness">
    Boundaries between SKILL.md, the Python engine, and harness runtimes that invoke `/last30days`.
  </Card>
  <Card title="Query types" href="/query-types">
    Intent classification, pre-flight refusal, and how intent shapes planner defaults.
  </Card>
  <Card title="Data sources" href="/data-sources">
    Per-platform retrieval modules, keys, and source-level depth configs.
  </Card>
  <Card title="Output contract" href="/output-contract">
    How `Report` fields map to compact, JSON, and synthesis output.
  </Card>
  <Card title="CLI reference" href="/cli-reference">
    All flags including `--plan`, `--search`, `--quick`, and `--deep`.
  </Card>
  <Card title="Comparison mode" href="/comparison-mode">
    Multi-entity fan-out that reuses `pipeline.run` per competitor.
  </Card>
</CardGroup>
