# Workspace Runtime, Jobs & Instance Commands

> BullMQ background worker, queue-worker bootstrapping, database migration commands (RegisteredInstanceCommand / RegisteredWorkspaceCommand), upgrade pattern (fast vs slow instance commands), and the workspace event emitter that drives real-time side effects.

- Repository: twentyhq/twenty
- GitHub: https://github.com/twentyhq/twenty
- Human wiki: https://grok-wiki.com/public/wiki/twentyhq-twenty-7ed82e5a21f6
- Complete Markdown: https://grok-wiki.com/public/wiki/twentyhq-twenty-7ed82e5a21f6/llms-full.txt

## Source Files

- `packages/twenty-server/src/queue-worker`
- `packages/twenty-server/src/engine/workspace-event-emitter`
- `packages/twenty-server/src/command`
- `packages/twenty-server/src/database`
- `packages/twenty-server/docs/UPGRADE_COMMANDS.md`
- `packages/twenty-server/src/modules/modules.module.ts`

---

<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:

- [packages/twenty-server/src/queue-worker/queue-worker.ts](packages/twenty-server/src/queue-worker/queue-worker.ts)
- [packages/twenty-server/src/queue-worker/queue-worker.module.ts](packages/twenty-server/src/queue-worker/queue-worker.module.ts)
- [packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts](packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts)
- [packages/twenty-server/src/engine/core-modules/message-queue/message-queue.module.ts](packages/twenty-server/src/engine/core-modules/message-queue/message-queue.module.ts)
- [packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts](packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts)
- [packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.module.ts](packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.module.ts)
- [packages/twenty-server/src/engine/core-modules/upgrade/decorators/registered-instance-command.decorator.ts](packages/twenty-server/src/engine/core-modules/upgrade/decorators/registered-instance-command.decorator.ts)
- [packages/twenty-server/src/engine/core-modules/upgrade/decorators/registered-workspace-command.decorator.ts](packages/twenty-server/src/engine/core-modules/upgrade/decorators/registered-workspace-command.decorator.ts)
- [packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-command-registry.service.ts](packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-command-registry.service.ts)
- [packages/twenty-server/src/database/commands/command-runners/workspace.command-runner.ts](packages/twenty-server/src/database/commands/command-runners/workspace.command-runner.ts)
- [packages/twenty-server/src/command/command.ts](packages/twenty-server/src/command/command.ts)
- [packages/twenty-server/docs/UPGRADE_COMMANDS.md](packages/twenty-server/docs/UPGRADE_COMMANDS.md)
</details>

Twenty's backend runs as two distinct processes: the main API server and a dedicated **queue worker**. The queue worker processes background jobs asynchronously using BullMQ and Redis, decoupling long-running tasks from request handling. Database schema evolution is handled by a separate command subsystem — **instance commands** and **workspace commands** — that replaces raw TypeORM migrations with a versioned, decorator-driven upgrade pipeline. A global `WorkspaceEventEmitter` bridges ORM mutations to downstream side effects such as webhooks, automation triggers, and audit records.

This page documents how the queue worker bootstraps, how the BullMQ driver manages queues and workers, how instance and workspace commands are declared and ordered, and how the workspace event emitter propagates database change events.

---

## Queue Worker Process

### Bootstrapping

The worker process is a standalone Node.js entry point that creates a headless NestJS application context — no HTTP server is started.

```ts
// packages/twenty-server/src/queue-worker/queue-worker.ts
// Excerpt: imports and the loggerService declaration are not shown.
async function bootstrap() {
  // createApplicationContext builds the DI container without an HTTP listener.
  const app = await NestFactory.createApplicationContext(QueueWorkerModule, {
    bufferLogs: process.env.LOGGER_IS_BUFFER_ENABLED === 'true',
  });
  loggerService = app.get(LoggerService);
  app.useLogger(loggerService ?? false);
}
```

Sources: [packages/twenty-server/src/queue-worker/queue-worker.ts:9-33](packages/twenty-server/src/queue-worker/queue-worker.ts)

The root `QueueWorkerModule` wires the minimum set of modules required for job processing:

```ts
// packages/twenty-server/src/queue-worker/queue-worker.module.ts
@Module({
  imports: [
    CoreEngineModule,
    MessageQueueModule.registerExplorer(),  // registers BullMQ workers via discovery
    WorkspaceEventEmitterModule,
    JobsModule,
    TwentyORMModule,
    GlobalWorkspaceDataSourceModule,
  ],
})
export class QueueWorkerModule {}
```

Sources: [packages/twenty-server/src/queue-worker/queue-worker.module.ts:1-19](packages/twenty-server/src/queue-worker/queue-worker.module.ts)

`MessageQueueModule.registerExplorer()` activates the `MessageQueueExplorer` and `MessageQueueMetadataAccessor` via NestJS `DiscoveryModule`, which discovers all `@Processor`-decorated classes at startup and registers them as BullMQ workers.

Sources: [packages/twenty-server/src/engine/core-modules/message-queue/message-queue.module.ts:22-27](packages/twenty-server/src/engine/core-modules/message-queue/message-queue.module.ts)
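
The discovery step can be pictured with a dependency-free sketch. Everything named here (`markAsProcessor`, `processorRegistry`, `registerWorkers`, `SendEmailJob`) is hypothetical and only stands in for the `@Processor` decorator and the NestJS discovery machinery; it is a simplified model of "find marked classes, then register each as a worker", not the actual explorer code.

```typescript
type JobHandler = (data: unknown) => void;
type ProcessorClass = new () => { handle: JobHandler };

const handled: string[] = [];

// Hypothetical registry standing in for decorator metadata read during discovery.
const processorRegistry = new Map<ProcessorClass, string>();

// Marks a class as the processor for a queue (applied as a plain function call).
const markAsProcessor =
  (queueName: string) =>
  (target: ProcessorClass): void => {
    processorRegistry.set(target, queueName);
  };

class SendEmailJob {
  handle(data: unknown): void {
    handled.push(`email:${String(data)}`);
  }
}
markAsProcessor('email-queue')(SendEmailJob);

// Explorer step: instantiate every marked class and register its handler as a worker.
function registerWorkers(
  work: (queueName: string, handler: JobHandler) => void,
): void {
  processorRegistry.forEach((queueName, ProcessorCtor) => {
    const instance = new ProcessorCtor();
    work(queueName, (data) => instance.handle(data));
  });
}

// Stand-in for the driver's work(): remember one handler per queue.
const workers = new Map<string, JobHandler>();
registerWorkers((queueName, handler) => {
  workers.set(queueName, handler);
});

workers.get('email-queue')?.('hello');
```

The point of the pattern is that job classes never call the driver themselves: marking a class is enough for it to be picked up at startup.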

### BullMQ Driver

The `BullMQDriver` class wraps the BullMQ `Queue` and `Worker` primitives. It maintains internal maps keyed by queue name (`MessageQueue` enum values):

```ts
private queueMap: Record<MessageQueue, Queue> = {} as Record<MessageQueue, Queue>;
private workerMap: Record<MessageQueue, Worker> = {} as Record<MessageQueue, Worker>;
```

**Queue registration** creates a `Queue` instance for each logical queue:

```ts
register(queueName: MessageQueue): void {
  this.queueMap[queueName] = new Queue(queueName, this.options);
}
```

**Worker registration** creates a BullMQ `Worker` that processes jobs with Sentry isolation scoping, structured logging, and execution time measurement:

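```ts
// Excerpt: parameter types, logging, and timing are omitted for brevity.
work<T>(queueName, handler, options?) {
  this.workerMap[queueName] = new Worker(queueName, async (job) =>
    // Each job runs inside its own Sentry isolation scope.
    Sentry.withIsolationScope(async () => {
      await handler({ data: job.data, id: job.id ?? '', name: job.name });
    }),
    workerOptions, // defined elsewhere in the source (not shown)
  );
}
```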

Sources: [packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts:81-161](packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts)

**Module lifecycle:** `onModuleInit` registers an OpenTelemetry observable gauge that reports the total number of waiting jobs across all queues. `onModuleDestroy` closes all queues and workers gracefully.

Sources: [packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts:56-93](packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts)

### Job Addition

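The driver's `add` method accepts an optional deduplication ID. When `options.id` is set, it scans the waiting list and skips the enqueue if a job with the same ID is already waiting. The actual BullMQ job ID is `options.id` followed by a UUID suffix, so a new job with the same `options.id` can be enqueued once the earlier one has left the waiting state. The check strips that suffix (`V4_LENGTH + 1` characters: the UUID plus one separator) before comparing.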

```ts
// Ensures only one waiting job exists for a given option.id
if (options?.id) {
  const waitingJobs = await this.queueMap[queueName].getJobs(['waiting']);
  const isJobAlreadyWaiting = waitingJobs.some(
    (job) => job.id?.slice(0, -(V4_LENGTH + 1)) === options.id,
  );
  if (isJobAlreadyWaiting) return;
}
```

Sources: [packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts:220-260](packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts)
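
The behaviour described above can be reproduced with an in-memory stand-in. This is a sketch under assumptions: `InMemoryQueue`, `fakeUuidV4`, and the `-` separator between the ID and the UUID are hypothetical, and only the `slice(0, -(V4_LENGTH + 1))` comparison is taken from the excerpt.

```typescript
// Length of a canonical UUID string such as "123e4567-e89b-42d3-a456-426614174000".
const V4_LENGTH = 36;

// Dependency-free stand-in for a real UUID v4 generator.
function fakeUuidV4(): string {
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
    const r = Math.floor(Math.random() * 16);
    const v = c === 'x' ? r : (r & 0x3) | 0x8;
    return v.toString(16);
  });
}

interface WaitingJob {
  id: string; // "<dedup id>-<uuid>"; the "-" separator is an assumption
  name: string;
}

class InMemoryQueue {
  private waiting: WaitingJob[] = [];

  // Returns true when the job was enqueued, false when it was deduplicated.
  add(name: string, dedupId?: string): boolean {
    if (dedupId !== undefined) {
      const isJobAlreadyWaiting = this.waiting.some(
        (job) => job.id.slice(0, -(V4_LENGTH + 1)) === dedupId,
      );
      if (isJobAlreadyWaiting) return false;
    }
    const suffix = fakeUuidV4();
    const id = dedupId !== undefined ? `${dedupId}-${suffix}` : suffix;
    this.waiting.push({ id, name });
    return true;
  }

  // Simulates a worker picking up the oldest waiting job.
  takeNext(): WaitingJob | undefined {
    return this.waiting.shift();
  }
}

const queue = new InMemoryQueue();
const first = queue.add('sync-calendar', 'workspace-42'); // enqueued
const second = queue.add('sync-calendar', 'workspace-42'); // skipped: one is already waiting
const active = queue.takeNext(); // the first job leaves the waiting list
const third = queue.add('sync-calendar', 'workspace-42'); // enqueued again
```

Because the stored ID always carries a unique suffix, the job that is being processed and the newly waiting job never collide on ID, while at most one job per deduplication ID sits in the waiting list.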

### Retention Policy

Completed and failed jobs are automatically pruned. Retention is controlled by constants in `queue-retention.constants.ts` (age and count limits), applied uniformly to every `add` and `addCron` call.
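
To make "age and count limits" concrete, here is a small simulation. It assumes a job is kept only if it is both younger than the age limit and among the newest `count` finished jobs; the limit values, `prune`, and the `FinishedJob` shape are hypothetical and are not taken from `queue-retention.constants.ts`.

```typescript
interface FinishedJob {
  id: string;
  finishedAtMs: number;
}

interface RetentionPolicy {
  ageSeconds: number; // maximum age of a finished job
  count: number; // maximum number of finished jobs to keep
}

// Hypothetical limits chosen for the example only.
const RETENTION: RetentionPolicy = { ageSeconds: 3600, count: 2 };

function prune(
  jobs: FinishedJob[],
  policy: RetentionPolicy,
  nowMs: number,
): FinishedJob[] {
  return jobs
    .filter((job) => nowMs - job.finishedAtMs <= policy.ageSeconds * 1000)
    .sort((a, b) => b.finishedAtMs - a.finishedAtMs) // newest first
    .slice(0, policy.count);
}

const now = 10000000;
const kept = prune(
  [
    { id: 'a', finishedAtMs: now - 4000000 }, // older than one hour
    { id: 'b', finishedAtMs: now - 3000 },
    { id: 'c', finishedAtMs: now - 2000 },
    { id: 'd', finishedAtMs: now - 1000 },
  ],
  RETENTION,
  now,
);
const keptIds = kept.map((job) => job.id); // 'a' fails the age limit, 'b' the count limit
```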

---

## Instance and Workspace Upgrade Commands

Twenty's upgrade pipeline replaces raw TypeORM migrations with a structured, decorator-based system that distinguishes between instance-level schema changes and per-workspace data migrations.

### Command Types

| Type | Interface | Decorator | Scope | When It Runs |
|---|---|---|---|---|
| Fast instance command | `FastInstanceCommand` | `@RegisteredInstanceCommand(version, ts)` | Core DB schema | Immediately on upgrade |
| Slow instance command | `SlowInstanceCommand` | `@RegisteredInstanceCommand(version, ts, { type: 'slow' })` | Core DB schema + data | With `--include-slow` flag |
| Workspace command | `WorkspaceCommandRunner` | `@RegisteredWorkspaceCommand(version, ts)` | Per-workspace schemas | After instance commands |

### Instance Commands

Instance commands operate on the shared `core` PostgreSQL schema and replace individual TypeORM migration files.

**Fast commands** run immediately during upgrade and implement `up` and `down` methods via a `QueryRunner`:

```ts
@RegisteredInstanceCommand('1.22.0', 1775758621017)
export class AddWorkspaceIdToTotoFastInstanceCommand implements FastInstanceCommand {
  public async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(`ALTER TABLE "core"."toto" ADD "workspaceId" uuid`);
  }
  public async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(`ALTER TABLE "core"."toto" DROP COLUMN "workspaceId"`);
  }
}
```

**Slow commands** extend the fast interface by adding a `runDataMigration(dataSource)` step that executes before `up`. This handles bulk backfills that could be long-running:

```ts
@RegisteredInstanceCommand('1.22.0', 1775758621018, { type: 'slow' })
export class BackfillWorkspaceIdSlowInstanceCommand implements SlowInstanceCommand {
  async runDataMigration(dataSource: DataSource): Promise<void> { /* backfill rows */ }
  public async up(queryRunner: QueryRunner): Promise<void> { /* set NOT NULL */ }
  public async down(queryRunner: QueryRunner): Promise<void> { /* drop NOT NULL */ }
}
```

Sources: [packages/twenty-server/docs/UPGRADE_COMMANDS.md:22-74](packages/twenty-server/docs/UPGRADE_COMMANDS.md)

A common pairing pattern: a **fast** command adds a nullable column; a **slow** command backfills rows and then enforces `NOT NULL`.
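
The reason for that ordering shows up in a toy model: enforcing `NOT NULL` before the backfill fails, while enforcing it afterwards succeeds. `FakeTable` and its methods are hypothetical stand-ins for the SQL a real command would issue through `QueryRunner` and `DataSource`.

```typescript
interface Row {
  id: number;
  workspaceId?: string | null;
}

class FakeTable {
  rows: Row[] = [{ id: 1 }, { id: 2 }];
  notNull = false;

  // Fast command up(): add the column as nullable.
  addNullableColumn(): void {
    for (const row of this.rows) row.workspaceId = null;
  }

  // Slow command runDataMigration(): fill in missing values.
  backfill(valueFor: (row: Row) => string): void {
    for (const row of this.rows) {
      if (row.workspaceId == null) row.workspaceId = valueFor(row);
    }
  }

  // Slow command up(): enforce NOT NULL, which fails if any value is missing.
  setNotNull(): void {
    if (this.rows.some((row) => row.workspaceId == null)) {
      throw new Error('column contains null values');
    }
    this.notNull = true;
  }
}

const table = new FakeTable();
table.addNullableColumn();

let failedBeforeBackfill = false;
try {
  table.setNotNull(); // too early: rows are still null
} catch {
  failedBeforeBackfill = true;
}

table.backfill((row) => `ws-${row.id}`);
table.setNotNull(); // succeeds after the backfill
```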

### The `@RegisteredInstanceCommand` Decorator

The decorator is a function returning a `ClassDecorator`. It applies `@Injectable()` and stores version, timestamp, and type metadata via `Reflect.defineMetadata`:

```ts
export const RegisteredInstanceCommand =
  (version: TwentyAllVersion, timestamp: number, options?: { type: 'slow' }): ClassDecorator =>
  (target) => {
    Injectable()(target);
    Reflect.defineMetadata(
      REGISTERED_INSTANCE_COMMAND_KEY,
      { version, timestamp, type: options?.type ?? 'fast' },
      target,
    );
  };
```

Sources: [packages/twenty-server/src/engine/core-modules/upgrade/decorators/registered-instance-command.decorator.ts:21-34](packages/twenty-server/src/engine/core-modules/upgrade/decorators/registered-instance-command.decorator.ts)
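
The metadata round trip can be sketched without `reflect-metadata` or NestJS. In this illustration a `WeakMap` replaces `Reflect.defineMetadata`, the `@Injectable()` step is left out, and `registerInstanceCommand`, `metadataStore`, and the two command classes are hypothetical names; only the stored shape and the `'fast'` default mirror the decorator shown above.

```typescript
type CommandType = 'fast' | 'slow';

interface InstanceCommandMetadata {
  version: string;
  timestamp: number;
  type: CommandType;
}

// Stand-in for Reflect.defineMetadata / Reflect.getMetadata, keyed by class.
const metadataStore = new WeakMap<object, InstanceCommandMetadata>();

const registerInstanceCommand =
  (version: string, timestamp: number, options?: { type: 'slow' }) =>
  (target: object): void => {
    // Commands are 'fast' unless the caller opts into 'slow'.
    metadataStore.set(target, {
      version,
      timestamp,
      type: options?.type ?? 'fast',
    });
  };

class AddColumnCommand {}
class BackfillColumnCommand {}

// Applied as plain function calls so the sketch needs no decorator compiler flags.
registerInstanceCommand('1.22.0', 1775758621017)(AddColumnCommand);
registerInstanceCommand('1.22.0', 1775758621018, { type: 'slow' })(
  BackfillColumnCommand,
);

// A registry can later read the metadata back from the class itself.
const fastMeta = metadataStore.get(AddColumnCommand);
const slowMeta = metadataStore.get(BackfillColumnCommand);
```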

### Workspace Commands

Workspace commands iterate over all active or suspended workspaces and apply per-workspace mutations. They combine the `@RegisteredWorkspaceCommand` metadata decorator with nest-commander's `@Command` decorator and extend `ActiveOrSuspendedWorkspaceCommandRunner`:

```ts
@RegisteredWorkspaceCommand('1.22.0', 1780000002000)
@Command({ name: 'upgrade:1-22:backfill-standard-skills', ... })
export class BackfillStandardSkillsCommand extends ActiveOrSuspendedWorkspaceCommandRunner {
  override async runOnWorkspace({ workspaceId, options }: RunOnWorkspaceArgs): Promise<void> {
    // per-workspace logic; options.dryRun and options.verbose are available
  }
}
```

Sources: [packages/twenty-server/docs/UPGRADE_COMMANDS.md:78-117](packages/twenty-server/docs/UPGRADE_COMMANDS.md)

The `WorkspaceCommandRunner` base class iterates over workspaces through `WorkspaceIteratorService.iterate()` and gives every subclass the same set of CLI flags:

| Flag | Description |
|---|---|
| `-d, --dry-run` | Simulate without persisting changes |
| `-v, --verbose` | Enable verbose logging |
| `--workspace-id` | Limit execution to specified workspace IDs (repeatable) |
| `--start-from-workspace-id` | Resume from a specific workspace (ascending order) |
| `--workspace-count-limit` | Cap total workspaces processed |

Sources: [packages/twenty-server/src/database/commands/command-runners/workspace.command-runner.ts:41-137](packages/twenty-server/src/database/commands/command-runners/workspace.command-runner.ts)
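
How the three selection flags might combine is sketched below. `selectWorkspaceIds` and `WorkspaceSelection` are hypothetical, and two behaviours are assumptions rather than documented facts: the resume point is treated as inclusive, and the count limit is applied last.

```typescript
interface WorkspaceSelection {
  workspaceIds?: string[]; // --workspace-id (repeatable)
  startFromWorkspaceId?: string; // --start-from-workspace-id
  workspaceCountLimit?: number; // --workspace-count-limit
}

function selectWorkspaceIds(
  allIds: string[],
  selection: WorkspaceSelection,
): string[] {
  const { workspaceIds, startFromWorkspaceId, workspaceCountLimit } = selection;
  let ids = [...allIds].sort(); // ascending order

  if (workspaceIds && workspaceIds.length > 0) {
    ids = ids.filter((id) => workspaceIds.includes(id));
  }
  if (startFromWorkspaceId !== undefined) {
    // Assumption: the starting workspace itself is processed again.
    ids = ids.filter((id) => id >= startFromWorkspaceId);
  }
  if (workspaceCountLimit !== undefined) {
    ids = ids.slice(0, workspaceCountLimit);
  }
  return ids;
}

const allWorkspaceIds = ['ws-c', 'ws-a', 'ws-d', 'ws-b'];

// Resume at ws-b and process at most two workspaces.
const resumed = selectWorkspaceIds(allWorkspaceIds, {
  startFromWorkspaceId: 'ws-b',
  workspaceCountLimit: 2,
});

// Restrict the run to explicitly listed workspaces.
const explicit = selectWorkspaceIds(allWorkspaceIds, {
  workspaceIds: ['ws-d', 'ws-a'],
});
```

Processing in a stable ascending order is what makes a resume flag meaningful: a run that failed partway can restart from the last workspace it reached.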

### Command Registry and Execution Order

The `UpgradeCommandRegistryService` collects all decorated commands via their metadata, groups them by version into `VersionBundle` objects, and validates that no two commands of the same kind share a timestamp:

```ts
type VersionBundle = {
  fastInstanceCommands: RegisteredFastInstanceCommand[];
  slowInstanceCommands: RegisteredSlowInstanceCommand[];
  workspaceCommands: RegisteredWorkspaceCommand[];
};
```

Within each version, the upgrade pipeline runs commands in this strict order, sorted by timestamp within each group:

1. **Fast instance commands** — immediate schema changes
2. **Slow instance commands** — data migrations + schema enforcement (requires `--include-slow`)
3. **Workspace commands** — per-workspace changes, applied sequentially

Sources: [packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-command-registry.service.ts:24-50](packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-command-registry.service.ts), [packages/twenty-server/docs/UPGRADE_COMMANDS.md:109-118](packages/twenty-server/docs/UPGRADE_COMMANDS.md)
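
The ordering rules above can be expressed as a single sort. This is an illustrative sketch: `planUpgrade`, `compareVersions`, and the flat `RegisteredCommand` shape are hypothetical. It also assumes that versions run in ascending numeric order, that collisions are checked per version and kind, and that slow commands are simply left out when the flag is absent (how the real pipeline proceeds in that case is not covered here).

```typescript
type Kind = 'fast' | 'slow' | 'workspace';

interface RegisteredCommand {
  name: string;
  version: string;
  timestamp: number;
  kind: Kind;
}

const KIND_ORDER: Kind[] = ['fast', 'slow', 'workspace'];

// Compares dotted numeric versions such as "1.9.0" and "1.22.0".
function compareVersions(a: string, b: string): number {
  const pa = a.split('.').map(Number);
  const pb = b.split('.').map(Number);
  for (let i = 0; i < Math.max(pa.length, pb.length); i++) {
    const diff = (pa[i] ?? 0) - (pb[i] ?? 0);
    if (diff !== 0) return diff;
  }
  return 0;
}

function planUpgrade(commands: RegisteredCommand[], includeSlow: boolean): string[] {
  // Reject two commands of the same kind that share a version and timestamp.
  const seen = new Set<string>();
  for (const c of commands) {
    const key = `${c.version}:${c.kind}:${c.timestamp}`;
    if (seen.has(key)) throw new Error(`Timestamp collision: ${key}`);
    seen.add(key);
  }
  return commands
    .filter((c) => includeSlow || c.kind !== 'slow')
    .sort(
      (a, b) =>
        compareVersions(a.version, b.version) ||
        KIND_ORDER.indexOf(a.kind) - KIND_ORDER.indexOf(b.kind) ||
        a.timestamp - b.timestamp,
    )
    .map((c) => c.name);
}

const commands: RegisteredCommand[] = [
  { name: 'ws-backfill', version: '1.22.0', timestamp: 1780000002000, kind: 'workspace' },
  { name: 'slow-backfill', version: '1.22.0', timestamp: 1775758621018, kind: 'slow' },
  { name: 'fast-add-column', version: '1.22.0', timestamp: 1775758621017, kind: 'fast' },
  { name: 'older-fast', version: '1.9.0', timestamp: 1700000000000, kind: 'fast' },
];

const fullPlan = planUpgrade(commands, true);
const planWithoutSlow = planUpgrade(commands, false);
```

Note that kind takes precedence over timestamp inside a version: the workspace command runs last even if its timestamp were the smallest in the bundle.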

### Generating Instance Commands

A dedicated CLI command scaffolds the boilerplate file and registers it automatically:

```bash
npx nx run twenty-server:database:migrate:generate --name <name> --type <fast|slow>
```

This writes a timestamped file and updates `instance-commands.constant.ts`. **Do not edit that constant file manually.**

---

## Workspace Event Emitter

The `WorkspaceEventEmitter` is a global NestJS service that translates ORM-level mutations into typed event batches, propagated through `EventEmitter2`.

```ts
@Global()
@Module({
  providers: [WorkspaceEventEmitter],
  exports: [WorkspaceEventEmitter],
})
export class WorkspaceEventEmitterModule {}
```

Sources: [packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.module.ts:1-10](packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.module.ts)

### API

**`emitDatabaseBatchEvent`** — emits a typed batch of record lifecycle events (created, updated, deleted, destroyed, restored, upserted). The event name is computed from the object metadata name and action:

```ts
// Excerpt: objectMetadataNameSingular, action, workspaceId, objectMetadata, and
// events are derived from databaseBatchEventInput (not shown).
public emitDatabaseBatchEvent<T, A extends keyof ActionEventMap<T>>(
  databaseBatchEventInput: DatabaseBatchEventInput<T, A> | undefined,
) {
  const eventName = computeEventName(objectMetadataNameSingular, action);
  const workspaceEventBatch: WorkspaceEventBatch<ActionEventMap<T>[A]> = {
    name: eventName, workspaceId, objectMetadata, events,
  };
  this.eventEmitter.emit(eventName, workspaceEventBatch);
}
```

**`emitCustomBatchEvent`** — emits an arbitrary named batch event useful for domain-level signals (e.g., email received, calendar sync complete) that do not correspond to a direct ORM entity mutation.

Sources: [packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts:42-89](packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts)

### Integration with TwentyORM

The workspace entity manager holds a reference to `WorkspaceEventEmitter` through the connection context and calls `emitDatabaseBatchEvent` at the end of mutating operations (save, remove, soft-remove, etc.), ensuring that downstream subscribers — webhooks, automation jobs, audit logs — are notified after every persisted change.

Sources: [packages/twenty-server/src/engine/twenty-orm/entity-manager/workspace-entity-manager.ts:86-91](packages/twenty-server/src/engine/twenty-orm/entity-manager/workspace-entity-manager.ts)

### Event Name Pattern

```
<objectNameSingular>.<action>
// e.g.:  person.created  |  company.updated  |  note.deleted
```

Event subscribers declared with NestJS `@OnEvent('person.created')` (or a wildcard `*.created`) receive a `WorkspaceEventBatch` containing the `workspaceId`, object metadata, and the list of record event payloads.
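
The naming and matching rules can be tried out with a miniature emitter. This sketch does not use `EventEmitter2` or `@OnEvent`: `TinyEmitter`, `matches`, and the simplified `EventBatch` shape are hypothetical, and `computeEventName` is re-implemented locally from the pattern shown above rather than copied from the source.

```typescript
type DatabaseAction =
  | 'created'
  | 'updated'
  | 'deleted'
  | 'destroyed'
  | 'restored'
  | 'upserted';

interface EventBatch<T> {
  name: string;
  workspaceId: string;
  events: T[];
}

type BatchHandler = (batch: EventBatch<unknown>) => void;

function computeEventName(objectNameSingular: string, action: DatabaseAction): string {
  return `${objectNameSingular}.${action}`;
}

// True when a pattern such as "*.created" or "person.created" matches the name.
function matches(pattern: string, eventName: string): boolean {
  const p = pattern.split('.');
  const e = eventName.split('.');
  return p.length === e.length && p.every((part, i) => part === '*' || part === e[i]);
}

class TinyEmitter {
  private listeners: Array<{ pattern: string; handler: BatchHandler }> = [];

  on(pattern: string, handler: BatchHandler): void {
    this.listeners.push({ pattern, handler });
  }

  emit(batch: EventBatch<unknown>): void {
    for (const listener of this.listeners) {
      if (matches(listener.pattern, batch.name)) listener.handler(batch);
    }
  }
}

const emitter = new TinyEmitter();
const received: string[] = [];

emitter.on('person.created', (batch) => {
  received.push(`exact:${batch.name}:${batch.events.length}`);
});
emitter.on('*.created', (batch) => {
  received.push(`wildcard:${batch.name}:${batch.events.length}`);
});

emitter.emit({
  name: computeEventName('person', 'created'),
  workspaceId: 'ws-1',
  events: [{ recordId: 'r1' }, { recordId: 'r2' }],
});
emitter.emit({
  name: computeEventName('company', 'updated'),
  workspaceId: 'ws-1',
  events: [{ recordId: 'r3' }],
});
```

Each subscriber is called once per batch, not once per record, so a handler should expect to loop over `events`.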

---

## Runtime Architecture Overview

```text
┌──────────────────────────────────────┐  ┌────────────────────────────────────────┐
│         API Server Process           │  │       Queue Worker Process             │
│  (NestJS HTTP + GraphQL)             │  │  (NestJS ApplicationContext, no HTTP)  │
│                                      │  │                                        │
│  MessageQueueService.add() ──────────┼──► BullMQDriver.add()                    │
│  WorkspaceEventEmitter               │  │  BullMQDriver.work()                   │
│                                      │  │    └─ Worker per MessageQueue enum     │
│  CommandModule (nest-commander)      │  │  JobsModule (all job handlers)         │
└──────────────────────────────────────┘  └────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────┐
│       Upgrade Command Pipeline  (npx nx run ...:database:...)    │
│                                                                  │
│  CommandModule (nest-commander)                                  │
│    └─ UpgradeCommandRegistryService                             │
│         ├─ Fast instance commands  (immediate schema DDL)        │
│         ├─ Slow instance commands  (data migration + DDL)        │
│         └─ Workspace commands      (per-workspace iteration)     │
└──────────────────────────────────────────────────────────────────┘
```

---

## Summary

Twenty's worker runtime separates concerns cleanly: the queue-worker process is a minimal NestJS context that activates BullMQ workers via module discovery, with no HTTP surface. Instance commands provide a versioned, rollback-aware alternative to raw TypeORM migrations, split between fast DDL-only changes and slow commands that pair a data migration with DDL. Workspace commands apply per-workspace mutations across all active or suspended workspaces in a controlled, optionally resumable iteration. The global `WorkspaceEventEmitter` ties ORM mutations to downstream event subscribers through typed batch events, enabling webhooks, automation, and observability without tight coupling to the data layer.
