flashQ Documentation

flashQ is a high-performance job queue built with Rust. It provides a BullMQ-compatible API without requiring Redis, making it perfect for AI workloads, LLM pipelines, and high-throughput applications.

Why flashQ?

Feature flashQ BullMQ + Redis
External dependencies None Redis server required
Throughput 300K jobs/sec ~30K jobs/sec
Max payload 10 MB ~5 MB recommended
API compatibility Same BullMQ-style API

Installation

Server

Start the flashQ server with Docker (recommended) or by downloading a prebuilt binary.

Docker (Recommended)

Terminal
# Pull multi-arch image (amd64 + arm64)
docker pull ghcr.io/egeominotti/flashq:latest

# Run with dashboard enabled
docker run -d --name flashq \
  -p 6789:6789 \
  -p 6790:6790 \
  -e HTTP=1 \
  ghcr.io/egeominotti/flashq:latest

Binary

Terminal
# Linux x86_64
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-linux-x86_64.tar.gz | tar xz
./flashq-server

# macOS Apple Silicon
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-macos-arm64.tar.gz | tar xz
./flashq-server

SDK

Install the TypeScript SDK in your project:

Terminal
# Using bun (recommended)
bun add flashq

# Using npm
npm install flashq

# Using yarn
yarn add flashq
💡 TypeScript Support

flashQ includes built-in TypeScript definitions. No additional @types package needed.

Quick Start

Get your first job queue running in under 5 minutes.

Start the server

docker run -d -p 6789:6789 ghcr.io/egeominotti/flashq:latest

Install the SDK

bun add flashq

Create a queue and add jobs

producer.ts
import { Queue } from 'flashq';

const queue = new Queue('emails');

// Add a job
await queue.add('send-welcome', {
  to: 'user@example.com',
  subject: 'Welcome!'
});

console.log('Job added!');

Process jobs with a worker

worker.ts
import { Worker } from 'flashq';

const worker = new Worker('emails', async (job) => {
  console.log(`Sending email to ${job.data.to}`);

  // Your email sending logic here
  await sendEmail(job.data);

  return { sent: true };
});

worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed`);
});

worker.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed: ${error.message}`);
});
✅ That's it!

Your job queue is now running. The worker will automatically process jobs as they're added to the queue.

Core Concepts

Queues

A Queue is a named container for jobs. Jobs in a queue are processed in priority order (highest first), with FIFO ordering for jobs of the same priority.

const emailQueue = new Queue('emails');
const reportQueue = new Queue('reports');

Jobs

A Job is a unit of work with a name, data payload, and optional configuration. Jobs progress through states: waiting → active → completed or failed.

const job = await queue.add('process-image', {
  imageUrl: 'https://example.com/image.jpg',
  filters: ['resize', 'compress']
}, {
  priority: 10,
  attempts: 3
});

Workers

A Worker processes jobs from a queue. Workers can process jobs concurrently and automatically handle job acknowledgment, retries, and errors.

const worker = new Worker('emails', processor, {
  concurrency: 10  // Process 10 jobs in parallel
});

Job States

State Description
waiting Job is queued and ready to be processed
delayed Job is scheduled to run at a future time
active Job is currently being processed by a worker
completed Job finished successfully
failed Job failed and exhausted all retry attempts (in DLQ)
waiting-children Job is waiting for dependent jobs to complete
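
A job's current state can be read at any time with getJob() (covered in the Queue API below). A minimal sketch:

const job = await queue.getJob(jobId);

if (job.state === 'delayed') {
  console.log('Job will run later');
} else if (job.state === 'failed') {
  console.log('Job is in the DLQ');
}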

Queue API

The Queue class provides methods for adding jobs and managing queue state.

Constructor

const queue = new Queue(name, options?);
Option Type Default Description
host string 'localhost' Server hostname
port number 6789 Server port
token string - Authentication token
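
For a non-local server, pass these options to the constructor. An illustrative sketch (the hostname and environment variable are placeholders):

const queue = new Queue('emails', {
  host: 'queue.internal',
  port: 6789,
  token: process.env.FLASHQ_TOKEN
});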

Methods

add(name, data, opts?)

Add a single job to the queue.

const job = await queue.add('send-email', {
  to: 'user@example.com'
}, {
  priority: 10,
  delay: 5000,
  attempts: 3
});

console.log(job.id); // Unique job ID

addBulk(jobs)

Add multiple jobs in a single batch operation. More efficient than calling add() multiple times.

const jobs = await queue.addBulk([
  { name: 'send', data: { to: 'a@test.com' } },
  { name: 'send', data: { to: 'b@test.com' }, opts: { priority: 10 } },
  { name: 'send', data: { to: 'c@test.com' }, opts: { delay: 5000 } },
]);

getJob(jobId)

Get a job by its ID, including current state and data.

const job = await queue.getJob(123);
console.log(job.state);  // 'completed'
console.log(job.result); // { sent: true }

finished(jobId, timeout?) New

Wait for a job to complete and return its result. Perfect for synchronous workflows.

const job = await queue.add('generate', { prompt });
const result = await queue.finished(job.id, 30000); // 30s timeout
console.log(result); // Worker's return value

getJobCounts()

Get counts of jobs in each state.

const counts = await queue.getJobCounts();
// { waiting: 10, active: 5, completed: 100, failed: 2, delayed: 3 }

pause() / resume()

Pause or resume job processing on the queue.

await queue.pause();   // Workers stop pulling jobs
await queue.resume();  // Workers resume pulling jobs

drain()

Remove all waiting jobs from the queue.

await queue.drain(); // Clear waiting jobs only

obliterate()

Remove all data associated with the queue (jobs, DLQ, settings).

await queue.obliterate(); // Nuclear option - removes everything
⚠️ Warning

obliterate() is irreversible. All jobs and queue data will be permanently deleted.

Job Options

Configure job behavior with these options when calling queue.add().

Option Type Default Description
priority number 0 Higher priority jobs are processed first
delay number 0 Delay in milliseconds before job becomes available
attempts number 1 Number of retry attempts on failure
backoff number | object - Backoff strategy for retries
timeout number - Job processing timeout in milliseconds
jobId string - Custom job ID for idempotency
depends_on number[] - Job IDs that must complete before this job runs
ttl number - Time-to-live: auto-fail if not processed in time
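
priority, delay, backoff, jobId, and depends_on are covered in detail below. As a brief illustrative sketch of the two time-based options, ttl fails a job that waits too long, while timeout caps its processing time:

// Fail if no worker picks the job up within 60s; cap processing at 5 minutes
await queue.add('fresh-data-only', data, {
  ttl: 60000,
  timeout: 300000
});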

Priority

Jobs with higher priority values are processed first.

await queue.add('low', data, { priority: 1 });
await queue.add('high', data, { priority: 100 }); // Processed first
await queue.add('urgent', data, { priority: 1000 }); // Processed before 'high'

Delay

Schedule a job to run after a specified delay.

// Run after 5 seconds
await queue.add('reminder', data, { delay: 5000 });

// Run after 1 hour
await queue.add('daily-report', data, { delay: 60 * 60 * 1000 });

Backoff

Configure retry delay strategy.

// Fixed delay: retry after 5s each time
await queue.add('job', data, {
  attempts: 3,
  backoff: 5000
});

// Exponential backoff: 1s, 2s, 4s, 8s...
await queue.add('job', data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});

Custom Job ID (Idempotency)

Use jobId to prevent duplicate jobs. If a job with the same ID already exists, the existing job is returned.

// Only one job per order
await queue.add('process-order', orderData, {
  jobId: `order-${orderId}`
});

// Second call with same jobId returns existing job
await queue.add('process-order', orderData, {
  jobId: `order-${orderId}`
}); // No duplicate created

Job Dependencies

Create workflows where jobs wait for other jobs to complete before running.

// Step 1: Fetch data
const fetchJob = await queue.add('fetch', { url });

// Step 2: Process (waits for fetch to complete)
const processJob = await queue.add('process', { data }, {
  depends_on: [fetchJob.id]
});

// Step 3: Save (waits for process to complete)
const saveJob = await queue.add('save', { destination }, {
  depends_on: [processJob.id]
});

// Wait for the final result
const result = await queue.finished(saveJob.id);

Multiple Dependencies

A job can depend on multiple jobs. It will only run when ALL dependencies have completed.

// Fan-out: multiple parallel jobs
const job1 = await queue.add('task1', data1);
const job2 = await queue.add('task2', data2);
const job3 = await queue.add('task3', data3);

// Fan-in: aggregate results (waits for all 3)
const aggregateJob = await queue.add('aggregate', {}, {
  depends_on: [job1.id, job2.id, job3.id]
});
💡 AI Pipeline Example

Dependencies are perfect for RAG pipelines: embed → search → generate. See RAG Workflows for a complete example.

Worker API

Workers process jobs from a queue. They automatically handle job acknowledgment, retries, and errors.

Constructor

const worker = new Worker(queueName, processor, options?);

Options

Option Type Default Description
concurrency number 1 Number of jobs to process in parallel
autorun boolean true Start processing immediately
host string 'localhost' Server hostname
port number 6789 Server port
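
An illustrative worker using these options (the hostname is a placeholder). With autorun: false, processing starts only when run() is called:

const worker = new Worker('emails', processor, {
  concurrency: 10,
  autorun: false,
  host: 'queue.internal',
  port: 6789
});

await worker.run(); // start processing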

Processor Function

The processor function receives a job object and should return a result (or throw an error).

const worker = new Worker('emails', async (job) => {
  // Access job properties
  console.log(job.id);       // Unique job ID
  console.log(job.name);     // Job name
  console.log(job.data);     // Job payload
  console.log(job.attempts); // Current attempt number

  // Do work...
  const result = await processJob(job.data);

  // Return result (stored with the job)
  return result;
});

Graceful Shutdown

// Close worker and wait for active jobs to finish
await worker.close();

// Handle process termination
process.on('SIGTERM', async () => {
  await worker.close();
  process.exit(0);
});

Events

Workers emit events for job lifecycle changes.

Event Arguments Description
completed (job, result) Job completed successfully
failed (job, error) Job failed (after all retries)
error (error) Worker-level error
active (job) Job started processing
progress (job, progress) Job progress updated
worker.on('completed', (job, result) => {
  console.log(`✅ Job ${job.id} completed`, result);
});

worker.on('failed', (job, error) => {
  console.error(`❌ Job ${job.id} failed: ${error.message}`);
});

worker.on('active', (job) => {
  console.log(`🔄 Job ${job.id} started`);
});

Rate Limiting

Control job throughput to avoid overwhelming external APIs or services.

import { FlashQ } from 'flashq';

const client = new FlashQ();

// Limit to 100 jobs per second
await client.setRateLimit('openai-calls', 100);

// Limit to 10 jobs per second (for expensive API)
await client.setRateLimit('anthropic-calls', 10);

// Remove rate limit
await client.clearRateLimit('openai-calls');

Concurrency Limiting

Limit how many jobs can be processed simultaneously across all workers.

// Max 5 concurrent jobs processing
await client.setConcurrency('heavy-tasks', 5);

// Remove concurrency limit
await client.clearConcurrency('heavy-tasks');
💡 AI Cost Control

Rate limiting is essential for controlling LLM API costs. Set limits based on your API tier and budget.

Retries & Backoff

Configure automatic retries with exponential backoff for failed jobs.

Basic Retries

// Retry up to 3 times on failure
await queue.add('send-email', emailData, {
  attempts: 3
});

Fixed Delay Backoff

Wait a fixed time between retry attempts.

// Retry after 5 seconds each time
await queue.add('api-call', data, {
  attempts: 5,
  backoff: 5000  // 5 seconds
});

Exponential Backoff

Increase delay exponentially: 1s, 2s, 4s, 8s... Perfect for rate-limited APIs.

// Exponential: delay = base * 2^(attempt - 1)
await queue.add('openai-request', data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000  // base delay
  }
});
// Delays: 1s, 2s, 4s, 8s, 16s

Retry Flow

Attempt Exponential (1s base) Fixed (5s)
1 1 second 5 seconds
2 2 seconds 5 seconds
3 4 seconds 5 seconds
4 8 seconds 5 seconds
5 16 seconds 5 seconds
⚠️ Dead Letter Queue

After exhausting all attempts, failed jobs are moved to the Dead Letter Queue (DLQ). See Dead Letter Queue for handling failed jobs.

AI Workloads

flashQ is optimized for AI/ML workloads with features designed for LLM pipelines, RAG systems, and batch inference.

🔗 Job Dependencies

Chain jobs for multi-step AI workflows. Perfect for RAG: embed → search → generate.

⏱️ Rate Limiting

Control API costs with per-queue rate limits. Never exceed your OpenAI/Anthropic quota.

📦 Large Payloads

10MB payload support for embeddings, images, and large context windows.

🔄 Smart Retries

Automatic retries with exponential backoff. Handle API rate limits gracefully.

RAG Workflows

Build Retrieval-Augmented Generation pipelines with job dependencies.

rag-pipeline.ts
import { Queue, Worker } from 'flashq';

const rag = new Queue('rag-pipeline');

// Create RAG pipeline
async function askQuestion(question: string) {
  // Step 1: Embed the question
  const embedJob = await rag.add('embed', { text: question });

  // Step 2: Search vector DB (waits for embedding)
  const searchJob = await rag.add('search', { query: question }, {
    depends_on: [embedJob.id]
  });

  // Step 3: Generate answer (waits for search)
  const generateJob = await rag.add('generate', { question }, {
    depends_on: [searchJob.id],
    priority: 10
  });

  // Wait for result
  return rag.finished(generateJob.id);
}

// Workers for each step
new Worker('rag-pipeline', async (job) => {
  switch (job.name) {
    case 'embed':
      return await openai.embeddings.create({
        model: 'text-embedding-3-small',
        input: job.data.text
      });

    case 'search':
      return await vectorDb.search(job.data.query, { limit: 5 });

    case 'generate':
      return await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [{ role: 'user', content: job.data.question }]
      });
  }
}, { concurrency: 10 });

// Usage
const answer = await askQuestion('What is flashQ?');
console.log(answer);

LLM Pipelines

Build multi-step LLM workflows with job dependencies, progress tracking, and automatic retries.

llm-pipeline.ts
import { Queue, Worker } from 'flashq';

const llm = new Queue('llm-pipeline');

// Multi-step summarization pipeline
async function summarizeDocument(doc: string) {
  // Step 1: Chunk the document
  const chunkJob = await llm.add('chunk', { text: doc });

  // Step 2: Summarize each chunk (parallel)
  const summarizeJob = await llm.add('summarize-chunks', {}, {
    depends_on: [chunkJob.id]
  });

  // Step 3: Combine summaries
  const combineJob = await llm.add('combine', {}, {
    depends_on: [summarizeJob.id],
    priority: 10
  });

  return llm.finished(combineJob.id, 60000);
}

// Worker with progress tracking
new Worker('llm-pipeline', async (job) => {
  if (job.name === 'summarize-chunks') {
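    // assumes the output of the 'chunk' step has been attached to job.data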
    const chunks = job.data.chunks;
    const summaries = [];

    for (let i = 0; i < chunks.length; i++) {
      const summary = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [{ role: 'user', content: `Summarize: ${chunks[i]}` }]
      });
      summaries.push(summary);

      // Update progress
      await job.updateProgress(((i + 1) / chunks.length) * 100);
    }

    return { summaries };
  }
}, {
  concurrency: 5
});

Pipeline Patterns

Sequential Chain

A → B → C. Each step waits for the previous to complete. Perfect for multi-turn conversations.

Fan-Out / Fan-In

Split work into parallel chunks, then aggregate. Ideal for document processing.

Conditional Flow

Branch based on intermediate results. Use for AI classification and routing.

Retry with Fallback

Try GPT-4, fallback to GPT-3.5 on failure. Automatic with retry configuration.
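
As a minimal sketch of the retry-with-fallback pattern (model names are illustrative, and the first attempt is assumed to be numbered 1), job.attempts lets the processor switch to a cheaper model on retries:

// Producer: allow retries with a short fixed backoff
await llm.add('answer', { prompt }, { attempts: 3, backoff: 2000 });

// Worker: premium model first, cheaper model on retry
new Worker('llm-pipeline', async (job) => {
  const model = job.attempts > 1 ? 'gpt-3.5-turbo' : 'gpt-4';
  return openai.chat.completions.create({
    model,
    messages: [{ role: 'user', content: job.data.prompt }]
  });
});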

Batch Inference

Process large datasets efficiently with batch operations and progress tracking.

batch-inference.ts
import { Queue, Worker } from 'flashq';

const batch = new Queue('batch-inference');

// Submit batch job for embeddings
async function embedDocuments(documents: string[]) {
  // Split into chunks of 100
  const chunkSize = 100;
  const jobs = [];

  for (let i = 0; i < documents.length; i += chunkSize) {
    const chunk = documents.slice(i, i + chunkSize);
    const job = await batch.add('embed', {
      documents: chunk,
      batchIndex: i / chunkSize
    }, {
      priority: 5,
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 }
    });
    jobs.push(job);
  }

  // Wait for all jobs
  return Promise.all(jobs.map(j => batch.finished(j.id)));
}

// Worker optimized for batch processing
new Worker('batch-inference', async (job) => {
  const { documents } = job.data;

  // Use OpenAI batch embedding API
  const response = await openai.embeddings.create({
    model: 'text-embedding-3-small',
    input: documents
  });

  return {
    embeddings: response.data.map(d => d.embedding),
    model: response.model,
    usage: response.usage
  };
}, {
  concurrency: 10  // Process 10 batches in parallel
});

// Usage: Process 10,000 documents
const results = await embedDocuments(myDocuments);
console.log(`Processed ${results.length} batches`);
💡 Large Payload Support

flashQ supports payloads up to 10MB, making it perfect for passing embeddings (1536-3072 dimensions) between pipeline stages.

Cron Jobs

Schedule recurring jobs with standard cron expressions. flashQ supports 6-field cron (including seconds).

cron-jobs.ts
import { FlashQ } from 'flashq';

const client = new FlashQ();

// Every minute
await client.addCron('health-check', {
  queue: 'monitoring',
  schedule: '0 * * * * *',  // sec min hour day month weekday
  data: { type: 'health' }
});

// Every hour at minute 0
await client.addCron('hourly-sync', {
  queue: 'sync',
  schedule: '0 0 * * * *',
  data: { source: 'external-api' }
});

// Every day at 2 AM
await client.addCron('daily-report', {
  queue: 'reports',
  schedule: '0 0 2 * * *',
  data: { reportType: 'daily' }
});

// Every Monday at 9 AM
await client.addCron('weekly-digest', {
  queue: 'emails',
  schedule: '0 0 9 * * 1',
  data: { template: 'weekly-digest' }
});

// List all cron jobs
const crons = await client.listCrons();
console.log(crons);

// Delete a cron job
await client.deleteCron('health-check');

Cron Expression Format

Field Values Description
Second 0-59 Optional, defaults to 0
Minute 0-59 Required
Hour 0-23 Required
Day 1-31 Required
Month 1-12 Required
Weekday 0-6 0 = Sunday

Common Patterns

Expression Description
0 * * * * *  Every minute
0 0 * * * *  Every hour
0 0 0 * * *  Every day at midnight
0 30 9 * * 1-5  Weekdays at 9:30 AM
0 0 */2 * * *  Every 2 hours

Dead Letter Queue

Jobs that fail all retry attempts are moved to the Dead Letter Queue (DLQ) for inspection and manual retry.

dlq-handling.ts
import { FlashQ } from 'flashq';

const client = new FlashQ();

// Get failed jobs from DLQ
const failedJobs = await client.getDlq('emails', 10);

for (const job of failedJobs) {
  console.log(`Job ${job.id} failed: ${job.error}`);
  console.log(`Data: ${JSON.stringify(job.data)}`);
  console.log(`Attempts: ${job.attempts}`);
}

// Retry a specific failed job
await client.retryDlq('emails', 123);

// Retry ALL failed jobs in queue
await client.retryDlq('emails');

// Discard a job directly to DLQ
await client.discard(456);

DLQ Best Practices

Monitor Regularly

Set up alerts for DLQ growth. A growing DLQ indicates systemic issues.

Log Failures

Include detailed error messages in job failures for easier debugging.

Automate Retries

For transient errors, set up automatic retry schedules during off-peak hours.

Clean Periodically

Use clean() to remove old DLQ entries after investigation.
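
A sketch of periodic cleanup with clean(), assuming DLQ entries are addressed via the 'failed' state (grace period in milliseconds):

// Remove failed jobs older than 7 days, up to 1000 per call
await queue.clean(7 * 24 * 60 * 60 * 1000, 'failed', 1000);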

Progress Tracking

Track job progress in real-time for long-running tasks like file processing or AI inference.

progress-tracking.ts
import { Queue, Worker, FlashQ } from 'flashq';

const queue = new Queue('processing');
const client = new FlashQ();

// Worker that reports progress
const worker = new Worker('processing', async (job) => {
  const items = job.data.items;
  const results = [];

  for (let i = 0; i < items.length; i++) {
    // Process item
    const result = await processItem(items[i]);
    results.push(result);

    // Update progress (0-100)
    const progress = Math.round(((i + 1) / items.length) * 100);
    await job.updateProgress(progress, `Processed ${i + 1}/${items.length}`);
  }

  return results;
});

// Monitor progress from producer
const job = await queue.add('batch', { items: largeArray });

// Poll progress
const interval = setInterval(async () => {
  const { progress, message } = await client.getProgress(job.id);
  console.log(`Progress: ${progress}% - ${message}`);

  if (progress === 100) {
    clearInterval(interval);
  }
}, 1000);

// Or listen to progress events
worker.on('progress', (job, progress) => {
  console.log(`Job ${job.id}: ${progress}%`);
});
💡 Heartbeat for Long Jobs

For jobs running longer than the timeout, use job.heartbeat() to prevent stall detection from failing the job.

Self-Hosting

flashQ is designed for easy self-hosting. Run it on any Linux, macOS, or container environment.

System Requirements

Component Minimum Recommended
CPU 1 core 2+ cores
Memory 512 MB 1-2 GB
Disk 100 MB 1 GB (for persistence)
OS Linux (x86_64, arm64), macOS

Binary Installation

# Linux x86_64
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-linux-x86_64.tar.gz | tar xz
sudo mv flashq-server /usr/local/bin/

# Linux ARM64 (Raspberry Pi, AWS Graviton)
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-linux-arm64.tar.gz | tar xz
sudo mv flashq-server /usr/local/bin/

# macOS Apple Silicon
curl -L https://github.com/egeominotti/flashq/releases/latest/download/flashq-macos-arm64.tar.gz | tar xz
sudo mv flashq-server /usr/local/bin/

Systemd Service

/etc/systemd/system/flashq.service
[Unit]
Description=flashQ Job Queue Server
After=network.target postgresql.service

[Service]
Type=simple
User=flashq
Environment=HTTP=1
Environment=DATABASE_URL=postgres://flashq:password@localhost/flashq
ExecStart=/usr/local/bin/flashq-server
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
# Enable and start
sudo systemctl enable flashq
sudo systemctl start flashq

# Check status
sudo systemctl status flashq

# View logs
sudo journalctl -u flashq -f

Docker

The recommended way to run flashQ in production.

Quick Start

# Run with dashboard enabled
docker run -d --name flashq \
  -p 6789:6789 \
  -p 6790:6790 \
  -e HTTP=1 \
  ghcr.io/egeominotti/flashq:latest

Docker Compose (Production)

docker-compose.yml
version: '3.8'

services:
  flashq:
    image: ghcr.io/egeominotti/flashq:latest
    ports:
      - "6789:6789"   # TCP
      - "6790:6790"   # HTTP/Dashboard
    environment:
      - HTTP=1
      - DATABASE_URL=postgres://flashq:secret@postgres:5432/flashq
      - AUTH_TOKENS=your-secret-token
    depends_on:
      postgres:
        condition: service_healthy
    restart: unless-stopped
    deploy:
      resources:
        limits:
          memory: 1G

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: flashq
      POSTGRES_PASSWORD: secret
      POSTGRES_DB: flashq
    volumes:
      - flashq_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U flashq"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  flashq_data:

Environment Variables

Variable Default Description
PORT 6789 TCP port for client connections
HTTP 0 Enable HTTP API (1 = enabled)
HTTP_PORT 6790 HTTP API and dashboard port
GRPC 0 Enable gRPC API (1 = enabled)
GRPC_PORT 6791 gRPC API port
DATABASE_URL - PostgreSQL connection string
AUTH_TOKENS - Comma-separated auth tokens
CLUSTER_MODE 0 Enable clustering (1 = enabled)
NODE_ID auto Unique node ID for clustering

Clustering (High Availability)

Run multiple flashQ nodes for high availability. PostgreSQL is used for coordination and leader election.

Architecture

┌──────────┐    ┌──────────┐    ┌──────────┐
│  Node 1  │    │  Node 2  │    │  Node 3  │
│ (Leader) │    │(Follower)│    │(Follower)│
└────┬─────┘    └────┬─────┘    └────┬─────┘
     │               │               │
     └───────────────┼───────────────┘
                     │
              ┌──────▼──────┐
              │  PostgreSQL │
              │  (Shared)   │
              └─────────────┘

How It Works

  • Leader election: Uses PostgreSQL advisory locks (pg_try_advisory_lock)
  • Leader responsibilities: Runs background tasks (cron, cleanup, timeouts)
  • All nodes: Handle client requests (push/pull/ack)
  • Automatic failover: Within 5 seconds when leader crashes
  • Health checks: Stale nodes cleaned after 30s of no heartbeat

Multi-Node Setup

docker-compose.ha.yml
version: '3.8'

services:
  flashq-node1:
    image: ghcr.io/egeominotti/flashq:latest
    ports:
      - "6789:6789"
      - "6790:6790"
    environment:
      - CLUSTER_MODE=1
      - NODE_ID=node-1
      - HTTP=1
      - DATABASE_URL=postgres://flashq:secret@postgres:5432/flashq
    depends_on:
      - postgres

  flashq-node2:
    image: ghcr.io/egeominotti/flashq:latest
    ports:
      - "6793:6789"
      - "6794:6790"
    environment:
      - CLUSTER_MODE=1
      - NODE_ID=node-2
      - HTTP=1
      - DATABASE_URL=postgres://flashq:secret@postgres:5432/flashq
    depends_on:
      - postgres

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: flashq
      POSTGRES_PASSWORD: secret
      POSTGRES_DB: flashq
    volumes:
      - ha_data:/var/lib/postgresql/data

volumes:
  ha_data:

Cluster Endpoints

Endpoint Description
GET /health Node health with leader/follower status
GET /cluster/nodes List all nodes in the cluster
# Check cluster status
curl http://localhost:6790/cluster/nodes

# Response
{
  "nodes": [
    { "id": "node-1", "host": "flashq-node1", "is_leader": true },
    { "id": "node-2", "host": "flashq-node2", "is_leader": false }
  ]
}

Configuration

Configure the flashQ server with environment variables.

Variable Default Description
PORT 6789 TCP port for client connections
HTTP 0 Enable HTTP API and dashboard (set to 1)
HTTP_PORT 6790 HTTP API port
DATABASE_URL - PostgreSQL connection URL for persistence
AUTH_TOKENS - Comma-separated list of valid auth tokens
CLUSTER_MODE 0 Enable clustering (set to 1)

Example: Production Docker

docker run -d --name flashq \
  -p 6789:6789 \
  -p 6790:6790 \
  -e HTTP=1 \
  -e DATABASE_URL=postgres://user:pass@db:5432/flashq \
  -e AUTH_TOKENS=secret1,secret2 \
  ghcr.io/egeominotti/flashq:latest

Migration from BullMQ

flashQ uses a BullMQ-compatible API, making migration straightforward.

Install flashQ SDK

bun add flashq

Start the flashQ server

docker run -d -p 6789:6789 ghcr.io/egeominotti/flashq:latest

Update your imports

// Before (BullMQ)
import { Queue, Worker } from 'bullmq';

// After (flashQ)
import { Queue, Worker } from 'flashq';

Remove Redis configuration

// Before (BullMQ)
const queue = new Queue('emails', {
  connection: { host: 'localhost', port: 6379 }
});

// After (flashQ) - no Redis needed!
const queue = new Queue('emails');
✅ That's it!

Your code should work without any other changes. The Queue and Worker APIs are compatible.

API Reference

Complete reference for all SDK methods and server commands.

Queue Class

Method Description
add(name, data, opts?) Add a single job to the queue
addBulk(jobs) Add multiple jobs in batch
getJob(jobId) Get job by ID with current state
getJobs(state?, limit?, offset?) List jobs with filtering and pagination
getJobCounts() Get counts by state
count() Count waiting + delayed jobs
finished(jobId, timeout?) Wait for job completion
pause() Pause the queue
resume() Resume the queue
isPaused() Check if queue is paused
drain() Remove all waiting jobs
obliterate() Remove ALL queue data
clean(grace, state, limit?) Clean up jobs by age and state
close() Close the connection
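
A short sketch combining a few of these methods (state names follow the Job States table):

const failed = await queue.getJobs('failed', 50, 0); // first 50 failed jobs
const backlog = await queue.count();                 // waiting + delayed

if (await queue.isPaused()) {
  await queue.resume();
}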

Worker Class

Method / Event Description
run() Start processing (if autorun = false)
pause() Pause the worker
resume() Resume the worker
close() Stop and close connections
on('completed') Job completed event
on('failed') Job failed event
on('active') Job started event
on('progress') Progress update event
on('error') Worker error event
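
For example, a worker can be paused around a deploy window:

await worker.pause();   // stop pulling new jobs
// ... deploy ...
await worker.resume();  // continue processing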

FlashQ Client (Low-Level)

Method Description
connect() Connect to the server
close() Close the connection
auth(token) Authenticate with token
push(queue, data, opts?) Push a job
pushBatch(queue, jobs) Push multiple jobs
pull(queue) Pull a job (blocking)
pullBatch(queue, count) Pull multiple jobs
ack(jobId, result?) Acknowledge completion
ackBatch(jobIds) Batch acknowledge
fail(jobId, error?) Fail a job
cancel(jobId) Cancel a pending job
getJob(jobId) Get job details
getState(jobId) Get job state only
getResult(jobId) Get job result
getJobByCustomId(customId) Look up by custom ID
progress(jobId, pct, msg?) Update progress
getProgress(jobId) Get job progress
update(jobId, data) Update job data
changePriority(jobId, pri) Change priority
moveToDelayed(jobId, delay) Move to delayed
promote(jobId) Move delayed to waiting
discard(jobId) Move to DLQ
heartbeat(jobId) Send heartbeat
log(jobId, msg, level?) Add log entry
getLogs(jobId) Get job logs
getDlq(queue, count?) Get DLQ jobs
retryDlq(queue, jobId?) Retry DLQ jobs
setRateLimit(queue, limit) Set rate limit
clearRateLimit(queue) Clear rate limit
setConcurrency(queue, limit) Set concurrency
clearConcurrency(queue) Clear concurrency
addCron(name, options) Add cron job
deleteCron(name) Delete cron job
listCrons() List cron jobs
stats() Get queue statistics
metrics() Get detailed metrics
listQueues() List all queues
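
A minimal sketch of the low-level pull/ack loop (handleJob stands in for your own processing logic):

import { FlashQ } from 'flashq';

const client = new FlashQ();
await client.connect();

const job = await client.pull('emails'); // blocks until a job is available
try {
  const result = await handleJob(job);
  await client.ack(job.id, result);
} catch (err) {
  await client.fail(job.id, String(err));
}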

Job Options (Complete)

Option Type Description
priority number Higher = processed first
delay number Delay in milliseconds
attempts number Max retry attempts
backoff number | object Retry backoff strategy
timeout number Processing timeout (ms)
ttl number Time-to-live (ms)
jobId string Custom ID for idempotency
depends_on number[] Job dependencies
unique_key string Deduplication key
tags string[] Job tags for filtering
lifo boolean Last-in-first-out mode
stall_timeout number Stall detection (ms)
debounce_id string Debounce identifier
debounce_ttl number Debounce window (ms)
keepCompletedAge number Keep result for duration (ms)
keepCompletedCount number Keep in last N completed
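
An illustrative combination of options not covered earlier in this guide (values are placeholders):

await queue.add('sync-user', { userId }, {
  unique_key: `user-${userId}`, // deduplicate concurrent adds
  tags: ['sync', 'user'],       // for filtering
  lifo: true,                   // process newest jobs first
  stall_timeout: 60000          // treat as stalled after 60s without heartbeat
});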

HTTP API Endpoints

Method Endpoint Description
GET /health Health check
GET /metrics/prometheus Prometheus metrics
GET /cluster/nodes List cluster nodes
POST /api/push Push job via HTTP
POST /api/pull Pull job via HTTP
POST /api/ack Acknowledge job
GET /api/job/:id Get job details
GET /api/stats Queue statistics

Troubleshooting

Connection refused

Error: ECONNREFUSED 127.0.0.1:6789

Solution: Make sure the flashQ server is running:

docker ps | grep flashq

Job timeout

Error: Job exceeds processing timeout

Solution: Increase the timeout in job options:

await queue.add('long-job', data, {
  timeout: 300000 // 5 minutes
});

Authentication failed

Error: AUTH_FAILED

Solution: Set the token in your client configuration:

const queue = new Queue('emails', {
  token: 'your-auth-token'
});

Job stuck in active state

Error: Job remains active after worker crash

Solution: Jobs are automatically recovered after the stall timeout. For long jobs, use heartbeat:

new Worker('queue', async (job) => {
  for (const item of items) {
    await processItem(item);
    await job.heartbeat();  // Prevent stall detection
  }
});

Memory usage growing

Cause: Completed jobs and results accumulating

Solution: Configure retention or use clean():

// During job creation
await queue.add('job', data, {
  keepCompletedAge: 3600000,  // Keep for 1 hour
  keepCompletedCount: 1000   // Keep last 1000
});

// Manual cleanup
await queue.clean(3600000, 'completed');