flashQ Documentation
Complete documentation for the flashQ TypeScript SDK. flashQ is a high-performance job queue built in Rust with a BullMQ-compatible API, up to 10x faster than BullMQ, and requires no Redis.
Overview
flashQ is a high-performance job queue built with Rust, designed as a drop-in replacement for BullMQ without requiring Redis.
- Drop-in migration from existing BullMQ projects
- TCP transport with optional HTTP fallback
- MessagePack binary protocol with 40% smaller payloads
- OpenTelemetry/DataDog integration via observability hooks
Key Features
| Feature | Description |
|---|---|
| 10MB Payloads | Support for large AI/ML workloads |
| Job Dependencies | Chain jobs with depends_on |
| Priority Queues | Higher priority = processed first |
| Rate Limiting | Token bucket algorithm per queue |
| Concurrency Control | Limit parallel processing |
| Dead Letter Queue | Automatic DLQ after max retries |
| Cron Jobs | 6-field cron expressions |
| Progress Tracking | Real-time progress updates |
Installation
SDK
npm install flashq
yarn add flashq
bun add flashq
Requirements
- Node.js >= 18.0.0 or Bun
- flashQ server running (Docker recommended)
Server (Docker)
docker run -d --name flashq \
-p 6789:6789 \
-p 6790:6790 \
-e HTTP=1 \
ghcr.io/egeominotti/flashq:latest
Architecture
Protocol Options
| Protocol | Use Case | Performance |
|---|---|---|
| TCP + JSON | Default, debugging | Good |
| TCP + MessagePack | Production, high throughput | Best (40% smaller) |
| HTTP + JSON | Firewalls, load balancers | Good |
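As a sketch, each row of the table maps onto client options (the option names are defined in ClientOptions below; hosts and ports are illustrative):

```typescript
import { FlashQ } from 'flashq';

// TCP + JSON: the default, human-readable on the wire
const tcpJson = new FlashQ({ host: 'localhost', port: 6789 });

// TCP + MessagePack: binary framing, ~40% smaller payloads
const tcpBinary = new FlashQ({ host: 'localhost', port: 6789, useBinary: true });

// HTTP + JSON: for environments where raw TCP is blocked
const httpJson = new FlashQ({ host: 'localhost', httpPort: 6790, useHttp: true });
```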
Connection Options
The FlashQ class provides direct access to all server operations.
import { FlashQ } from 'flashq';
const client = new FlashQ({
host: 'localhost',
port: 6789,
});
await client.connect();
// ... use client
await client.close();
ClientOptions
| Option | Type | Default | Description |
|---|---|---|---|
| host | string | 'localhost' | Server host |
| port | number | 6789 | TCP port |
| httpPort | number | 6790 | HTTP port |
| socketPath | string | - | Unix socket path |
| token | string | - | Auth token |
| timeout | number | 5000 | Connection timeout (ms) |
| useHttp | boolean | false | Use HTTP instead of TCP |
| useBinary | boolean | false | Use MessagePack protocol |
| autoReconnect | boolean | true | Auto-reconnect on disconnect |
| maxReconnectAttempts | number | 10 | Max reconnect attempts (0 = infinite) |
| reconnectDelay | number | 1000 | Initial reconnect delay (ms) |
| maxReconnectDelay | number | 30000 | Max reconnect delay (ms) |
| logLevel | string | 'silent' | Log level: trace, debug, info, warn, error, silent |
| compression | boolean | false | Enable gzip compression |
| compressionThreshold | number | 1024 | Min payload size to compress |
| hooks | ClientHooks | - | Observability hooks |
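For example, a production-oriented client might combine several of these options (the hostname and values here are illustrative, not recommendations):

```typescript
import { FlashQ } from 'flashq';

const client = new FlashQ({
  host: 'queue.internal',          // illustrative internal hostname
  port: 6789,
  token: process.env.FLASHQ_TOKEN, // auth token from the environment
  useBinary: true,                 // MessagePack framing
  compression: true,               // gzip payloads...
  compressionThreshold: 4096,      // ...but only above this size
  autoReconnect: true,
  maxReconnectAttempts: 0,         // 0 = keep retrying forever
  reconnectDelay: 500,             // first retry after 500ms
  maxReconnectDelay: 10000,        // backoff caps at 10s
  logLevel: 'warn',
});
```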
Core Methods
connect(): Promise<void>
Establish connection to the flashQ server.
const client = new FlashQ({ host: 'localhost', port: 6789 });
await client.connect();
close(): Promise<void>
Close the connection gracefully.
await client.close();
push<T>(queue, data, options?): Promise<Job>
Push a job to a queue.
const job = await client.push('emails', {
to: 'user@example.com',
subject: 'Welcome!'
}, {
priority: 10,
delay: 5000,
max_attempts: 3,
});
pushBatch<T>(queue, jobs): Promise<number[]>
Push multiple jobs in a single batch.
const ids = await client.pushBatch('emails', [
{ data: { to: 'user1@example.com' } },
{ data: { to: 'user2@example.com' }, priority: 10 },
]);
pull<T>(queue, timeout?): Promise<Job | null>
Pull a job from a queue (blocking with timeout).
const job = await client.pull<EmailData>('emails', 5000);
if (job) {
console.log('Processing:', job.data);
await client.ack(job.id);
}
ack(jobId, result?): Promise<void>
Acknowledge a job as completed.
await client.ack(job.id, { sent: true, timestamp: Date.now() });
fail(jobId, error?): Promise<void>
Fail a job (will retry or move to DLQ).
await client.fail(job.id, 'SMTP connection timeout');
PushOptions
| Option | Type | Description |
|---|---|---|
| priority | number | Higher = processed first (default: 0) |
| delay | number | Delay in ms before job is available |
| max_attempts | number | Max retry attempts (default: 0) |
| backoff | number | Backoff base in ms (exponential) |
| timeout | number | Job timeout in ms |
| ttl | number | Time-to-live in ms |
| unique_key | string | Unique key for deduplication |
| jobId | string | Custom ID for idempotency |
| depends_on | number[] | Job IDs that must complete first |
| tags | string[] | Tags for filtering |
| lifo | boolean | LIFO mode (stack) |
| group_id | string | Group ID for FIFO within group |
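Several of these options combine naturally. For instance, an idempotent, deduplicated push with retries might look like this (a sketch assuming a connected `client`; the IDs and values are illustrative):

```typescript
const job = await client.push('emails', { to: 'user@example.com' }, {
  jobId: 'welcome-user-42',       // idempotency: re-pushing the same ID is a no-op
  unique_key: 'welcome:user-42',  // deduplication key
  priority: 10,
  max_attempts: 5,
  backoff: 1000,                  // exponential: 1s, 2s, 4s, ...
  timeout: 30000,                 // fail the job if it runs longer than 30s
  ttl: 86400000,                  // expire if not processed within 24h
  tags: ['onboarding'],
});
```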
Job Management
getJob(jobId): Promise<JobWithState | null>
Get a job with its current state.
const result = await client.getJob(123);
if (result) {
console.log(`Job ${result.job.id} is ${result.state}`);
}
getState(jobId): Promise<JobState | null>
Get job state only.
const state = await client.getState(123);
// Returns: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed' | null
getJobs(options): Promise<{jobs, total}>
List jobs with filtering and pagination.
const { jobs, total } = await client.getJobs({
queue: 'emails',
state: 'failed',
limit: 100,
offset: 0,
});
progress(jobId, progress, message?): Promise<void>
Update job progress (0-100).
await client.progress(job.id, 50, 'Processing items...');
await client.progress(job.id, 100, 'Complete');
finished(jobId, timeout?): Promise<unknown>
Wait for job completion and return result.
const job = await client.push('processing', data);
const result = await client.finished(job.id, 30000); // 30s timeout
cancel(jobId): Promise<void>
Cancel a pending job.
await client.cancel(123);
changePriority(jobId, priority): Promise<void>
Change job priority at runtime.
await client.changePriority(job.id, 100); // Boost priority
log(jobId, message, level?): Promise<void>
Add log entry to job.
await client.log(job.id, 'Starting processing', 'info');
await client.log(job.id, 'Warning: rate limit approaching', 'warn');
Queue Management
pause(queue): Promise<void>
Pause a queue (stops processing).
await client.pause('emails');
resume(queue): Promise<void>
Resume a paused queue.
await client.resume('emails');
drain(queue): Promise<number>
Remove all waiting jobs from queue.
const removed = await client.drain('emails');
console.log(`Removed ${removed} jobs`);
obliterate(queue): Promise<void>
Remove ALL queue data (jobs, DLQ, cron, state).
await client.obliterate('test-queue'); // Dangerous!
listQueues(): Promise<QueueInfo[]>
List all queues with stats.
const queues = await client.listQueues();
for (const q of queues) {
console.log(`${q.name}: ${q.pending} pending, ${q.processing} active`);
}
getJobCounts(queue): Promise<Record<JobState, number>>
Get job counts grouped by state.
const counts = await client.getJobCounts('emails');
// { waiting: 10, delayed: 5, active: 2, completed: 100, failed: 3 }
Dead Letter Queue
getDlq(queue, count?): Promise<Job[]>
Get jobs from dead letter queue.
const dlqJobs = await client.getDlq('emails', 100);
retryDlq(queue, jobId?): Promise<number>
Retry DLQ jobs. If jobId is provided, retries only that job.
// Retry all DLQ jobs
const retried = await client.retryDlq('emails');
// Retry specific job
await client.retryDlq('emails', 123);
purgeDlq(queue): Promise<number>
Remove all jobs from DLQ.
const purged = await client.purgeDlq('emails');
Rate Limiting
setRateLimit(queue, limit): Promise<void>
Set queue rate limit (jobs per second).
await client.setRateLimit('api-calls', 100); // 100 jobs/sec
clearRateLimit(queue): Promise<void>
Clear rate limit.
await client.clearRateLimit('api-calls');
setConcurrency(queue, limit): Promise<void>
Set concurrency limit for queue.
await client.setConcurrency('heavy-processing', 5);
Cron Jobs
addCron(name, options): Promise<void>
Add a cron job with schedule or repeat interval.
// Using cron expression (sec min hour day month weekday)
await client.addCron('daily-report', {
queue: 'reports',
data: { type: 'daily' },
schedule: '0 0 9 * * *', // Every day at 9:00 AM
priority: 10,
});
// Using repeat interval
await client.addCron('health-check', {
queue: 'monitoring',
data: { check: 'health' },
repeat_every: 60000, // Every minute
limit: 1000, // Max 1000 executions
});
deleteCron(name): Promise<boolean>
Delete a cron job.
const deleted = await client.deleteCron('daily-report');
listCrons(): Promise<CronJob[]>
List all cron jobs.
const crons = await client.listCrons();
for (const cron of crons) {
console.log(`${cron.name}: next run at ${new Date(cron.next_run)}`);
}
Flows (Job Dependencies)
pushFlow<T>(queue, parentData, children, options?): Promise<FlowResult>
Create a workflow with parent and children jobs.
const flow = await client.pushFlow(
'processing',
{ type: 'aggregate' }, // Parent job data
[
{ queue: 'step1', data: { task: 'fetch' } },
{ queue: 'step2', data: { task: 'transform' } },
{ queue: 'step3', data: { task: 'load' } },
],
{ priority: 10 } // FlowOptions
);
// Wait for parent (completes when all children complete)
const result = await client.finished(flow.parent_id);
Metrics
stats(): Promise<QueueStats>
Get queue statistics.
const stats = await client.stats();
console.log(`Queued: ${stats.queued}, Processing: ${stats.processing}`);
metrics(): Promise<Metrics>
Get detailed metrics.
const metrics = await client.metrics();
console.log(`Throughput: ${metrics.jobs_per_second} jobs/sec`);
console.log(`Avg Latency: ${metrics.avg_latency_ms}ms`);
Queue Class
The Queue class provides a BullMQ-compatible API for job management.
import { Queue } from 'flashq';
const queue = new Queue('emails', {
host: 'localhost',
port: 6789,
});
add(name, data, opts?): Promise<Job>
Add a job to the queue.
const job = await queue.add('send-email', {
to: 'user@example.com',
subject: 'Hello',
}, {
priority: 10,
delay: 5000,
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
});
addBulk(jobs): Promise<Job[]>
Add multiple jobs.
const jobs = await queue.addBulk([
{ name: 'send', data: { to: 'a@test.com' } },
{ name: 'send', data: { to: 'b@test.com' }, opts: { priority: 10 } },
]);
finished(jobId, timeout?): Promise<R | null>
Wait for job completion and return its result.
const job = await queue.add('process', data);
const result = await queue.finished(job.id, 30000);
getJobCounts(): Promise<JobCounts>
Get job counts by state.
const counts = await queue.getJobCounts();
// { waiting: 10, active: 2, completed: 100, failed: 5, delayed: 3 }
pause() / resume() / isPaused()
Control queue processing state.
await queue.pause();
if (await queue.isPaused()) {
console.log('Queue is paused');
}
await queue.resume();
drain() / obliterate() / clean()
Clean up queue data.
await queue.drain(); // Remove waiting jobs
await queue.obliterate(); // Remove ALL data
const cleaned = await queue.clean(3600000, 1000, 'completed'); // Remove old completed
Worker Class
The Worker class processes jobs from queues.
import { Worker } from 'flashq';
const worker = new Worker('emails', async (job) => {
console.log('Processing:', job.data);
return { sent: true };
}, {
concurrency: 10,
autorun: true,
});
WorkerOptions
| Option | Type | Default | Description |
|---|---|---|---|
| id | string | auto | Worker ID |
| concurrency | number | 10 | Parallel job processing |
| batchSize | number | 100 | Jobs to pull per batch |
| autoAck | boolean | true | Auto-acknowledge on success |
| autorun | boolean | true | Start processing immediately |
| closeTimeout | number | 30000 | Graceful shutdown timeout (ms) |
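For example, a worker tuned for heavier jobs with an explicit start might combine these options like so (values are illustrative):

```typescript
import { Worker } from 'flashq';

const worker = new Worker('images', async (job) => {
  // ...process job.data...
  return { ok: true };
}, {
  id: 'image-worker-1', // illustrative worker ID
  concurrency: 4,       // heavy jobs: keep parallelism low
  batchSize: 20,        // pull fewer jobs per round trip
  autorun: false,       // do not start until worker.start() is called
  closeTimeout: 60000,  // give long-running jobs time to finish on shutdown
});

await worker.start();
```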
Worker Methods
start() / stop(force?) / close(force?)
Control worker lifecycle.
const worker = new Worker('queue', processor, { autorun: false });
await worker.start();
// ... later
await worker.close(); // Graceful shutdown
await worker.close(true); // Force stop
updateProgress(jobId, progress, message?)
Update job progress during processing.
const worker = new Worker('queue', async (job) => {
  await worker.updateProgress(job.id, 0, 'Starting...');
  await worker.updateProgress(job.id, 50, 'Halfway done');
  await worker.updateProgress(job.id, 100, 'Complete');
  return { done: true };
});
getState() / isRunning() / getProcessingCount()
Check worker status.
const state = worker.getState(); // 'idle' | 'starting' | 'running' | 'stopping' | 'stopped'
if (worker.isRunning()) {
console.log(`Processing ${worker.getProcessingCount()} jobs`);
}
Worker Events
worker.on('ready', () => console.log('Worker ready'));
worker.on('active', (job, workerId) => {
console.log(`Job ${job.id} started by ${workerId}`);
});
worker.on('completed', (job, result, workerId) => {
console.log(`Job ${job.id} completed:`, result);
});
worker.on('failed', (job, error, workerId) => {
console.log(`Job ${job.id} failed:`, error.message);
});
worker.on('progress', (job, progress, message) => {
console.log(`Job ${job.id}: ${progress}% - ${message}`);
});
worker.on('stopping', () => console.log('Stopping...'));
worker.on('stopped', () => console.log('Stopped'));
worker.on('drained', () => console.log('All jobs completed'));
worker.on('error', (error) => console.error('Error:', error));
Real-Time Events
Subscribe to job events via SSE or WebSocket.
import { FlashQ, EventSubscriber } from 'flashq';
const client = new FlashQ({ host: 'localhost' });
await client.connect();
// Subscribe via SSE
const subscriber = client.subscribe('emails');
// Or via WebSocket
const wsSubscriber = client.subscribeWs('emails');
await subscriber.connect();
Event Types
| Event | Description |
|---|---|
| pushed | Job was added to queue |
| completed | Job completed successfully |
| failed | Job failed (will retry or move to DLQ) |
| progress | Job progress was updated |
| timeout | Job timed out |
subscriber.on('connected', () => console.log('Connected'));
subscriber.on('disconnected', () => console.log('Disconnected'));
subscriber.on('reconnecting', (attempt) => console.log(`Reconnecting ${attempt}...`));
subscriber.on('event', (event) => {
console.log(`${event.eventType}: Job ${event.jobId}`);
});
subscriber.on('completed', (event) => {
console.log(`Job ${event.jobId} completed`);
});
subscriber.on('failed', (event) => {
console.log(`Job ${event.jobId} failed: ${event.error}`);
});
// Cleanup
subscriber.close();
Error Handling
flashQ provides typed error classes for precise error handling.
Error Classes
| Class | Code | Retryable | Description |
|---|---|---|---|
| ConnectionError | CONNECTION_* | Yes | Connection failures |
| TimeoutError | REQUEST_TIMEOUT | Yes | Request timeouts |
| AuthenticationError | AUTH_FAILED | No | Auth failures |
| ValidationError | VALIDATION_ERROR | No | Invalid input |
| JobNotFoundError | JOB_NOT_FOUND | No | Job doesn't exist |
| RateLimitError | RATE_LIMITED | Yes | Rate limit exceeded |
| QueuePausedError | QUEUE_PAUSED | Yes | Queue is paused |
import {
FlashQError, ConnectionError, TimeoutError,
ValidationError, RateLimitError, JobNotFoundError,
} from 'flashq';
try {
await client.push('queue', data);
} catch (error) {
if (error instanceof ConnectionError) {
console.log('Connection failed:', error.message);
// Safe to retry
} else if (error instanceof TimeoutError) {
console.log(`Timeout after ${error.timeoutMs}ms`);
// Safe to retry
} else if (error instanceof RateLimitError) {
console.log(`Rate limited, retry after ${error.retryAfterMs}ms`);
// Wait and retry
} else if (error instanceof ValidationError) {
console.log(`Invalid ${error.field}: ${error.message}`);
// Fix input, don't retry
} else if (error instanceof FlashQError) {
if (error.retryable) {
// Safe to retry
}
}
}
Retry Logic
Built-in retry utilities with exponential backoff.
withRetry(fn, options)
Wrap a single operation with retry logic.
import { withRetry } from 'flashq';
const result = await withRetry(
() => client.push('queue', data),
{
maxRetries: 3,
initialDelay: 100,
maxDelay: 5000,
backoffMultiplier: 2,
jitter: true,
onRetry: (error, attempt, delay) => {
console.log(`Retry ${attempt} after ${delay}ms: ${error.message}`);
},
}
);
retryable(fn, options)
Create a retryable version of a function.
import { retryable } from 'flashq';
const retryablePush = retryable(
(queue: string, data: unknown) => client.push(queue, data),
{ maxRetries: 3 }
);
await retryablePush('emails', { to: 'user@example.com' });
RetryPresets
Pre-configured retry strategies.
import { RetryPresets } from 'flashq';
RetryPresets.fast // 2 retries, 50ms initial, 500ms max
RetryPresets.standard // 3 retries, 100ms initial, 5s max
RetryPresets.aggressive // 5 retries, 200ms initial, 30s max
RetryPresets.none // 0 retries
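Assuming each preset is a plain options object accepted by withRetry (this document does not spell out their shape), a preset can be passed directly or spread to override individual fields. Here `client` and `data` stand in for a connected client and a payload:

```typescript
import { withRetry, RetryPresets } from 'flashq';

// Use a preset as-is
const job = await withRetry(() => client.push('emails', data), RetryPresets.standard);

// Or spread a preset and override individual fields
const pulled = await withRetry(
  () => client.pull('emails', 5000),
  { ...RetryPresets.aggressive, jitter: true },
);
```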
Observability Hooks
Integrate with OpenTelemetry, DataDog, or custom metrics.
import { FlashQ, ClientHooks } from 'flashq';
const hooks: ClientHooks = {
onPush: (ctx) => console.log(`Pushing to ${ctx.queue}`),
onPushComplete: (ctx) => {
console.log(`Pushed job ${ctx.job?.id} in ${Date.now() - ctx.startTime}ms`);
},
onPushError: (ctx, error) => console.error(`Push failed: ${error.message}`),
onPull: (ctx) => console.log(`Pulling from ${ctx.queue}`),
onPullComplete: (ctx) => console.log(`Pulled job ${ctx.job?.id}`),
onConnection: (ctx) => console.log(`Connection event: ${ctx.event}`),
};
const client = new FlashQ({ hooks });
OpenTelemetry Example
import { trace, SpanStatusCode } from '@opentelemetry/api';
const tracer = trace.getTracer('flashq');
const hooks: ClientHooks = {
onPush: (ctx) => {
ctx.span = tracer.startSpan('flashq.push', {
attributes: { 'flashq.queue': ctx.queue },
});
},
onPushComplete: (ctx) => {
ctx.span?.setAttribute('flashq.job_id', ctx.job?.id);
ctx.span?.end();
},
onPushError: (ctx, error) => {
ctx.span?.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
ctx.span?.recordException(error);
ctx.span?.end();
},
};
Logger
Configurable logging with request ID tracking.
import { createLogger, setGlobalLogger } from 'flashq';
const logger = createLogger({
level: 'info', // trace | debug | info | warn | error | silent
prefix: 'my-app',
timestamps: true,
});
logger.info('Processing request', { userId: 123 });
// Output: 2024-01-15T10:30:00.000Z [my-app] INFO Processing request { userId: 123 }
// Request ID tracking
logger.setRequestId('req-12345');
logger.info('Processing'); // Includes [req-12345] in output
// Child loggers
const dbLogger = logger.child('db');
dbLogger.info('Query executed');
// Output: [my-app:db] INFO Query executed
// Set as global
setGlobalLogger(logger);
Types Reference
Job<T>
interface Job<T = unknown> {
id: number;
queue: string;
data: T;
priority: number;
attempts: number;
created_at: number;
run_at: number;
max_attempts: number;
backoff: number;
ttl: number;
timeout: number;
progress: number;
unique_key?: string;
custom_id?: string;
tags: string[];
depends_on: number[];
parent_id?: number;
children_ids: number[];
group_id?: string;
}
JobState
type JobState = 'waiting' | 'delayed' | 'active' | 'completed' | 'failed';
QueueInfo / QueueStats / Metrics
interface QueueInfo {
name: string;
pending: number;
processing: number;
dlq: number;
paused: boolean;
}
interface QueueStats {
queued: number;
processing: number;
delayed: number;
dlq: number;
}
interface Metrics {
total_pushed: number;
total_completed: number;
total_failed: number;
jobs_per_second: number;
avg_latency_ms: number;
}
CronJob / CronOptions
interface CronJob {
name: string;
queue: string;
data: unknown;
schedule?: string;
repeat_every?: number;
priority: number;
next_run: number;
executions: number;
limit?: number;
}
interface CronOptions {
queue: string;
data: unknown;
schedule?: string; // "sec min hour day month weekday"
repeat_every?: number; // Or repeat every N ms
priority?: number;
limit?: number;
}
FlowChild / FlowResult
interface FlowChild {
queue: string;
data: unknown;
priority?: number;
delay?: number;
}
interface FlowResult {
parent_id: number;
children_ids: number[];
}
Constants
import { MAX_BATCH_SIZE, MAX_JOB_DATA_SIZE } from 'flashq';
MAX_BATCH_SIZE // 1000 - Maximum jobs per batch
MAX_JOB_DATA_SIZE // 10MB - Maximum job data size
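Because pushBatch is capped at MAX_BATCH_SIZE jobs per call, larger workloads need client-side chunking. A minimal sketch (MAX_BATCH_SIZE is redeclared locally so the snippet is self-contained; in practice import it from 'flashq', and the commented loop assumes a connected client):

```typescript
// Locally redeclared for a self-contained sketch; normally imported from 'flashq'.
const MAX_BATCH_SIZE = 1000;

// Split an array into consecutive chunks of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Usage (assumes `client` is a connected FlashQ instance):
// for (const batch of chunk(jobs, MAX_BATCH_SIZE)) {
//   await client.pushBatch('emails', batch);
// }
```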
Best Practices
1. Use Binary Protocol in Production
const client = new FlashQ({
useBinary: true, // 40% smaller payloads
});
2. Enable Graceful Shutdown
process.on('SIGTERM', async () => {
await worker.close(); // Waits for current jobs
await client.close();
process.exit(0);
});
3. Use Batch Operations
// Instead of many single pushes
for (const data of items) {
await client.push('queue', data); // Slow
}
// Use batch
await client.pushBatch('queue', items.map(data => ({ data }))); // Fast
4. Handle Errors Properly
try {
await client.push('queue', data);
} catch (error) {
if (error instanceof FlashQError && error.retryable) {
await withRetry(() => client.push('queue', data));
} else {
logger.error('Failed to push job', error);
}
}
5. Use Job Dependencies for Workflows
const step1 = await client.push('step1', data1);
const step2 = await client.push('step2', data2, { depends_on: [step1.id] });
const step3 = await client.push('step3', data3, { depends_on: [step2.id] });
const result = await client.finished(step3.id);
6. Set Appropriate Concurrency
import os from 'node:os';
// For I/O-bound work (API calls, DB)
const ioWorker = new Worker('api-queue', processor, { concurrency: 50 });
// For CPU-bound work
const cpuWorker = new Worker('cpu-queue', processor, { concurrency: os.cpus().length });
7. Use Rate Limiting for External APIs
await client.setRateLimit('openai-calls', 50); // 50 req/sec
const worker = new Worker('openai-calls', async (job) => {
// Rate limiting is enforced server-side
return await openai.chat.completions.create(job.data);
});
8. Monitor with Hooks
const client = new FlashQ({
hooks: {
onPushComplete: (ctx) => {
metrics.pushLatency.observe(Date.now() - ctx.startTime);
metrics.pushCounter.inc({ queue: ctx.queue });
},
onPushError: (ctx, error) => {
metrics.errorCounter.inc({ queue: ctx.queue, code: error.code });
},
},
});
flashQ Documentation - flashq.dev