# Flue workflows

> Define workflow modules, initialize agents inside `run()`, invoke via CLI or HTTP, and inspect run events with `flue logs`.

- Repository: withastro/flue-with-vercel-eve
- GitHub: https://github.com/withastro/flue
- Human docs: https://grok-wiki.com/public/docs/withastro-flue-with-vercel-eve-f4b79875fff6
- Complete Markdown: https://grok-wiki.com/public/docs/withastro-flue-with-vercel-eve-f4b79875fff6/llms-full.txt

## Source Files

- `withastro-flue:examples/hello-world/src/workflows/hello.ts`
- `withastro-flue:examples/hello-world/src/workflows/with-subagent.ts`
- `withastro-flue:examples/braintrust/src/workflows/task.ts`
- `withastro-flue:packages/runtime/src/runtime/flue-app.ts`
- `withastro-flue:packages/runtime/src/runtime/inspect.ts`
- `withastro-flue:packages/cli/bin/flue.ts`
- `withastro-flue:packages/sdk/src/index.ts`

---

---
title: "Flue workflows"
description: "Define workflow modules, initialize agents inside `run()`, invoke via CLI or HTTP, and inspect run events with `flue logs`."
---

A Flue workflow is a discovered module under `workflows/` that exports a callable `run(ctx)` handler. Each invocation creates a workflow run with a unique `runId`, persists run metadata and a Durable Streams event log, and returns the handler's return value as the terminal result. Agents are initialized inside `run()` via `ctx.init(agent)`; HTTP exposure is optional and controlled by an exported `route` middleware.

## Module contract

The build discovers one workflow per source file in `workflows/` (or `.flue/workflows/` when that source root is active). The file basename becomes the workflow name: `workflows/hello.ts` registers as `hello`.

| Export | Required | Role |
| --- | --- | --- |
| `run` | Yes | Callable handler invoked for each workflow run. Must be a function. |
| `route` | No | Hono middleware. When present, the workflow opts into HTTP transport at `POST /workflows/:name`. |

Discovery validates the contract at build time. A missing or non-callable `run` export fails the build. A `route` export that is not a callable middleware fails the build.

```ts title="workflows/hello.ts"
import { createAgent, type FlueContext, type WorkflowRouteHandler } from '@flue/runtime';

export const route: WorkflowRouteHandler = async (_c, next) => next();

const agent = createAgent(() => ({ model: 'anthropic/claude-sonnet-4-6' }));

export async function run({ init, log, payload }: FlueContext<{ name?: string }>) {
  const harness = await init(agent);
  const session = await harness.session();
  const { text } = await session.prompt(`Hello, ${payload.name ?? 'Developer'}!`);
  log.info('greeting complete', { length: text.length });
  return { greeting: text };
}
```

<Note>
Workflow names must be non-empty. Unlike agents and channels, workflow names may contain `:`.
</Note>

Export `route` when callers should reach the workflow over HTTP. The middleware runs before admission and is the right place for authentication, rate limits, or request rejection. On Node and Cloudflare, the same `route` export also guards `GET /runs/:runId` reads for runs owned by that workflow.

## `FlueContext` inside `run()`

`run()` receives a `FlueContext<TPayload, TEnv>` with the current invocation state. Type parameters for `payload` and `env` are compile-time only; Flue does not validate payload shape at runtime.

<ParamField body="id" type="string">
Workflow run ID for this invocation (same value as `runId` on the internal context).
</ParamField>

<ParamField body="payload" type="TPayload">
JSON body from the invoker. Defaults to `{}` when omitted.
</ParamField>

<ParamField body="env" type="TEnv">
Platform environment bindings. `process.env` on Node; Worker `env` on Cloudflare.
</ParamField>

<ParamField body="req" type="Request | undefined">
Standard Fetch `Request` for the current invocation. Undefined outside HTTP contexts. Body is single-use; call `req.clone()` to read it more than once.
</ParamField>

<ParamField body="log" type="FlueLogger">
Emits structured `log` events (`info`, `warn`, `error`) into the run's event stream during workflow execution.
</ParamField>

<ParamField body="init(agent, options?)" type="Promise<FlueHarness>">
Initializes a `createAgent(...)` value for this run. Each harness name may be initialized once per context. Returns a harness with `session()`, `shell()`, and `fs`.
</ParamField>

```ts title="workflows/with-request.ts"
export async function run({ req, init }: FlueContext) {
  const authHeader = req?.headers.get('authorization');
  if (!authHeader) return { skipped: true, reason: 'no authorization header' };

  const harness = await init(agent);
  const session = await harness.session();
  const { text } = await session.prompt('Say hello in 5 words.');
  return { skipped: false, text };
}
```

<Warning>
Do not import a workflow module and call `run()` directly from `app.ts`, schedulers, or channel handlers. That bypasses admission, durable run storage, route middleware, and run inspection APIs. Invoke the mounted `POST /workflows/:name` route, use `flue run`, or extract shared pure logic into a separate module.
</Warning>

## Initialize agents in `run()`

Workflows orchestrate finite agent-backed work. Define agents with `createAgent(() => ({ ... }))` at module scope, then call `init(agent)` inside `run()` to obtain a harness for that invocation.

```ts title="workflows/with-subagent.ts"
const greeter = defineAgentProfile({
  name: 'greeter',
  instructions: 'Write one warm, concise greeting.',
});

const agent = createAgent(() => ({ model: 'anthropic/claude-sonnet-4-6', subagents: [greeter] }));

export async function run({ init, payload }: FlueContext<{ name?: string }>) {
  const harness = await init(agent);
  const session = await harness.session();

  const { data } = await session.task(`Greet the user named "${payload.name ?? 'Developer'}".`, {
    agent: 'greeter',
    result: v.object({ greeting: v.string() }),
  });

  return data;
}
```

Common harness patterns inside workflows:

| Method | Use when |
| --- | --- |
| `harness.session()` | Default conversational context for prompts, skills, and tasks. |
| `session.prompt(...)` | Model-backed work with optional structured `result` schema. |
| `session.task(..., { agent })` | Delegate to a configured subagent profile. |
| `harness.fs` / `harness.shell` | Stage inputs or inspect the sandbox without adding session messages. |

Prefer `init(agent)` and session APIs for model work. Calling a provider SDK directly bypasses Flue's tools, skills, sandbox, operation events, and response handling.

## Run lifecycle

Each invocation allocates a new `runId` (format `run_<ulid>`), records a `RunRecord`, and appends events to a Durable Streams log at `/runs/:runId`.

```mermaid
sequenceDiagram
  participant Caller
  participant API as flue() HTTP / flue run IPC
  participant Admission as handleWorkflowRequest
  participant Handler as workflows/:name run()
  participant Store as runStore + eventStreamStore

  Caller->>API: POST payload
  API->>Admission: prepareWorkflowExecution
  Admission->>Store: createRun, capture stream offset
  Admission-->>Caller: 202 runId, streamUrl, offset
  Admission->>Handler: startWorkflowExecution (background)
  Handler->>Store: run_start, log/tool/text events
  Handler->>Store: run_end + result or error
  Caller->>Store: GET /runs/:runId (stream or ?meta)
```

<ResponseField name="status" type="RunStatus">
`active` while the handler runs; `completed` or `errored` after `run_end`.
</ResponseField>

<ResponseField name="result" type="unknown">
Return value from `run()` when `status` is `completed`.
</ResponseField>

<ResponseField name="error" type="unknown">
Terminal error payload when `status` is `errored`.
</ResponseField>

Terminal `run_end` events carry `durationMs`, `isError`, and either `result` or `error`. Agent prompts via `POST /agents/:name/:id` and `dispatch(...)` do not create workflow runs.

## Invoke workflows

### CLI: `flue run`

`flue run` builds the Node target, spawns a short-lived local process, invokes the named workflow once, streams events to stderr, and prints the terminal JSON result to stdout. It does not require HTTP `route` export.

<ParamField body="workflow" type="string" required>
Discovered workflow name (positional argument).
</ParamField>

<ParamField body="--payload" type="json">
JSON object passed as `ctx.payload`. Default: `{}`.
</ParamField>

<ParamField body="--target" type="node">
`flue run` supports Node only. Cloudflare targets must use `flue dev` plus HTTP `curl` or the SDK.
</ParamField>

<ParamField body="--root, --output, --config, --env" type="string">
Project resolution overrides. Same semantics as other application commands.
</ParamField>

<Steps>
<Step title="Start the dev server (HTTP path only)">

```bash
flue dev --target node
```

</Step>

<Step title="Run a workflow locally">

```bash
flue run hello --target node --payload '{"name": "World"}'
```

Expect a `run` line with `runId`, inline event output on stderr, JSON result on stdout, and `workflow completed` on success.

</Step>

<Step title="Inspect the same run against the dev server">

```bash
flue logs <runId>
```

`flue run` history is process-local. `flue logs` reads from a running server's `/runs/:runId` stream (default `http://127.0.0.1:4321`).

</Step>
</Steps>

<Info>
`flue run` uses IPC against `localWorkflowHandlers`, which includes every discovered workflow regardless of HTTP transport. HTTP-only registration applies to `workflowHandlers` exposed by `flue()`.
</Info>

### HTTP: `POST /workflows/:name`

HTTP invocation requires `export const route`. Without it, the workflow is CLI-only.

:::endpoint POST /workflows/:name
Start a workflow run. Default admission returns `202` with stream coordinates. Add `?wait=result` for a synchronous JSON response that includes the terminal result.
:::

<ParamField body="name" type="string" required>
Workflow name from discovery (path parameter).
</ParamField>

<ParamField body="body" type="object">
Workflow-defined JSON payload. Optional; treated as `{}` when omitted.
</ParamField>

<ParamField body="wait" type="query: result">
When set, blocks until the run finishes and returns `200` with `result`.
</ParamField>

<RequestExample>

```bash
curl -X POST http://localhost:4321/workflows/hello \
  -H "Content-Type: application/json" \
  -d '{"name": "World"}'
```

</RequestExample>

<ResponseExample>

```json
{
  "runId": "run_01HXXXXXXXXXXXXXXXXXXXXXX",
  "streamUrl": "http://localhost:4321/runs/run_01HXXXXXXXXXXXXXXXXXXXXXX",
  "offset": "-1"
}
```

</ResponseExample>

Admission responses set `Location` and `Stream-Next-Offset` headers mirroring Durable Streams conventions. Observe events at `GET /runs/:runId`. Read run metadata with `GET /runs/:runId?meta`.

<CodeGroup>

```bash title="Async admission (202)"
curl -X POST http://localhost:4321/workflows/hello \
  -H "Content-Type: application/json" \
  -d '{}'
```

```bash title="Sync result (?wait=result)"
curl -X POST "http://localhost:4321/workflows/hello?wait=result" \
  -H "Content-Type: application/json" \
  -d '{"name": "World"}'
```

```ts title="SDK client"
import { createFlueClient } from '@flue/sdk';

const client = createFlueClient({ baseUrl: 'http://localhost:4321' });

const admitted = await client.workflows.invoke('hello', {
  payload: { name: 'World' },
});

const finished = await client.workflows.invoke('hello', {
  payload: { name: 'World' },
  wait: 'result',
});
```

</CodeGroup>

## Inspect runs with `flue logs`

`flue logs` is read-only. It tails or replays workflow run events from a running Flue server via the Durable Streams protocol at `/runs/:runId`.

<ParamField body="runId" type="string" required>
Opaque workflow run ID printed by `flue run` or returned in HTTP admission.
</ParamField>

<ParamField body="--server" type="url">
Base URL of the running server. Default: `http://127.0.0.1:4321`.
</ParamField>

<ParamField body="--follow / --no-follow / -f" type="boolean">
Follow live events, or replay and exit. Default: follow when run `status` is `active`, otherwise replay.
</ParamField>

<ParamField body="--since" type="offset">
Resume after an opaque Durable Streams offset (for example from ndjson `offset` fields).
</ParamField>

<ParamField body="--types" type="comma-separated list">
Filter to event types such as `log`, `tool`, `run_end`.
</ParamField>

<ParamField body="--limit" type="positive integer">
Cap the number of emitted events (client-side).
</ParamField>

<ParamField body="--format" type="pretty | ndjson">
`pretty` renders human-readable stderr lines; `ndjson` prints one JSON event per stdout line with resume `offset`.
</ParamField>

```bash
flue logs run_01H... --types log,tool,run_end --format ndjson
flue logs run_01H... --no-follow
flue logs run_01H... -f --server http://localhost:4321
```

Pretty output includes `run:start`, `run:end`, operation banners, model text deltas, tool I/O, and `log` lines from `ctx.log`. Exit code `2` when the run ends with `isError: true`; `130` on SIGINT during follow mode.

| Surface | Purpose |
| --- | --- |
| `flue logs <runId>` | CLI tail or replay |
| `GET /runs/<runId>` | Durable Streams event read (catch-up, long-poll, SSE) |
| `GET /runs/<runId>?meta` | Plain JSON `RunRecord` |
| `client.runs.get/stream/events` | Application tooling via `@flue/sdk` |
| `listRuns()` / `getRun()` from `@flue/runtime` | Server-side inspection in custom admin routes |

Run listing and metadata can expose payloads, results, and model activity. Publish listing surfaces only behind authorization appropriate for that data.

## Failure modes

| Symptom | Likely cause |
| --- | --- |
| `Unknown workflow` on `flue run` | Name typo or module missing from `workflows/`. |
| `404` on `POST /workflows/:name` | Workflow exists but lacks `route` export (no HTTP transport). |
| `flue logs` cannot fetch run | Server not running, wrong `--server`, or `runId` from a finished `flue run` child (not published to the dev server). |
| `flue run --target cloudflare` rejected | `flue run` is Node-only; use `flue dev --target cloudflare` and HTTP/SDK invocation. |
| Build error on workflow module | `run` not exported, or `route` is not callable middleware. |

## Related pages

<CardGroup>
<Card title="Flue quickstart" href="/flue-quickstart">
Scaffold a project, run `flue dev`, and invoke your first workflow with `flue run`.
</Card>
<Card title="Flue project layout" href="/flue-project-layout">
Source-root resolution, `workflows/` discovery, and `app.ts` composition.
</Card>
<Card title="Build Flue agents" href="/build-flue-agents">
Author agents with `createAgent`, tools, skills, and subagent profiles used inside workflows.
</Card>
<Card title="Runtime models" href="/runtime-models">
Workflow runs versus agent sessions and dispatch receipts.
</Card>
<Card title="Flue CLI reference" href="/flue-cli-reference">
Full `flue run` and `flue logs` flag reference and exit behavior.
</Card>
<Card title="Flue HTTP API reference" href="/flue-http-api-reference">
`/workflows`, `/runs`, admission envelopes, and stream read semantics.
</Card>
</CardGroup>
