# Indexing Pipeline: Crawl, Extract, Chunk, Embed, Store

> The incremental ingestion path stage by stage: the file crawler and mtime/size change detection, text/PDF/media extraction, chunking, the concurrent decode stage feeding a single serialized GPU embed stage, and persistence into the SQLite-backed bf16 vector store. Includes the FSWatcher that triggers re-indexing.

- Repository: hanxiao/omni-macos
- GitHub: https://github.com/hanxiao/omni-macos
- Human wiki: https://grok-wiki.com/public/wiki/hanxiao-omni-macos-7817a5cffe05
- Complete Markdown: https://grok-wiki.com/public/wiki/hanxiao-omni-macos-7817a5cffe05/llms-full.txt

## Source Files

- `Sources/OmniKit/Indexer.swift`
- `Sources/OmniKit/FileCrawler.swift`
- `Sources/OmniKit/FileExtractor.swift`
- `Sources/OmniKit/FSWatcher.swift`
- `Sources/OmniKit/IndexSettings.swift`
- `Sources/OmniKit/VectorStore.swift`
- `App/AppModel.swift`

---


This page traces a file from disk to a searchable vector, stage by stage. Omni indexes the user's folders (Documents, Downloads, and Desktop by default) incrementally: it walks each root, decides what changed, extracts embeddable content from each file, splits text into overlapping chunks, runs everything through the in-process MLX embedder, and persists L2-normalized vectors into a SQLite-backed store that also keeps every vector resident in memory as a bf16 matrix for cosine search. The whole flow lives in `OmniKit` and is driven by one method, `Indexer.index(roots:settings:force:onProgress:)`.

The central design idea is a **two-stage pipeline**: many CPU cores decode files concurrently (extraction, PDF rasterization, video frame sampling, audio mel STFT, image patchify), while a *single* serialized consumer thread owns the GPU and does only embedding. That split keeps the GPU continuously fed without ever running two forward passes at once, which is what lets an interactive search stay responsive while a full index is in flight. A separate, lighter path, the `FSWatcher` plus `AppModel`, keeps the index current after the first full pass by reacting to filesystem events.

Start reading at `Indexer.index(...)` ([Indexer.swift:124-341](Sources/OmniKit/Indexer.swift#L124-L341)); the `pipeline(...)` and `decode(...)` helpers below it are where the concurrency model lives.

## The shape of the pipeline

```mermaid
flowchart TB
  subgraph trigger["Triggers"]
    full["AppModel.startIndexing\n(full / resume pass)"]
    fsw["FSWatcher (FSEvents)\n1.5s coalesced bursts"]
  end

  subgraph crawl["Crawl + change detection (per root)"]
    fc["FileCrawler.walk\nskip hidden / bundles / node_modules,.git,...\nmtime+size -> CrawledFile"]
  end

  subgraph cpu["Concurrent decode stage (up to activeProcessorCount cores)"]
    dec["Indexer.decode(file)\nFileExtractor.extract / chunk()\nPDF render, video keyframes,\naudio mel, image patchify"]
  end

  subgraph gpu["Serial embed stage (single thread, owns GPU)"]
    emb["Embedder.embedTextBatches /\nembedImages / embedAudioMelBatch\nlength-bucketed, batched MLX forward"]
  end

  subgraph store["Persistence"]
    vs["VectorStore.replace / replaceMany\nSQLite chunks table (bf16 BLOB)\n+ in-memory flat16 matrix"]
  end

  full --> fc
  fsw -.path-only events.-> upd["Indexer.update(paths:)\n(no crawl)"]
  fc -->|"CrawledFile, bounded by semaphore"| dec
  dec -->|"DecodedItem mailbox\n(ordered, ReadyBox)"| emb
  emb --> vs
  upd --> dec
```

`FileCrawler` produces `CrawledFile` values; `pipeline(...)` fans them across cores into `decode(...)`, which yields a `DecodedItem`; the serial consumer embeds and calls `VectorStore`. `Sources: [Indexer.swift:84-117](Sources/OmniKit/Indexer.swift#L84-L117), [Indexer.swift:397-479](Sources/OmniKit/Indexer.swift#L397-L479)`

## Stage 1 - Crawl and change detection

`FileCrawler.walk` recursively enumerates each root with `FileManager.enumerator`, skipping hidden files, package bundles, and a fixed denylist of noise directories (`node_modules`, `.git`, `Library`, `Pods`, `.build`, `DerivedData`, `venv`, `__pycache__`, `dist`, `build`, `target`, and more). Only regular files of a *supported, enabled* kind that are no larger than `maxFileSize` (200 MB) survive; each becomes a `CrawledFile` carrying its `url`, `modified` (mtime in seconds since 1970), and `size`.

```swift
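// FileCrawler.swift - the per-entry filter inside the enumerator loop
// (`vals` are the entry's URL resource values; `continue` moves on to the next entry)
guard vals.isRegularFile == true,                // directories and other non-regular entries are skipped
      FileExtractor.isSupported(url, enabledKinds: enabledKinds, disabledExtensions: disabledExtensions)
else { continue }
let size = vals.fileSize ?? 0
if size > maxFileSize { continue }               // 200 MB ceiling
let mtime = vals.contentModificationDate?.timeIntervalSince1970 ?? 0
onFile(CrawledFile(url: url, modified: mtime, size: size))   // hand the file to the indexer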
```
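The exclusion rules of this stage can also be read as a pure predicate over a file's path relative to its crawl root, which is convenient for unit-testing. The sketch below is hypothetical: `shouldIndex(relativePath:size:)` is not an OmniKit API, it lists only the denylist entries named above, it omits the supported-kind and package-bundle checks, and it assumes 200 MB means 200 MiB. The real crawler prunes whole subtrees during enumeration instead of re-checking each file's ancestors.

```swift
// Denylist entries named on this page; the real FileCrawler list is longer.
let noiseDirectories: Set<String> = [
    "node_modules", ".git", "Library", "Pods", ".build", "DerivedData",
    "venv", "__pycache__", "dist", "build", "target",
]

// Assumption: "200 MB" interpreted as 200 MiB.
let maxFileSize = 200 * 1024 * 1024

/// Hypothetical helper: true when a file at `relativePath` (relative to its crawl root)
/// would survive the hidden-entry, denylist, and size filters.
func shouldIndex(relativePath: String, size: Int) -> Bool {
    let components = relativePath.split(separator: "/").map(String.init)
    guard !components.isEmpty else { return false }
    // Hidden files, and anything under a hidden directory, are skipped.
    if components.contains(where: { $0.hasPrefix(".") }) { return false }
    // Anything under a denylisted directory is skipped (the last component is the file).
    if components.dropLast().contains(where: { noiseDirectories.contains($0) }) { return false }
    // Mirrors `if size > maxFileSize { continue }`.
    return size <= maxFileSize
}
```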

`index(...)` walks every root exactly once (a deliberate single pass - an earlier version stat-counted then re-walked, doubling traversal before the first embed), publishing each root's file count as its determinate progress total. Files are then **interleaved round-robin across roots** so every folder makes progress from the start, and **grouped by modality** so a whole kind is processed in one uniform phase. Incremental skipping happens inside `pipeline(...)`: a file whose stored `(modified, size)` still match `store.indexedFiles()` is marked `unchanged` and never decoded. `Sources: [FileCrawler.swift:39-63](Sources/OmniKit/FileCrawler.swift#L39-L63), [Indexer.swift:124-189](Sources/OmniKit/Indexer.swift#L124-L189), [Indexer.swift:407-417](Sources/OmniKit/Indexer.swift#L407-L417)`
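A sketch of the ordering and skip rules just described, under simplified assumptions: `Kind`, `Entry`, `interleave`, `orderedForPipeline`, and `isUnchanged` are illustrative names, and stored metadata is modeled as a plain dictionary rather than the result of `store.indexedFiles()`. Filtering once per kind in `kindOrder` keeps the round-robin order intact inside each modality phase.

```swift
enum Kind { case image, audio, video, text }

struct Entry {
    let path: String
    let kind: Kind
    let modified: Double   // mtime, seconds since 1970
    let size: Int
}

/// Round-robin across roots: every root's 1st file, then every root's 2nd file, and so on.
func interleave<T>(_ perRoot: [[T]]) -> [T] {
    var out: [T] = []
    let longest = perRoot.map { $0.count }.max() ?? 0
    for i in 0 ..< longest {
        for files in perRoot where i < files.count {
            out.append(files[i])
        }
    }
    return out
}

/// One phase per modality, in `kindOrder`; filter preserves the interleaved order within a phase.
func orderedForPipeline(_ files: [Entry], kindOrder: [Kind]) -> [Entry] {
    kindOrder.flatMap { kind in files.filter { $0.kind == kind } }
}

/// A file is skipped when its stored (modified, size) still match what the crawl saw.
func isUnchanged(_ file: Entry, indexed: [String: (modified: Double, size: Int)]) -> Bool {
    guard let stored = indexed[file.path] else { return false }   // never indexed
    return stored.modified == file.modified && stored.size == file.size
}
```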

## Stage 2 - Extraction by modality

`FileExtractor.kind(for:)` maps a file extension to one of four `FileKind`s, and `extract(...)` dispatches to the matching reader. Text is the dominant path, but each modality has its own decode:

| Kind | Source path | What `decode` produces | Notes |
|------|-------------|------------------------|-------|
| Text | `extractText` reads up to `maxTextBytes` (2 MB), UTF-8 then ISO-Latin1 fallback | `.text([chunks])` | Covers code, markdown, config, logs |
| Text (PDF) | `extractPDF`: real text if `>= minCharsPerPage * pageCount`, else rasterize up to `maxScanPages` (8) pages to images | `.text` or `.imagePatches` | Scanned PDFs fall through to the vision tower |
| Text (office) | `extractOffice` via `NSAttributedString` (rtf, docx, pages, ...) | `.text` | AppKit-only |
| Image | `loadImage` thumbnails to `maxImageDimension` (1568) then `OmniVisionPreprocess.preprocessRaw` | `.imagePatches([RawPatches])` | Patchify runs in the decode stage, off the GPU thread |
| Video | `videoFrames`: dense sampling + average-hash dedup keeps up to `maxVideoFrames` (6) distinct keyframes | `.images([CGImage])` | One temporal clip -> one embedding |
| Audio | `OmniAudioPreprocess.melFeatures` computes the mel buffer | `.audioMel([Float], frames)` | STFT runs on background cores |

A key efficiency choice: still images and PDF pages are *preprocessed* (resize + parallel patchify) inside `decode`, so the serialized GPU thread only runs the vision tower, not the heavy CPU patchify. Video frames stay as raw `CGImage`s because they form one clip embedded together. `Sources: [FileExtractor.swift:65-106](Sources/OmniKit/FileExtractor.swift#L65-L106), [FileExtractor.swift:121-237](Sources/OmniKit/FileExtractor.swift#L121-L237), [Indexer.swift:438-479](Sources/OmniKit/Indexer.swift#L438-L479)`
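Two of the decode decisions in the table reduce to small rules. The following is a hypothetical sketch, not OmniKit's code: `decodeText` caps input at 2 MB (assumed to be MiB) and falls back from UTF-8 to ISO Latin-1, and `planPDF` keeps a PDF's text layer only when it reaches `minCharsPerPage * pageCount`, otherwise rasterizing at most `maxScanPages` pages. The value of `minCharsPerPage` is not given on this page, so it is a parameter here.

```swift
import Foundation

/// Hypothetical text reader: at most `maxBytes`, UTF-8 first, then ISO Latin-1,
/// which accepts any byte sequence.
/// Caveat of this naive version: a cut that lands inside a multi-byte UTF-8 character
/// makes the UTF-8 attempt fail, so such a file would be read as Latin-1.
func decodeText(_ data: Data, maxBytes: Int = 2 * 1024 * 1024) -> String? {
    let head = data.prefix(maxBytes)
    return String(data: head, encoding: .utf8) ?? String(data: head, encoding: .isoLatin1)
}

enum PDFPlan: Equatable {
    case useText                 // the text layer is substantial enough
    case rasterize(pages: Int)   // scanned PDF: render pages for the vision tower
}

/// Hypothetical rule: keep real text if it reaches minCharsPerPage * pageCount,
/// otherwise rasterize at most maxScanPages (8) pages.
func planPDF(extractedChars: Int, pageCount: Int, minCharsPerPage: Int, maxScanPages: Int = 8) -> PDFPlan {
    if extractedChars >= minCharsPerPage * pageCount { return .useText }
    return .rasterize(pages: min(pageCount, maxScanPages))
}
```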

Per-modality minimum thresholds from `IndexSettings` (`minTextChars`, `minImageDimension`, `minAudioSeconds`, `minVideoSeconds`) are applied here; a file below its threshold returns an empty `DecodedItem` and is counted as skipped rather than embedded. `Sources: [IndexSettings.swift:23-31](Sources/OmniKit/IndexSettings.swift#L23-L31), [Indexer.swift:443-470](Sources/OmniKit/Indexer.swift#L443-L470)`
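The threshold check amounts to comparing one measurement per modality against its setting. A minimal sketch with hypothetical types (`MinThresholds`, `MeasuredSize`, `isBelowThreshold`); comparing an image's shorter side against `minImageDimension`, and the Int/Double field types, are assumptions this page does not confirm.

```swift
/// Hypothetical mirror of the four IndexSettings thresholds (all default to 0, i.e. disabled).
struct MinThresholds {
    var minTextChars = 0
    var minImageDimension = 0
    var minAudioSeconds = 0.0
    var minVideoSeconds = 0.0
}

/// What decode measured for a file before deciding whether to embed it.
enum MeasuredSize {
    case text(chars: Int)
    case image(width: Int, height: Int)
    case audio(seconds: Double)
    case video(seconds: Double)
}

/// True when the file falls below its modality's threshold and yields an empty item.
func isBelowThreshold(_ m: MeasuredSize, _ t: MinThresholds) -> Bool {
    switch m {
    case .text(let chars): return chars < t.minTextChars
    case .image(let width, let height): return min(width, height) < t.minImageDimension   // assumption: shorter side
    case .audio(let seconds): return seconds < t.minAudioSeconds
    case .video(let seconds): return seconds < t.minVideoSeconds
    }
}
```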

## Stage 3 - Chunking text

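Text longer than `maxCharsPerChunk` (user-set, default 1800, floored at 200) is split into overlapping windows. Each window starts `limit - chunkOverlap` characters after the previous one (overlap 200, so a step of 1600 with the defaults), and at most `maxChunksPerFile` (40) chunks are emitted. Very long files therefore contribute a bounded number of chunks while preserving cross-boundary context; with the defaults, only the first 64,200 characters of a file are embedded (1800 + 39 × 1600).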

```swift
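// Indexer.swift - chunk(_:) (excerpt; chunkOverlap = 200, maxChunksPerFile = 40)
func chunk(_ text: String) -> [String] {
    let limit = max(200, active.maxCharsPerChunk)   // user setting, floored at 200
    let scalars = Array(text)                       // Characters, so a slice never splits a grapheme
    if scalars.count <= limit { return [text] }     // short text: a single chunk
    let step = max(1, limit - chunkOverlap)         // consecutive windows share chunkOverlap characters
    var chunks: [String] = []
    var start = 0
    while start < scalars.count && chunks.count < maxChunksPerFile {
        let end = min(start + limit, scalars.count)
        chunks.append(String(scalars[start ..< end]))
        if end == scalars.count { break }           // last window reached the end of the text
        start += step
    }
    return chunks
}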
```

Each chunk also gets a `snippet` (newline/tab-collapsed prefix of `snippetLength` = 220 chars) for the UI. `Sources: [Indexer.swift:558-577](Sources/OmniKit/Indexer.swift#L558-L577)`
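To check the window arithmetic, the same loop can run on a character count alone. `chunkWindows` is a hypothetical helper that returns the character ranges `chunk(_:)` would slice with the defaults above (limit 1800, overlap 200, at most 40 chunks); it models only the windowing, not snippet generation.

```swift
/// Hypothetical helper: the character ranges chunk(_:) would slice from a text of `count`
/// characters (limit floored at 200, windows advancing by limit - overlap, capped at maxChunks).
func chunkWindows(count: Int, limit: Int = 1800, overlap: Int = 200, maxChunks: Int = 40) -> [Range<Int>] {
    let size = max(200, limit)
    if count <= size { return [0 ..< count] }
    let step = max(1, size - overlap)
    var windows: [Range<Int>] = []
    var start = 0
    while start < count && windows.count < maxChunks {
        let end = min(start + size, count)
        windows.append(start ..< end)
        if end == count { break }   // the final window reached the end
        start += step
    }
    return windows
}
```

For a 5,000-character text this yields `0..<1800`, `1600..<3400`, and `3200..<5000`.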

## Stage 4 - Concurrent decode feeding a single serialized embed

This is the heart of the pipeline. `pipeline(...)` runs a producer that bounds outstanding work with a semaphore sized to `max(2, activeProcessorCount)`, dispatches `decode(...)` onto a concurrent queue, and drops each result into an index-keyed mailbox (`ReadyBox`) guarded by an `NSCondition`. The consumer pulls items **in strict file order**, serially, on the calling thread, so embedding is never concurrent even though decoding is. Because the semaphore is released only when the consumer takes an item, it bounds decoded-but-unconsumed items as well as in-flight decodes.

```swift
// Indexer.swift - pipeline(...)
let maxInFlight = max(2, ProcessInfo.processInfo.activeProcessorCount)
let sem = DispatchSemaphore(value: maxInFlight)
producerQ.async {
    for (i, file) in files.enumerated() {
        sem.wait()                       // bound decoding + decoded-not-consumed
        ... decodeQ.async { ready.items[i] = self.decode(file); cond.signal() }
    }
}
for i in 0 ..< files.count {
    cond.lock(); while ready.items[i] == nil { cond.wait() }
    let item = ready.items.removeValue(forKey: i)!; cond.unlock()
    sem.signal()
    if item.abandoned { continue }       // paused: don't consume/count
    consume(item)
}
```
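The mailbox pattern is the subtle part, so here is a self-contained sketch of the same idea. It is a hypothetical, simplified stand-in for `pipeline(...)`: `orderedPipeline` takes generic `decode`/`embed` closures, omits pause and abandon handling, and makes the locking around the mailbox write explicit. A semaphore caps items that are decoding or decoded-but-unconsumed, a concurrent queue decodes, and the calling thread drains results strictly by index.

```swift
import Dispatch
import Foundation

/// Index-keyed mailbox shared by decode workers and the consumer.
final class ReadyBox<T> {
    var items: [Int: T] = [:]
}

/// Hypothetical stand-in for pipeline(...): decode concurrently, embed serially in input order.
func orderedPipeline<In, Out>(
    _ inputs: [In],
    maxInFlight: Int = max(2, ProcessInfo.processInfo.activeProcessorCount),
    decode: @escaping (In) -> Out,
    embed: (Out) -> Void
) {
    let sem = DispatchSemaphore(value: maxInFlight)   // caps decoding + decoded-not-consumed
    let cond = NSCondition()                          // guards `ready.items`
    let ready = ReadyBox<Out>()
    let producerQ = DispatchQueue(label: "pipeline.producer")
    let decodeQ = DispatchQueue(label: "pipeline.decode", attributes: .concurrent)

    producerQ.async {
        for (i, input) in inputs.enumerated() {
            sem.wait()                                // block until a slot frees up
            decodeQ.async {
                let out = decode(input)               // CPU work runs outside the lock
                cond.lock()
                ready.items[i] = out
                cond.signal()                         // wake the consumer if it is waiting
                cond.unlock()
            }
        }
    }

    // Consumer: the calling thread, strictly in index order, never concurrent.
    for i in 0 ..< inputs.count {
        cond.lock()
        while ready.items[i] == nil { cond.wait() }
        let item = ready.items.removeValue(forKey: i)!
        cond.unlock()
        sem.signal()                                  // this slot is free again
        embed(item)
    }
}
```

Because `sem.signal()` runs only after the consumer removes an item, a slow embed stage throttles the producer instead of letting decoded media pile up in memory.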

Because each modality is processed as a uniform phase, the consumer can **batch across files** into one GPU forward instead of many tiny ones:

- **Text** buffers chunks from many files into a staging window (`textBatchSize * 6`), **length-buckets** the window (sorting by length so each batch pads to a near-uniform max length), carves it into `textBatchSize` (default 16) groups, and hands the whole set to `embedTextBatches` in one serialized call. A file is stored only once all of its chunks have been embedded, preserving per-file atomicity. The batch size of 16 is a measured responsiveness sweet spot: a search query's GPU evaluation queues behind the in-flight indexing forward, so smaller batches (16 vs 48) cut the search p95 tail (~164 ms vs ~385 ms) while index throughput stays flat or improves.
- **Audio** stages decoded mels and embeds up to `audioMaxClipsPerBatch` (16) clips bounded by a total-frame budget (`audioFrameBudget` = 24000) in one tower + backbone forward; a clip larger than the budget embeds alone.
- **Images / scanned PDFs** embed all pages of a file in one block-diagonal vision forward.
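Both batching rules can be sketched in isolation. These helpers are hypothetical: `lengthBucketedBatches` takes one staging window of chunk texts and returns batches of indices into it, and `packAudio` packs clip frame counts greedily in arrival order. The greedy order is an assumption; the page only states the 16-clip cap, the 24000-frame budget, and that an oversized clip embeds alone.

```swift
/// Length bucketing over one staging window: sort indices by text length (ties by index),
/// then cut fixed-size batches so each batch pads to a near-uniform max length.
func lengthBucketedBatches(_ texts: [String], batchSize: Int = 16) -> [[Int]] {
    let order = texts.indices.sorted { (texts[$0].count, $0) < (texts[$1].count, $1) }
    return stride(from: 0, to: order.count, by: batchSize).map {
        Array(order[$0 ..< min($0 + batchSize, order.count)])
    }
}

/// Greedy audio packing: at most `maxClips` clips and `frameBudget` total frames per batch;
/// a clip larger than the budget ends up in a batch by itself.
func packAudio(frames: [Int], maxClips: Int = 16, frameBudget: Int = 24_000) -> [[Int]] {
    var batches: [[Int]] = []
    var current: [Int] = []
    var used = 0
    for f in frames {
        // Close the current batch if this clip would break either limit.
        if !current.isEmpty && (current.count == maxClips || used + f > frameBudget) {
            batches.append(current)
            current = []
            used = 0
        }
        current.append(f)
        used += f
    }
    if !current.isEmpty { batches.append(current) }
    return batches
}
```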

The order of phases is `IndexSettings.kindOrder` (default `[.image, .audio, .video, .text]` - media first because it is slower, so its results surface sooner). `Sources: [Indexer.swift:190-295](Sources/OmniKit/Indexer.swift#L190-L295), [Indexer.swift:397-436](Sources/OmniKit/Indexer.swift#L397-L436), [Indexer.swift:96-110](Sources/OmniKit/Indexer.swift#L96-L110), [IndexSettings.swift:18-21](Sources/OmniKit/IndexSettings.swift#L18-L21)`

### Cancellation and resume

`DecodedItem` carries two flags that keep pause/resume correct. `unchanged` means already indexed and current (counted, nothing to do); `abandoned` means produced after a cancel and never consumed - it is *not* counted as a skip and will be re-indexed on resume. Crucially, deletion reconciliation only runs on a fully completed pass, never on a cancelled one, because a paused run has not seen every file and must not purge "unseen" paths. `Sources: [Indexer.swift:65-80](Sources/OmniKit/Indexer.swift#L65-L80), [Indexer.swift:302-337](Sources/OmniKit/Indexer.swift#L302-L337)`
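The two rules above, that abandoned items are not counted and that deletions are reconciled only after a complete pass, fit in a couple of functions. A hypothetical sketch; `ItemFlags`, `countsTowardProgress`, and `pathsToPurge` are illustrative names, not OmniKit APIs.

```swift
/// Hypothetical slice of DecodedItem: just the two flags discussed above.
struct ItemFlags {
    var unchanged = false   // already indexed and current: counted, nothing to embed
    var abandoned = false   // produced after a cancel: not counted, re-indexed on resume
}

/// Whether a consumed item advances the pass's progress count.
func countsTowardProgress(_ flags: ItemFlags) -> Bool {
    !flags.abandoned
}

/// Stored paths the pass never saw are stale, but only a completed pass may conclude that;
/// a paused pass returns nothing to purge.
func pathsToPurge(stored: Set<String>, seen: Set<String>, completed: Bool) -> Set<String> {
    guard completed else { return [] }
    return stored.subtracting(seen)
}
```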

## Stage 5 - Persisting into the bf16 vector store

`VectorStore` is SQLite-backed but keeps every vector resident in memory as **bf16** (2 bytes/dim, half the residency of fp32 with negligible recall loss on L2-normalized vectors). `replace(path:chunks:)` is atomic per file: inside one transaction it deletes the path's old rows and inserts the new chunks, storing each embedding as a bf16 `BLOB`. It then mirrors the rows into the in-memory `flat16` matrix.
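bf16 keeps float32's sign bit and 8-bit exponent but only the top 7 mantissa bits, so a vector converts by keeping the upper 16 bits of each float after rounding. The sketch below shows round-to-nearest-even conversion, L2 normalization, and packing into a BLOB-ready `Data`. The helper names are hypothetical, the byte order is simply the host's (little-endian on Apple silicon and Intel Macs), and OmniKit's own conversion may round or lay out bytes differently.

```swift
import Foundation

/// float32 -> bf16 with round-to-nearest-even; NaNs stay NaN (quiet bit forced on).
func toBF16(_ x: Float) -> UInt16 {
    let bits = x.bitPattern
    if x.isNaN { return UInt16(truncatingIfNeeded: bits >> 16) | 0x0040 }
    let roundingBias: UInt32 = 0x7FFF + ((bits >> 16) & 1)   // ties go to the even upper half
    return UInt16(truncatingIfNeeded: (bits &+ roundingBias) >> 16)
}

/// bf16 -> float32 is exact: re-attach 16 zero mantissa bits.
func fromBF16(_ h: UInt16) -> Float {
    Float(bitPattern: UInt32(h) << 16)
}

/// L2-normalize, then pack as bf16 (2 bytes per dimension) for a BLOB column.
func bf16Blob(_ v: [Float]) -> Data {
    let norm = v.reduce(Float(0)) { $0 + $1 * $1 }.squareRoot()
    let scale: Float = norm > 0 ? 1 / norm : 0
    let halves = v.map { toBF16($0 * scale) }
    return halves.withUnsafeBufferPointer { Data(buffer: $0) }
}

/// For L2-normalized vectors, cosine similarity is just the dot product.
func dot(_ a: [Float], _ b: [Float]) -> Float {
    zip(a, b).reduce(Float(0)) { $0 + $1.0 * $1.1 }
}
```

Conversion back is exact, so all precision loss happens in `toBF16`.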

The store is tuned for the dominant indexing case - appending a brand-new file:

- A `presentPaths` set lets `replace` know in O(1) whether a path pre-exists; a new file skips the O(N) buffer rebuild entirely and just appends (geometric growth, amortized O(1)).
- New rows append *past* `baseRows` and are scored as a small "delta" matmul per query, so an ordinary indexing append does **not** rebuild the ~0.8 GB resident base matrix - that happens only on a structural change (delete/reload) or when the delta exceeds `foldThreshold` (50 000).
- The schema is treated as a rebuildable cache: WAL journaling, `synchronous=NORMAL`, 256 MB mmap and page cache. On a `schemaVersion` mismatch the `chunks` table is simply dropped and recreated, while display-metadata columns (`width`, `height`, `duration`) are added through lazy `ADD COLUMN` migrations that do not force a reindex.

```swift
// VectorStore.swift - new-file fast path after COMMIT
if presentPaths.contains(path) { removeRowsLocked { $0.path == path } }
for (i, c) in chunks.enumerated() {
    rows.append(Row(...)); flat16.append(contentsOf: bfs[i]); fileID.append(internPath(c.path))
}
presentPaths.insert(path)
// No invalidateBase(): a new path's rows are scored as delta.
```
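The bookkeeping behind this fast path can be modeled without SQLite or a GPU. In the hypothetical `ResidentMatrix` below, rows before `baseRows` stand for the cached base matrix and later rows for the delta; `rebuildBase()` stands in for the expensive base rebuild, and scoring is left out. It mirrors the rules described above (O(1) `presentPaths` lookup, append-only for new paths, rebuild when an existing path is replaced or the delta exceeds `foldThreshold`), not the actual `VectorStore` code.

```swift
/// Hypothetical model of VectorStore's in-memory mirror (bf16 values as UInt16).
struct ResidentMatrix {
    let dims: Int
    let foldThreshold: Int
    var flat: [UInt16] = []            // row-major values, stand-in for flat16
    var paths: [String] = []           // owning path of each row
    var presentPaths: Set<String> = [] // O(1) "does this path already have rows?"
    var baseRows = 0                   // rows covered by the cached base matrix
    var baseRebuilds = 0               // count of expensive rebuilds

    init(dims: Int, foldThreshold: Int = 50_000) {
        self.dims = dims
        self.foldThreshold = foldThreshold
    }

    var deltaRows: Int { paths.count - baseRows }

    mutating func replace(path: String, rows newRows: [[UInt16]]) {
        if presentPaths.contains(path) {
            // Replacing an existing file is structural: drop its rows, then rebuild the base.
            var keptFlat: [UInt16] = []
            var keptPaths: [String] = []
            for (i, p) in paths.enumerated() where p != path {
                keptPaths.append(p)
                keptFlat.append(contentsOf: flat[i * dims ..< (i + 1) * dims])
            }
            flat = keptFlat
            paths = keptPaths
            rebuildBase()
        }
        // New rows are appended past baseRows and scored as delta.
        for row in newRows {
            precondition(row.count == dims, "row width must match dims")
            flat.append(contentsOf: row)
            paths.append(path)
        }
        if newRows.isEmpty { presentPaths.remove(path) } else { presentPaths.insert(path) }
        // Fold the delta into the base once it grows past the threshold.
        if deltaRows > foldThreshold { rebuildBase() }
    }

    mutating func rebuildBase() {
        baseRows = paths.count
        baseRebuilds += 1
    }
}
```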

`replaceMany(...)` applies a file-watcher burst (bulk edit, git checkout, synced folder) as one transaction and one in-memory rebuild instead of an O(N) rebuild per file, and `deletePaths(...)` does the same for batched deletions. `Sources: [VectorStore.swift:84-131](Sources/OmniKit/VectorStore.swift#L84-L131), [VectorStore.swift:219-272](Sources/OmniKit/VectorStore.swift#L219-L272), [VectorStore.swift:279-370](Sources/OmniKit/VectorStore.swift#L279-L370), [VectorStore.swift:159-206](Sources/OmniKit/VectorStore.swift#L159-L206)`

Non-finite embeddings are filtered before storage; a file with zero finite chunks is logged and counted as skipped, while a store failure increments `failed`. `Sources: [Indexer.swift:174-184](Sources/OmniKit/Indexer.swift#L174-L184)`
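A minimal sketch of that filtering and the resulting three-way outcome, assuming an embedding is a plain `[Float]` and the store write is any throwing closure; `StoreOutcome` and `persist` are hypothetical names. Only non-finite chunks are dropped; the file counts as skipped only when none remain.

```swift
/// Hypothetical per-file result of the persistence step.
enum StoreOutcome: Equatable {
    case stored(chunks: Int)
    case skipped
    case failed
}

func persist(_ embeddings: [[Float]], store: ([[Float]]) throws -> Void) -> StoreOutcome {
    // Drop chunks containing NaN or infinity; keep the rest of the file.
    let finite = embeddings.filter { vec in vec.allSatisfy { $0.isFinite } }
    guard !finite.isEmpty else { return .skipped }   // real code also logs the file
    do {
        try store(finite)
        return .stored(chunks: finite.count)
    } catch {
        return .failed                               // counted under `failed`
    }
}
```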

## The FSWatcher: keeping the index live

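After the first full pass, re-indexing is event-driven. `FSWatcher` wraps FSEvents with per-file events (`kFSEventStreamCreateFlagFileEvents`) and a 1.5 s latency that coalesces bursts; because it also sets `kFSEventStreamCreateFlagNoDefer`, the first event after a quiet period is delivered immediately and only the rest of a burst waits out the latency window. It persists `lastEventId` so a relaunch can replay changes missed while the app was closed (`since:` is read from `UserDefaults` at construction).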

```swift
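// FSWatcher.swift
let flags = UInt32(kFSEventStreamCreateFlagFileEvents   // report individual files, not just directories
                 | kFSEventStreamCreateFlagNoDefer      // after a quiet period, deliver the first event at once
                 | kFSEventStreamCreateFlagUseCFTypes   // callback receives paths as a CFArray of CFStrings
                 | kFSEventStreamCreateFlagWatchRoot)   // also notify if a watched root is moved or deleted
FSEventStreamCreate(kCFAllocatorDefault, fsEventsCallback, &context,
                    paths as CFArray, sinceWhen, 1.5 /* latency: coalesce bursts */, flags)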
```

`AppModel.handleFSChange` is the consumer. It drops events under paused folders, **buffers** changes while a full index is running (draining them on completion), and otherwise calls `Indexer.update(paths:settings:)` on a detached utility task. `update(...)` performs a *targeted* re-embed with **no crawl**: each path is re-classified by its current `(modified, size)`; unchanged files are ignored, now-unsupported/deleted files are queued for deletion, and a directory event triggers a scoped `FileCrawler.walk` so freshly added subtrees get indexed. The whole batch is applied via `deletePaths` + `replaceMany`. `Sources: [FSWatcher.swift:13-51](Sources/OmniKit/FSWatcher.swift#L13-L51), [AppModel.swift:1338-1381](App/AppModel.swift#L1338-L1381), [Indexer.swift:345-390](Sources/OmniKit/Indexer.swift#L345-L390)`
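The gating in `handleFSChange` and the per-path decision in `update(...)` can be sketched as plain logic. Everything here is hypothetical and simplified: `ChangeAction`, `PathState`, `classify`, and `ChangeGate` are illustrative names, a path is treated as paused if it sits under a paused root, and only paths that were previously stored are queued for deletion (an assumption).

```swift
/// What update(paths:) would do with one changed path (hypothetical classification).
enum ChangeAction: Equatable { case ignore, reindex, delete, crawlDirectory }

/// Current state of a path on disk, as a stat would report it (hypothetical shape).
struct PathState {
    var exists: Bool
    var isDirectory: Bool
    var supported: Bool
    var modified: Double
    var size: Int
}

func classify(_ s: PathState, stored: (modified: Double, size: Int)?) -> ChangeAction {
    if s.exists && s.isDirectory { return .crawlDirectory }   // scoped walk picks up new subtrees
    if !s.exists || !s.supported {
        return stored == nil ? .ignore : .delete              // assumption: only stored paths need deleting
    }
    if let st = stored, st.modified == s.modified, st.size == s.size { return .ignore }
    return .reindex
}

/// Hypothetical gate in front of update(paths:): drop paused roots, buffer during a full pass.
struct ChangeGate {
    var pausedRoots: [String] = []
    var fullIndexRunning = false
    var buffered: [String] = []

    /// Paths to apply now; empty while a full pass is running (they are buffered instead).
    mutating func accept(_ paths: [String]) -> [String] {
        let roots = pausedRoots
        let live = paths.filter { path in
            !roots.contains { root in path == root || path.hasPrefix(root + "/") }
        }
        if fullIndexRunning {
            buffered += live
            return []
        }
        return live
    }

    /// Called when the full pass finishes: returns and clears the buffered paths.
    mutating func finishFullIndex() -> [String] {
        fullIndexRunning = false
        let drained = buffered
        buffered = []
        return drained
    }
}
```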

## Configuration surface

`IndexSettings` is the single struct carrying everything the indexer needs. `AppModel.effectiveSettings()` populates it from user preferences before each pass.

| Field | Default | Effect |
|-------|---------|--------|
| `enabledKinds` | all four | Which modalities are crawled and indexed |
| `disabledExtensions` | empty | Extensions turned off within an enabled kind |
| `kindOrder` | `[image, audio, video, text]` | Phase order; sets which modality embeds first |
| `maxImageDimension` | 1568 | Largest image/PDF-page side decoded |
| `maxVideoFrames` | 6 | Keyframes sampled per video |
| `maxCharsPerChunk` | 1800 | Longest text slice per chunk before overlap split |
| `minImageDimension` / `minAudioSeconds` / `minVideoSeconds` / `minTextChars` | 0 | Per-modality skip thresholds |

`Sources: [IndexSettings.swift:1-56](Sources/OmniKit/IndexSettings.swift#L1-L56), [AppModel.swift:1323-1334](App/AppModel.swift#L1323-L1334)`
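For reference, the table maps onto a struct roughly like the one below. This is a hypothetical mirror named `IndexSettingsSketch` to make clear it is not the real declaration: the field names and defaults come from the table, while the field types (Set vs Array, Int vs Double) and the `allows(kind:ext:)` helper are assumptions.

```swift
enum FileKind: CaseIterable { case image, audio, video, text }

/// Hypothetical mirror of IndexSettings using the defaults from the table above.
struct IndexSettingsSketch {
    var enabledKinds: Set<FileKind> = Set(FileKind.allCases)   // all four
    var disabledExtensions: Set<String> = []
    var kindOrder: [FileKind] = [.image, .audio, .video, .text]
    var maxImageDimension = 1568
    var maxVideoFrames = 6
    var maxCharsPerChunk = 1800
    var minImageDimension = 0      // thresholds: 0 disables skipping
    var minAudioSeconds = 0.0
    var minVideoSeconds = 0.0
    var minTextChars = 0

    /// Hypothetical helper: would a file of this kind and extension be crawled?
    func allows(kind: FileKind, ext: String) -> Bool {
        enabledKinds.contains(kind) && !disabledExtensions.contains(ext.lowercased())
    }
}
```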

## Summary

Omni's ingestion path is deliberately asymmetric: cheap, parallelizable work (crawling, extraction, rasterization, frame sampling, mel STFT, image patchify) is spread across all CPU cores, while the one expensive, non-shareable resource - the MLX GPU forward - is fed by a single serialized consumer that batches across files for throughput yet stays small enough (text batch 16) to keep concurrent search responsive. Incrementality is enforced end to end by `(mtime, size)` change detection, per-file atomic `replace`, abandon-on-cancel semantics, and deletion reconciliation gated on a complete pass. Persistence keeps SQLite as durable truth while a resident bf16 matrix with a base/delta split absorbs ordinary appends without rebuilding. The `FSWatcher` closes the loop, turning filesystem events into targeted, crawl-free updates so the index stays current after the initial build. `Sources: [Indexer.swift:84-341](Sources/OmniKit/Indexer.swift#L84-L341), [VectorStore.swift:84-273](Sources/OmniKit/VectorStore.swift#L84-L273)`
