Documentation v0.3.x
Updated Jan 23, 2026

flashQ Documentation

Complete documentation for the flashQ TypeScript SDK: a high-performance job queue built with Rust, with a BullMQ-compatible API, up to 10x faster, and no Redis required.

Overview

flashQ is a high-performance job queue built with Rust, designed as a drop-in replacement for BullMQ without requiring Redis.

BullMQ-compatible API

Easy migration from existing BullMQ projects

Direct TCP Protocol

With optional HTTP fallback

MessagePack Protocol

40% smaller payloads

Observability Hooks

OpenTelemetry/DataDog integration

Key Features

Feature             | Description
--------------------|-----------------------------------
10MB Payloads       | Support for large AI/ML workloads
Job Dependencies    | Chain jobs with depends_on
Priority Queues     | Higher priority = processed first
Rate Limiting       | Token bucket algorithm per queue
Concurrency Control | Limit parallel processing
Dead Letter Queue   | Automatic DLQ after max retries
Cron Jobs           | 6-field cron expressions
Progress Tracking   | Real-time progress updates

Installation

SDK

npm
npm install flashq
yarn
yarn add flashq
bun
bun add flashq

Requirements

  • Node.js >= 18.0.0 or Bun
  • flashQ server running (Docker recommended)

Server (Docker)

docker run -d --name flashq \
  -p 6789:6789 \
  -p 6790:6790 \
  -e HTTP=1 \
  ghcr.io/egeominotti/flashq:latest

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      Your Application                       │
├─────────────────────────────────────────────────────────────┤
│ Queue (BullMQ API) │ Worker (BullMQ API) │  FlashQ Client   │
├─────────────────────────────────────────────────────────────┤
│                      Connection Layer                       │
│      TCP (Binary/JSON)       │       HTTP (REST API)        │
├─────────────────────────────────────────────────────────────┤
│                    flashQ Server (Rust)                     │
│  32 Shards  │  DashMap  │  io_uring  │  PostgreSQL          │
└─────────────────────────────────────────────────────────────┘

Protocol Options

Protocol          | Use Case                    | Performance
------------------|-----------------------------|-------------------
TCP + JSON        | Default, debugging          | Good
TCP + MessagePack | Production, high throughput | Best (40% smaller)
HTTP + JSON       | Firewalls, load balancers   | Good
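Protocol selection is driven by the client options documented below (`useBinary`, `useHttp`). A configuration sketch of the three combinations, using only options from the ClientOptions table:

```typescript
import { FlashQ } from 'flashq';

// TCP + JSON (the default): nothing extra to configure
const debugClient = new FlashQ({ host: 'localhost', port: 6789 });

// TCP + MessagePack: enable the binary protocol for production throughput
const prodClient = new FlashQ({ host: 'localhost', port: 6789, useBinary: true });

// HTTP + JSON: for environments where raw TCP is blocked
const httpClient = new FlashQ({ host: 'localhost', httpPort: 6790, useHttp: true });
```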

Connection Options

The FlashQ class provides direct access to all server operations.

import { FlashQ } from 'flashq';

const client = new FlashQ({
  host: 'localhost',
  port: 6789,
});

await client.connect();
// ... use client
await client.close();

ClientOptions

Option               | Type        | Default     | Description
---------------------|-------------|-------------|--------------------------------------
host                 | string      | 'localhost' | Server host
port                 | number      | 6789        | TCP port
httpPort             | number      | 6790        | HTTP port
socketPath           | string      | -           | Unix socket path
token                | string      | -           | Auth token
timeout              | number      | 5000        | Connection timeout (ms)
useHttp              | boolean     | false       | Use HTTP instead of TCP
useBinary            | boolean     | false       | Use MessagePack protocol
autoReconnect        | boolean     | true        | Auto-reconnect on disconnect
maxReconnectAttempts | number      | 10          | Max reconnect attempts (0 = infinite)
reconnectDelay       | number      | 1000        | Initial reconnect delay (ms)
maxReconnectDelay    | number      | 30000       | Max reconnect delay (ms)
logLevel             | string      | 'silent'    | Log level
compression          | boolean     | false       | Enable gzip compression
compressionThreshold | number      | 1024        | Min payload size to compress
hooks                | ClientHooks | -           | Observability hooks

Core Methods

connect(): Promise<void>

Establish connection to the flashQ server.

const client = new FlashQ({ host: 'localhost', port: 6789 });
await client.connect();

close(): Promise<void>

Close the connection gracefully.

await client.close();

push<T>(queue, data, options?): Promise<Job>

Push a job to a queue.

const job = await client.push('emails', {
  to: 'user@example.com',
  subject: 'Welcome!'
}, {
  priority: 10,
  delay: 5000,
  max_attempts: 3,
});

pushBatch<T>(queue, jobs): Promise<number[]>

Push multiple jobs in a single batch.

const ids = await client.pushBatch('emails', [
  { data: { to: 'user1@example.com' } },
  { data: { to: 'user2@example.com' }, priority: 10 },
]);

pull<T>(queue, timeout?): Promise<Job | null>

Pull a job from a queue (blocking with timeout).

const job = await client.pull<EmailData>('emails', 5000);
if (job) {
  console.log('Processing:', job.data);
  await client.ack(job.id);
}

ack(jobId, result?): Promise<void>

Acknowledge a job as completed.

await client.ack(job.id, { sent: true, timestamp: Date.now() });

fail(jobId, error?): Promise<void>

Fail a job (will retry or move to DLQ).

await client.fail(job.id, 'SMTP connection timeout');

PushOptions

Option       | Type     | Description
-------------|----------|---------------------------------------
priority     | number   | Higher = processed first (default: 0)
delay        | number   | Delay in ms before job is available
max_attempts | number   | Max retry attempts (default: 0)
backoff      | number   | Backoff base in ms (exponential)
timeout      | number   | Job timeout in ms
ttl          | number   | Time-to-live in ms
unique_key   | string   | Unique key for deduplication
jobId        | string   | Custom ID for idempotency
depends_on   | number[] | Job IDs that must complete first
tags         | string[] | Tags for filtering
lifo         | boolean  | LIFO mode (stack)
group_id     | string   | Group ID for FIFO within group
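Several of these options compose naturally on a single push. A sketch that builds an options object for a deduplicated, retried job with a dependency (the `PushOptions` interface here is transcribed from the table above for illustration, not imported from the SDK):

```typescript
// PushOptions shape, transcribed from the table above (illustrative only)
interface PushOptions {
  priority?: number;
  delay?: number;
  max_attempts?: number;
  backoff?: number;
  unique_key?: string;
  jobId?: string;
  depends_on?: number[];
  tags?: string[];
}

// Options for a welcome email that retries up to 3 times with exponential
// backoff, deduplicates on the user ID, and waits for a prior signup job.
function welcomeEmailOptions(userId: number, signupJobId: number): PushOptions {
  return {
    priority: 10,
    max_attempts: 3,
    backoff: 1000,                   // 1s base, exponential
    unique_key: `welcome:${userId}`, // dedupe repeated pushes
    depends_on: [signupJobId],
    tags: ['email', 'onboarding'],
  };
}
```

Usage: `await client.push('emails', { to }, welcomeEmailOptions(7, 41));`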

Job Management

getJob(jobId): Promise<JobWithState | null>

Get a job with its current state.

const result = await client.getJob(123);
if (result) {
  console.log(`Job ${result.job.id} is ${result.state}`);
}

getState(jobId): Promise<JobState | null>

Get job state only.

const state = await client.getState(123);
// Returns: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed' | null

getJobs(options): Promise<{jobs, total}>

List jobs with filtering and pagination.

const { jobs, total } = await client.getJobs({
  queue: 'emails',
  state: 'failed',
  limit: 100,
  offset: 0,
});

progress(jobId, progress, message?): Promise<void>

Update job progress (0-100).

await client.progress(job.id, 50, 'Processing items...');
await client.progress(job.id, 100, 'Complete');

finished(jobId, timeout?): Promise<unknown>

Wait for job completion and return result.

const job = await client.push('processing', data);
const result = await client.finished(job.id, 30000); // 30s timeout

cancel(jobId): Promise<void>

Cancel a pending job.

await client.cancel(123);

changePriority(jobId, priority): Promise<void>

Change job priority at runtime.

await client.changePriority(job.id, 100); // Boost priority

log(jobId, message, level?): Promise<void>

Add log entry to job.

await client.log(job.id, 'Starting processing', 'info');
await client.log(job.id, 'Warning: rate limit approaching', 'warn');

Queue Management

pause(queue): Promise<void>

Pause a queue (stops processing).

await client.pause('emails');

resume(queue): Promise<void>

Resume a paused queue.

await client.resume('emails');

drain(queue): Promise<number>

Remove all waiting jobs from queue.

const removed = await client.drain('emails');
console.log(`Removed ${removed} jobs`);

obliterate(queue): Promise<void>

Remove ALL queue data (jobs, DLQ, cron, state).

await client.obliterate('test-queue'); // Dangerous!

listQueues(): Promise<QueueInfo[]>

List all queues with stats.

const queues = await client.listQueues();
for (const q of queues) {
  console.log(`${q.name}: ${q.pending} pending, ${q.processing} active`);
}

getJobCounts(queue): Promise<Record<JobState, number>>

Get job counts grouped by state.

const counts = await client.getJobCounts('emails');
// { waiting: 10, delayed: 5, active: 2, completed: 100, failed: 3 }

Dead Letter Queue

getDlq(queue, count?): Promise<Job[]>

Get jobs from dead letter queue.

const dlqJobs = await client.getDlq('emails', 100);

retryDlq(queue, jobId?): Promise<number>

Retry DLQ jobs. If jobId is provided, retries only that job.

// Retry all DLQ jobs
const retried = await client.retryDlq('emails');

// Retry specific job
await client.retryDlq('emails', 123);

purgeDlq(queue): Promise<number>

Remove all jobs from DLQ.

const purged = await client.purgeDlq('emails');

Rate Limiting

setRateLimit(queue, limit): Promise<void>

Set queue rate limit (jobs per second).

await client.setRateLimit('api-calls', 100); // 100 jobs/sec

clearRateLimit(queue): Promise<void>

Clear rate limit.

await client.clearRateLimit('api-calls');

setConcurrency(queue, limit): Promise<void>

Set concurrency limit for queue.

await client.setConcurrency('heavy-processing', 5);

Cron Jobs

addCron(name, options): Promise<void>

Add a cron job with schedule or repeat interval.

// Using cron expression (sec min hour day month weekday)
await client.addCron('daily-report', {
  queue: 'reports',
  data: { type: 'daily' },
  schedule: '0 0 9 * * *', // Every day at 9:00 AM
  priority: 10,
});

// Using repeat interval
await client.addCron('health-check', {
  queue: 'monitoring',
  data: { check: 'health' },
  repeat_every: 60000, // Every minute
  limit: 1000, // Max 1000 executions
});

deleteCron(name): Promise<boolean>

Delete a cron job.

const deleted = await client.deleteCron('daily-report');

listCrons(): Promise<CronJob[]>

List all cron jobs.

const crons = await client.listCrons();
for (const cron of crons) {
  console.log(`${cron.name}: next run at ${new Date(cron.next_run)}`);
}

Flows (Job Dependencies)

pushFlow<T>(queue, parentData, children, options?): Promise<FlowResult>

Create a workflow with parent and children jobs.

const flow = await client.pushFlow(
  'processing',
  { type: 'aggregate' },  // Parent job data
  [
    { queue: 'step1', data: { task: 'fetch' } },
    { queue: 'step2', data: { task: 'transform' } },
    { queue: 'step3', data: { task: 'load' } },
  ],
  { priority: 10 }  // FlowOptions
);

// Wait for parent (completes when all children complete)
const result = await client.finished(flow.parent_id);

Metrics

stats(): Promise<QueueStats>

Get queue statistics.

const stats = await client.stats();
console.log(`Queued: ${stats.queued}, Processing: ${stats.processing}`);

metrics(): Promise<Metrics>

Get detailed metrics.

const metrics = await client.metrics();
console.log(`Throughput: ${metrics.jobs_per_second} jobs/sec`);
console.log(`Avg Latency: ${metrics.avg_latency_ms}ms`);

Queue Class

The Queue class provides a BullMQ-compatible API for job management.

import { Queue } from 'flashq';

const queue = new Queue('emails', {
  host: 'localhost',
  port: 6789,
});

add(name, data, opts?): Promise<Job>

Add a job to the queue.

const job = await queue.add('send-email', {
  to: 'user@example.com',
  subject: 'Hello',
}, {
  priority: 10,
  delay: 5000,
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
});

addBulk(jobs): Promise<Job[]>

Add multiple jobs.

const jobs = await queue.addBulk([
  { name: 'send', data: { to: 'a@test.com' } },
  { name: 'send', data: { to: 'b@test.com' }, opts: { priority: 10 } },
]);

finished(jobId, timeout?): Promise<R | null>

Wait for job completion and return its result.

const job = await queue.add('process', data);
const result = await queue.finished(job.id, 30000);

getJobCounts(): Promise<JobCounts>

Get job counts by state.

const counts = await queue.getJobCounts();
// { waiting: 10, active: 2, completed: 100, failed: 5, delayed: 3 }

pause() / resume() / isPaused()

Control queue processing state.

await queue.pause();
if (await queue.isPaused()) {
  console.log('Queue is paused');
}
await queue.resume();

drain() / obliterate() / clean()

Clean up queue data.

await queue.drain(); // Remove waiting jobs
await queue.obliterate(); // Remove ALL data
const cleaned = await queue.clean(3600000, 1000, 'completed'); // Remove old completed

Worker Class

The Worker class processes jobs from queues.

import { Worker } from 'flashq';

const worker = new Worker('emails', async (job) => {
  console.log('Processing:', job.data);
  return { sent: true };
}, {
  concurrency: 10,
  autorun: true,
});

WorkerOptions

Option       | Type    | Default | Description
-------------|---------|---------|------------------------------
id           | string  | auto    | Worker ID
concurrency  | number  | 10      | Parallel job processing
batchSize    | number  | 100     | Jobs to pull per batch
autoAck      | boolean | true    | Auto-acknowledge on success
autorun      | boolean | true    | Start processing immediately
closeTimeout | number  | 30000   | Graceful shutdown timeout (ms)
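A configuration sketch of a worker using only the non-default options listed above (the values are illustrative):

```typescript
import { Worker } from 'flashq';

// Pull larger batches and allow a longer drain window on shutdown.
const worker = new Worker('emails', async (job) => {
  // ... process job.data ...
  return { sent: true };
}, {
  concurrency: 25,      // up to 25 jobs in parallel
  batchSize: 200,       // pull up to 200 jobs per fetch
  closeTimeout: 60_000, // wait up to 60s for in-flight jobs on close()
});
```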

Worker Methods

start() / stop(force?) / close(force?)

Control worker lifecycle.

const worker = new Worker('queue', processor, { autorun: false });
await worker.start();
// ... later
await worker.close(); // Graceful shutdown
await worker.close(true); // Force stop

updateProgress(jobId, progress, message?)

Update job progress during processing.

const worker = new Worker('queue', async (job) => {
  await worker.updateProgress(job.id, 0, 'Starting...');
  // ... do the actual work between progress updates ...
  await worker.updateProgress(job.id, 50, 'Halfway done');
  await worker.updateProgress(job.id, 100, 'Complete');
  return { done: true };
});

getState() / isRunning() / getProcessingCount()

Check worker status.

const state = worker.getState(); // 'idle' | 'starting' | 'running' | 'stopping' | 'stopped'
if (worker.isRunning()) {
  console.log(`Processing ${worker.getProcessingCount()} jobs`);
}

Worker Events

worker.on('ready', () => console.log('Worker ready'));

worker.on('active', (job, workerId) => {
  console.log(`Job ${job.id} started by ${workerId}`);
});

worker.on('completed', (job, result, workerId) => {
  console.log(`Job ${job.id} completed:`, result);
});

worker.on('failed', (job, error, workerId) => {
  console.log(`Job ${job.id} failed:`, error.message);
});

worker.on('progress', (job, progress, message) => {
  console.log(`Job ${job.id}: ${progress}% - ${message}`);
});

worker.on('stopping', () => console.log('Stopping...'));
worker.on('stopped', () => console.log('Stopped'));
worker.on('drained', () => console.log('All jobs completed'));
worker.on('error', (error) => console.error('Error:', error));

Real-Time Events

Subscribe to job events via SSE or WebSocket.

import { FlashQ, EventSubscriber } from 'flashq';

const client = new FlashQ({ host: 'localhost' });
await client.connect();

// Subscribe via SSE
const subscriber = client.subscribe('emails');
// Or via WebSocket
const wsSubscriber = client.subscribeWs('emails');

await subscriber.connect();

Event Types

Event     | Description
----------|------------------------------------
pushed    | Job was added to queue
completed | Job completed successfully
failed    | Job failed (moved to DLQ or retry)
progress  | Job progress was updated
timeout   | Job timed out

subscriber.on('connected', () => console.log('Connected'));
subscriber.on('disconnected', () => console.log('Disconnected'));
subscriber.on('reconnecting', (attempt) => console.log(`Reconnecting ${attempt}...`));

subscriber.on('event', (event) => {
  console.log(`${event.eventType}: Job ${event.jobId}`);
});

subscriber.on('completed', (event) => {
  console.log(`Job ${event.jobId} completed`);
});

subscriber.on('failed', (event) => {
  console.log(`Job ${event.jobId} failed: ${event.error}`);
});

// Cleanup
subscriber.close();

Error Handling

flashQ provides typed error classes for precise error handling.

Error Classes

Class               | Code             | Retryable | Description
--------------------|------------------|-----------|---------------------
ConnectionError     | CONNECTION_*     | Yes       | Connection failures
TimeoutError        | REQUEST_TIMEOUT  | Yes       | Request timeouts
AuthenticationError | AUTH_FAILED      | No        | Auth failures
ValidationError     | VALIDATION_ERROR | No        | Invalid input
JobNotFoundError    | JOB_NOT_FOUND    | No        | Job doesn't exist
RateLimitError      | RATE_LIMITED     | Yes       | Rate limit exceeded
QueuePausedError    | QUEUE_PAUSED     | Yes       | Queue is paused

import {
  FlashQError, ConnectionError, TimeoutError,
  ValidationError, RateLimitError, JobNotFoundError,
} from 'flashq';

try {
  await client.push('queue', data);
} catch (error) {
  if (error instanceof ConnectionError) {
    console.log('Connection failed:', error.message);
    // Safe to retry
  } else if (error instanceof TimeoutError) {
    console.log(`Timeout after ${error.timeoutMs}ms`);
    // Safe to retry
  } else if (error instanceof RateLimitError) {
    console.log(`Rate limited, retry after ${error.retryAfterMs}ms`);
    // Wait and retry
  } else if (error instanceof ValidationError) {
    console.log(`Invalid ${error.field}: ${error.message}`);
    // Fix input, don't retry
  } else if (error instanceof FlashQError) {
    if (error.retryable) {
      // Safe to retry
    }
  }
}

Retry Logic

Built-in retry utilities with exponential backoff.

withRetry(fn, options)

Wrap a single operation with retry logic.

import { withRetry } from 'flashq';

const result = await withRetry(
  () => client.push('queue', data),
  {
    maxRetries: 3,
    initialDelay: 100,
    maxDelay: 5000,
    backoffMultiplier: 2,
    jitter: true,
    onRetry: (error, attempt, delay) => {
      console.log(`Retry ${attempt} after ${delay}ms: ${error.message}`);
    },
  }
);
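With `jitter` disabled, the options above imply a simple delay schedule: the delay starts at `initialDelay`, is multiplied by `backoffMultiplier` after each attempt, and is capped at `maxDelay`. A standalone sketch of that schedule (this mirrors the documented parameters; it is not the SDK's internal code):

```typescript
// Delay before retry number `attempt` (1-based), per the documented options.
function backoffDelay(
  attempt: number,
  opts: { initialDelay: number; maxDelay: number; backoffMultiplier: number },
): number {
  const raw = opts.initialDelay * Math.pow(opts.backoffMultiplier, attempt - 1);
  return Math.min(raw, opts.maxDelay);
}

const opts = { initialDelay: 100, maxDelay: 5000, backoffMultiplier: 2 };
// Schedule: 100, 200, 400, 800, 1600, 3200, then capped at 5000
```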

retryable(fn, options)

Create a retryable version of a function.

import { retryable } from 'flashq';

const retryablePush = retryable(
  (queue: string, data: unknown) => client.push(queue, data),
  { maxRetries: 3 }
);

await retryablePush('emails', { to: 'user@example.com' });

RetryPresets

Pre-configured retry strategies.

import { RetryPresets } from 'flashq';

RetryPresets.fast       // 2 retries, 50ms initial, 500ms max
RetryPresets.standard   // 3 retries, 100ms initial, 5s max
RetryPresets.aggressive // 5 retries, 200ms initial, 30s max
RetryPresets.none       // 0 retries

Observability Hooks

Integrate with OpenTelemetry, DataDog, or custom metrics.

import { FlashQ, ClientHooks } from 'flashq';

const hooks: ClientHooks = {
  onPush: (ctx) => console.log(`Pushing to ${ctx.queue}`),
  onPushComplete: (ctx) => {
    console.log(`Pushed job ${ctx.job?.id} in ${Date.now() - ctx.startTime}ms`);
  },
  onPushError: (ctx, error) => console.error(`Push failed: ${error.message}`),

  onPull: (ctx) => console.log(`Pulling from ${ctx.queue}`),
  onPullComplete: (ctx) => console.log(`Pulled job ${ctx.job?.id}`),

  onConnection: (ctx) => console.log(`Connection event: ${ctx.event}`),
};

const client = new FlashQ({ hooks });

OpenTelemetry Example

import { trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('flashq');

const hooks: ClientHooks = {
  onPush: (ctx) => {
    ctx.span = tracer.startSpan('flashq.push', {
      attributes: { 'flashq.queue': ctx.queue },
    });
  },
  onPushComplete: (ctx) => {
    if (ctx.job) ctx.span?.setAttribute('flashq.job_id', ctx.job.id);
    ctx.span?.end();
  },
  onPushError: (ctx, error) => {
    ctx.span?.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
    ctx.span?.recordException(error);
    ctx.span?.end();
  },
};

Logger

Configurable logging with request ID tracking.

import { createLogger, setGlobalLogger } from 'flashq';

const logger = createLogger({
  level: 'info',  // trace | debug | info | warn | error | silent
  prefix: 'my-app',
  timestamps: true,
});

logger.info('Processing request', { userId: 123 });
// Output: 2024-01-15T10:30:00.000Z [my-app] INFO Processing request { userId: 123 }

// Request ID tracking
logger.setRequestId('req-12345');
logger.info('Processing'); // Includes [req-12345] in output

// Child loggers
const dbLogger = logger.child('db');
dbLogger.info('Query executed');
// Output: [my-app:db] INFO Query executed

// Set as global
setGlobalLogger(logger);

Types Reference

Job<T>

interface Job<T = unknown> {
  id: number;
  queue: string;
  data: T;
  priority: number;
  attempts: number;
  created_at: number;
  run_at: number;
  max_attempts: number;
  backoff: number;
  ttl: number;
  timeout: number;
  progress: number;
  unique_key?: string;
  custom_id?: string;
  tags: string[];
  depends_on: number[];
  parent_id?: number;
  children_ids: number[];
  group_id?: string;
}

JobState

type JobState = 'waiting' | 'delayed' | 'active' | 'completed' | 'failed';

QueueInfo / QueueStats / Metrics

interface QueueInfo {
  name: string;
  pending: number;
  processing: number;
  dlq: number;
  paused: boolean;
}

interface QueueStats {
  queued: number;
  processing: number;
  delayed: number;
  dlq: number;
}

interface Metrics {
  total_pushed: number;
  total_completed: number;
  total_failed: number;
  jobs_per_second: number;
  avg_latency_ms: number;
}

CronJob / CronOptions

interface CronJob {
  name: string;
  queue: string;
  data: unknown;
  schedule?: string;
  repeat_every?: number;
  priority: number;
  next_run: number;
  executions: number;
  limit?: number;
}

interface CronOptions {
  queue: string;
  data: unknown;
  schedule?: string;      // "sec min hour day month weekday"
  repeat_every?: number;  // Or repeat every N ms
  priority?: number;
  limit?: number;
}

FlowChild / FlowResult

interface FlowChild {
  queue: string;
  data: unknown;
  priority?: number;
  delay?: number;
}

interface FlowResult {
  parent_id: number;
  children_ids: number[];
}

Constants

import { MAX_BATCH_SIZE, MAX_JOB_DATA_SIZE } from 'flashq';

MAX_BATCH_SIZE     // 1000 - Maximum jobs per batch
MAX_JOB_DATA_SIZE  // 10MB - Maximum job data size
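Since `pushBatch` is bounded by `MAX_BATCH_SIZE`, larger job sets need to be split into chunks first. A sketch (the constant is inlined with its documented value of 1000 so the snippet stands alone; in real code, import it from 'flashq'):

```typescript
const MAX_BATCH_SIZE = 1000; // documented value; import from 'flashq' in real code

// Split an arbitrary job list into pushBatch-sized chunks.
function chunkJobs<T>(jobs: T[], size: number = MAX_BATCH_SIZE): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < jobs.length; i += size) {
    chunks.push(jobs.slice(i, i + size));
  }
  return chunks;
}
```

Usage: `for (const chunk of chunkJobs(items)) await client.pushBatch('queue', chunk);`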

Best Practices

1. Use Binary Protocol in Production

const client = new FlashQ({
  useBinary: true, // 40% smaller payloads
});

2. Enable Graceful Shutdown

process.on('SIGTERM', async () => {
  await worker.close(); // Waits for current jobs
  await client.close();
  process.exit(0);
});

3. Use Batch Operations

// Instead of many single pushes
for (const data of items) {
  await client.push('queue', data); // Slow
}

// Use batch
await client.pushBatch('queue', items.map(data => ({ data }))); // Fast

4. Handle Errors Properly

try {
  await client.push('queue', data);
} catch (error) {
  if (error instanceof FlashQError && error.retryable) {
    await withRetry(() => client.push('queue', data));
  } else {
    logger.error('Failed to push job', error);
  }
}

5. Use Job Dependencies for Workflows

const step1 = await client.push('step1', data1);
const step2 = await client.push('step2', data2, { depends_on: [step1.id] });
const step3 = await client.push('step3', data3, { depends_on: [step2.id] });
const result = await client.finished(step3.id);

6. Set Appropriate Concurrency

import os from 'node:os';

// For I/O-bound work (API calls, DB)
const ioWorker = new Worker('api-queue', processor, { concurrency: 50 });

// For CPU-bound work, match the number of CPU cores
const cpuWorker = new Worker('cpu-queue', processor, { concurrency: os.cpus().length });

7. Use Rate Limiting for External APIs

await client.setRateLimit('openai-calls', 50); // 50 req/sec

const worker = new Worker('openai-calls', async (job) => {
  // Rate limiting is enforced server-side
  return await openai.chat.completions.create(job.data);
});

8. Monitor with Hooks

const client = new FlashQ({
  hooks: {
    onPushComplete: (ctx) => {
      metrics.pushLatency.observe(Date.now() - ctx.startTime);
      metrics.pushCounter.inc({ queue: ctx.queue });
    },
    onPushError: (ctx, error) => {
      metrics.errorCounter.inc({ queue: ctx.queue, code: error.code });
    },
  },
});

flashQ Documentation - flashq.dev