Building reliable AI applications requires more than just calling APIs. You need patterns that handle failures gracefully, scale efficiently, and maintain data consistency. In this guide, we'll explore essential job queue patterns that every AI engineer should know.
1. Fan-Out Pattern
The fan-out pattern distributes work across multiple workers. It's perfect for batch processing where you need to process many items independently.
```
┌──────────┐       ┌──────────┐       ┌───────────┐
│  Source  │ ───▶  │ Worker 1 │ ───▶  │  Results  │
│   Job    │ ───▶  │ Worker 2 │ ───▶  │ Collector │
│          │ ───▶  │ Worker 3 │ ───▶  │           │
└──────────┘       └──────────┘       └───────────┘
```
Use case: Processing 10,000 documents for embedding generation.
```javascript
async function fanOutEmbeddings(documents) {
  const queue = new Queue('embeddings');

  // Fan out: create a job for each document
  const jobs = await queue.addBulk(
    documents.map(doc => ({
      name: 'embed',
      data: { id: doc.id, text: doc.content }
    }))
  );

  // Fan in: collect results once every job has finished
  const collector = await queue.add('collect', {
    jobIds: jobs.map(j => j.id),
    totalCount: documents.length
  }, {
    depends_on: jobs.map(j => j.id)
  });

  return queue.finished(collector.id);
}
```
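Stripped of the queue machinery, the fan-in step amounts to counting completions. A minimal in-memory sketch (the `ResultCollector` class is hypothetical, for illustration only):

```javascript
// Hypothetical in-memory collector illustrating the fan-in step:
// results accumulate until every fanned-out job has reported back.
class ResultCollector {
  constructor(totalCount) {
    this.totalCount = totalCount;
    this.results = new Map();
  }

  // Record one worker's result; returns true once all jobs are in.
  markDone(jobId, result) {
    this.results.set(jobId, result);
    return this.isComplete();
  }

  isComplete() {
    return this.results.size === this.totalCount;
  }
}

const collector = new ResultCollector(3);
collector.markDone('job-1', [0.1, 0.2]);
collector.markDone('job-2', [0.3, 0.4]);
const done = collector.markDone('job-3', [0.5, 0.6]);
console.log(done); // true: all three embedding jobs reported back
```

In a real queue the collector job would be the one holding this count, completing only when `results.size` reaches `totalCount`.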
2. Saga Pattern
The saga pattern manages distributed transactions by breaking them into steps, each with a compensating action for rollback.
```
┌────────┐      ┌────────┐      ┌────────┐
│ Step 1 │ ───▶ │ Step 2 │ ───▶ │ Step 3 │
│        │      │        │      │        │
│ Comp 1 │ ◀─── │ Comp 2 │ ◀─── │ Comp 3 │
└────────┘      └────────┘      └────────┘
     (rollback chain if any step fails)
```
Use case: AI-powered document processing with external API calls.
```javascript
const saga = {
  steps: [
    {
      name: 'extract',
      execute: async (data) => await extractText(data.file),
      compensate: async (data) => await deleteExtraction(data.extractionId)
    },
    {
      name: 'analyze',
      execute: async (data) => await openai.chat.completions.create({ /* ... */ }),
      compensate: async (data) => { /* no-op: the API call can't be undone */ }
    },
    {
      name: 'store',
      execute: async (data) => await db.insert(data.analysis),
      compensate: async (data) => await db.delete(data.recordId)
    }
  ]
};

// Execute the saga, rolling back automatically on failure
async function executeSaga(saga, initialData) {
  const completedSteps = [];
  try {
    let data = initialData;
    for (const step of saga.steps) {
      data = await step.execute(data);
      completedSteps.push({ step, data });
    }
    return data;
  } catch (error) {
    // Roll back completed steps in reverse order
    for (const { step, data } of completedSteps.reverse()) {
      await step.compensate(data);
    }
    throw error;
  }
}
```
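To see the rollback order concretely, here is a synchronous sketch of the same control flow, using in-memory steps instead of external calls; step 3 is rigged to fail:

```javascript
// Synchronous sketch of the saga control flow: step 3 throws,
// so compensations run in reverse order (comp 2, then comp 1).
const log = [];

const demoSaga = [
  { name: 'step1',
    execute: () => { log.push('exec 1'); },
    compensate: () => { log.push('comp 1'); } },
  { name: 'step2',
    execute: () => { log.push('exec 2'); },
    compensate: () => { log.push('comp 2'); } },
  { name: 'step3',
    execute: () => { throw new Error('boom'); },
    compensate: () => { log.push('comp 3'); } }
];

function runSaga(steps) {
  const completed = [];
  try {
    for (const step of steps) {
      step.execute();
      completed.push(step);
    }
  } catch (error) {
    for (const step of completed.reverse()) {
      step.compensate();
    }
    throw error;
  }
}

try { runSaga(demoSaga); } catch (e) { /* expected failure */ }
console.log(log); // ['exec 1', 'exec 2', 'comp 2', 'comp 1']
```

Note that `comp 3` never runs: a step that failed has nothing to compensate, only the steps that completed before it do.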
3. Circuit Breaker Pattern
The circuit breaker prevents cascading failures by stopping requests to a failing service.
```
┌──────────────────────────────────────────┐
│              Circuit Breaker             │
│  ┌────────┐    ┌──────┐    ┌───────────┐ │
│  │ Closed │ ─▶ │ Open │ ─▶ │ Half-Open │ │
│  └────────┘    └──────┘    └───────────┘ │
└──────────────────────────────────────────┘
```
Use case: Protecting against OpenAI API outages.
```javascript
class CircuitBreaker {
  constructor(threshold = 5, timeout = 60000) {
    this.failures = 0;
    this.threshold = threshold;
    this.timeout = timeout;
    this.state = 'CLOSED';
    this.nextAttempt = 0;
  }

  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error('Circuit breaker is OPEN');
      }
      this.state = 'HALF-OPEN'; // timeout elapsed: allow one probe request
    }
    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  onSuccess() {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  onFailure() {
    this.failures++;
    // A failed probe in HALF-OPEN reopens the circuit immediately
    if (this.state === 'HALF-OPEN' || this.failures >= this.threshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.timeout;
    }
  }
}
```
```javascript
// Usage in a worker
const breaker = new CircuitBreaker();

new Worker('openai-calls', async (job) => {
  return breaker.execute(async () => {
    return openai.chat.completions.create(job.data);
  });
});
```
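The state transitions are easy to exercise in isolation. Below is a trimmed copy of the breaker's state logic (HALF-OPEN probing omitted, threshold lowered to 2 for brevity) so the snippet runs standalone:

```javascript
// Trimmed copy of the circuit breaker's state logic, standalone
// so the transitions can be driven directly.
class Breaker {
  constructor(threshold = 2, timeout = 1000) {
    this.failures = 0;
    this.threshold = threshold;
    this.timeout = timeout;
    this.state = 'CLOSED';
    this.nextAttempt = 0;
  }
  onSuccess() { this.failures = 0; this.state = 'CLOSED'; }
  onFailure() {
    this.failures++;
    if (this.failures >= this.threshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.timeout;
    }
  }
}

const b = new Breaker(2, 1000);
b.onFailure();           // one failure: below threshold
console.log(b.state);    // 'CLOSED'
b.onFailure();           // threshold reached: circuit trips
console.log(b.state);    // 'OPEN'
b.onSuccess();           // a successful probe resets the breaker
console.log(b.state);    // 'CLOSED'
```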
4. Priority Queue Pattern
Process important jobs first while still handling lower-priority work.
```javascript
// Priority levels
const PRIORITY = {
  CRITICAL: 100, // processed immediately
  HIGH: 50,      // next in line
  NORMAL: 10,    // default
  LOW: 1         // when nothing else is pending
};

// Real-time user request (high priority)
await queue.add('generate', { prompt, userId }, {
  priority: PRIORITY.HIGH
});

// Background batch job (low priority)
await queue.add('generate', { prompt, batchId }, {
  priority: PRIORITY.LOW
});
```
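Under the hood, a priority queue simply hands workers the highest-priority waiting job first. A minimal in-memory illustration of that ordering (the `pickNext` helper is hypothetical, not part of any queue API):

```javascript
const PRIORITY = { CRITICAL: 100, HIGH: 50, NORMAL: 10, LOW: 1 };

// Hypothetical helper: given the waiting jobs, pick the one a worker
// would receive next. Highest priority wins; Array.prototype.sort is
// stable, so jobs at the same level keep FIFO order.
function pickNext(jobs) {
  return [...jobs].sort((a, b) => b.priority - a.priority)[0];
}

const waiting = [
  { name: 'batch-1',  priority: PRIORITY.LOW },
  { name: 'user-req', priority: PRIORITY.HIGH },
  { name: 'batch-2',  priority: PRIORITY.NORMAL }
];

console.log(pickNext(waiting).name); // 'user-req'
```

The user-facing request jumps ahead of both batch jobs even though it was enqueued second.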
5. Retry with Exponential Backoff
Handle transient failures by retrying with increasing delays.
```javascript
// Retry delays: 1s, 2s, 4s, 8s, 16s
await queue.add('api-call', data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});

// Custom backoff with jitter (prevents a thundering herd)
function calculateBackoff(attempt, baseDelay = 1000) {
  const exponentialDelay = baseDelay * Math.pow(2, attempt);
  const jitter = Math.random() * 1000;
  return exponentialDelay + jitter;
}
```
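Exercising the helper shows the schedule doubling each attempt, with each delay offset by up to a second of random jitter (the function is repeated here so the snippet runs standalone):

```javascript
function calculateBackoff(attempt, baseDelay = 1000) {
  const exponentialDelay = baseDelay * Math.pow(2, attempt);
  const jitter = Math.random() * 1000;
  return exponentialDelay + jitter;
}

// Delays for attempts 0..4: roughly 1s, 2s, 4s, 8s, 16s, plus jitter.
// Because the jitter is random, retries from many workers spread out
// instead of hammering the API at the same instant.
for (let attempt = 0; attempt < 5; attempt++) {
  const delay = calculateBackoff(attempt);
  console.log(`attempt ${attempt}: ~${Math.round(delay)}ms`);
}
```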
6. Dead Letter Queue (DLQ) Pattern
Capture failed jobs for investigation and reprocessing.
```javascript
// Jobs move to the DLQ after max attempts
await queue.add('process', data, {
  attempts: 3
});

// Monitor the DLQ
const failedJobs = await queue.getDlq(100);

for (const job of failedJobs) {
  console.log(`Failed job ${job.id}:`, job.failedReason);

  // Decide what to do based on the error
  if (job.failedReason.includes('rate limit')) {
    // Retry rate-limited jobs
    await queue.retryDlq(job.id);
  } else if (job.failedReason.includes('invalid')) {
    // Log and skip invalid jobs
    await logToSlack(`Invalid job: ${job.id}`);
  }
}

// Alert on DLQ growth (we fetched at most 100, so hitting the cap is the signal)
if (failedJobs.length >= 100) {
  await sendAlert('DLQ has 100+ jobs!');
}
```
7. Idempotency Pattern
Ensure jobs can be safely retried without side effects.
```javascript
// Use a custom job ID for idempotency
await queue.add('charge', { userId, amount }, {
  jobId: `charge-${userId}-${orderId}` // unique per operation
});

// Worker checks idempotency before doing work
new Worker('payments', async (job) => {
  // Already processed? Return the cached result
  const existing = await db.payments.findOne({
    idempotencyKey: job.id
  });
  if (existing) {
    return existing.result;
  }

  // Process, then store the result under the idempotency key
  const result = await processPayment(job.data);
  await db.payments.insert({
    idempotencyKey: job.id,
    result
  });
  return result;
});
```
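The same check-before-process shape, reduced to a synchronous in-memory sketch; the `runOnce` helper is hypothetical, standing in for the database lookup above:

```javascript
// Hypothetical in-memory stand-in for the database-backed check:
// the first call for a key does the work; retries return the cached result.
const processed = new Map();
let chargeCount = 0;

function runOnce(idempotencyKey, fn) {
  if (processed.has(idempotencyKey)) {
    return processed.get(idempotencyKey); // already done: cached result
  }
  const result = fn();
  processed.set(idempotencyKey, result);
  return result;
}

const key = 'charge-user42-order9';
runOnce(key, () => { chargeCount++; return 'charged'; });
runOnce(key, () => { chargeCount++; return 'charged'; }); // retry: no-op
console.log(chargeCount); // 1: the charge ran exactly once
```

This is why the custom `jobId` matters: it is the key that makes a retried job land on the cached branch instead of charging twice.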
8. Scheduled Job Pattern
Run jobs at specific times using cron expressions.
```javascript
// Daily report at 9 AM
await queue.addCron('daily-report', {
  queue: 'reports',
  schedule: '0 9 * * *',
  data: { type: 'daily' }
});

// Hourly model retraining
await queue.addCron('retrain-model', {
  queue: 'ml-training',
  schedule: '0 * * * *',
  data: { modelId: 'recommendations' }
});

// Cache refresh every 5 minutes
await queue.addCron('refresh-cache', {
  queue: 'maintenance',
  schedule: '*/5 * * * *',
  data: { cache: 'embeddings' }
});
```
9. Batch Processing Pattern
Group multiple items into a single job for efficiency.
```javascript
// Instead of 1,000 individual jobs...
const BATCH_SIZE = 100;
const batches = [];
for (let i = 0; i < items.length; i += BATCH_SIZE) {
  batches.push(items.slice(i, i + BATCH_SIZE));
}

// ...create 10 batch jobs
for (const batch of batches) {
  await queue.add('process-batch', {
    items: batch
  });
}

// The worker processes each batch in a single API call
new Worker('embeddings', async (job) => {
  const { items } = job.data;

  // OpenAI supports batched embedding requests
  const response = await openai.embeddings.create({
    input: items.map(i => i.text),
    model: 'text-embedding-3-small'
  });

  return response.data;
});
```
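The slicing loop generalizes into a small reusable helper:

```javascript
// Split an array into fixed-size chunks; the last chunk may be smaller.
function chunk(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

const batches = chunk([1, 2, 3, 4, 5, 6, 7], 3);
console.log(batches.length); // 3
console.log(batches[2]);     // [7]
```

Picking the batch size is a trade-off: larger batches mean fewer API round-trips, but a single bad item fails the whole batch, so keep batches small enough that a retry is cheap.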
Choosing the Right Pattern
| Scenario | Pattern |
|---|---|
| Processing many items independently | Fan-Out |
| Multi-step workflow with rollback | Saga |
| Unstable external API | Circuit Breaker |
| User-facing vs batch jobs | Priority Queue |
| Transient failures | Exponential Backoff |
| Debugging failures | Dead Letter Queue |
| Payment/mutation safety | Idempotency |
| Recurring tasks | Scheduled Jobs |
| High-volume processing | Batch Processing |
Conclusion
These patterns form the foundation of reliable AI applications. Start with the basics (retries, DLQ) and add complexity (sagas, circuit breakers) as your system grows. flashQ supports all these patterns out of the box.
Build Reliable AI Apps
Get started with flashQ and implement these patterns today.
Read the Docs →