Build resilient job queues that gracefully handle failures. This guide covers retry strategies, exponential backoff, dead letter queues, circuit breakers, and more.
## Understanding Error Types
Not all errors are equal. Understanding error types helps you choose the right handling strategy:
| Error Type | Examples | Strategy |
|---|---|---|
| Transient | Network timeout, 503 Service Unavailable, rate limit | Retry with backoff |
| Permanent | 400 Bad Request, invalid data, missing resource | Don't retry, send to DLQ |
| Recoverable | Database connection lost, external service down | Retry with circuit breaker |
| Bug | TypeError, null reference, logic errors | Log, alert, fix code |
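The table above can be turned into a small dispatch function. This is a hypothetical sketch, not a flashQ API: the `status` and `code` fields are assumptions about the shape of your errors.

```javascript
// Hypothetical classifier mapping an error to a handling strategy
// from the table above. error.status / error.code are assumed fields.
function chooseStrategy(error) {
  if ([408, 429, 500, 502, 503, 504].includes(error.status)) return 'retry-with-backoff';
  if ([400, 401, 403, 404, 422].includes(error.status)) return 'dead-letter';
  if (['ECONNREFUSED', 'ETIMEDOUT'].includes(error.code)) return 'retry-with-circuit-breaker';
  return 'alert-and-fix'; // likely a bug: TypeError, null reference, etc.
}
```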
## Retry Strategies

### Basic Retry Configuration
```javascript
import { FlashQ } from 'flashq';

const client = new FlashQ({ host: 'localhost', port: 6789 });

// Configure retries when pushing a job
await client.push('process-order', orderData, {
  max_attempts: 5, // Maximum retry attempts
  backoff: 1000,   // Initial backoff delay (ms)
  timeout: 30000   // Processing timeout per attempt
});
```
### Retry Timeline Visualization

```
Attempt 1: Immediate
    ↓ (fail)
    └── Wait 1s (backoff)
Attempt 2: After 1s
    ↓ (fail)
    └── Wait 2s (backoff × 2)
Attempt 3: After 3s total
    ↓ (fail)
    └── Wait 4s (backoff × 4)
Attempt 4: After 7s total
    ↓ (fail)
    └── Wait 8s (backoff × 8)
Attempt 5: After 15s total
    ↓ (fail)
    └── Move to DLQ (max_attempts reached)
```
### Custom Retry Logic in Worker
```javascript
import { Worker } from 'flashq';

const worker = new Worker('process-order', async (job) => {
  try {
    await processOrder(job.data);
  } catch (error) {
    // Transient error with attempts remaining - rethrow to trigger automatic retry
    if (isTransientError(error) && job.attempts < 5) {
      throw error;
    }
    // Permanent error - log and don't retry
    if (isPermanentError(error)) {
      console.error(`Permanent error for job ${job.id}:`, error);
      return { success: false, error: error.message };
    }
    throw error; // Unknown error - retry
  }
});

function isTransientError(error) {
  const transientCodes = [408, 429, 500, 502, 503, 504];
  // Node network errors carry a code; HTTP errors are matched on the message
  return ['ECONNREFUSED', 'ETIMEDOUT'].includes(error.code) ||
    transientCodes.some(code => error.message.includes(String(code)));
}

function isPermanentError(error) {
  const permanentCodes = [400, 401, 403, 404, 422];
  return permanentCodes.some(code => error.message.includes(String(code)));
}
```
## Exponential Backoff
flashQ implements exponential backoff automatically. The delay between retries doubles each time:
```javascript
// Backoff formula: delay = backoff * (2 ^ (attempt - 1))
await client.push('api-call', data, {
  max_attempts: 6,
  backoff: 1000 // 1 second base
});

// Retry delays:
// Attempt 1 → 2: 1s
// Attempt 2 → 3: 2s
// Attempt 3 → 4: 4s
// Attempt 4 → 5: 8s
// Attempt 5 → 6: 16s
// Total wait time: 31 seconds
```
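The formula above is plain arithmetic and can be checked independently. A minimal sketch (not a flashQ API) for computing per-attempt delays and the total wait before a job reaches the DLQ:

```javascript
// Delay before the retry that follows the given attempt number
function retryDelay(backoff, attempt) {
  return backoff * 2 ** (attempt - 1); // 1s, 2s, 4s, ... for backoff = 1000
}

// Total time spent waiting across all retries before max_attempts is reached
function totalWait(backoff, maxAttempts) {
  let total = 0;
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    total += retryDelay(backoff, attempt);
  }
  return total;
}
```

This is useful when tuning `max_attempts` and `backoff` against an SLA: `totalWait(1000, 6)` gives the 31 seconds quoted above.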
### Backoff with Jitter
For high-concurrency scenarios, add jitter to prevent thundering herd:
```javascript
const worker = new Worker('high-concurrency', async (job) => {
  try {
    await processJob(job.data);
  } catch (error) {
    if (isTransientError(error)) {
      // Add random jitter before rethrowing so concurrent workers
      // don't all retry at the same instant
      const jitter = Math.random() * 1000; // 0-1s random delay
      await sleep(jitter);
    }
    throw error;
  }
});

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
```
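A common alternative is "full jitter", where the entire delay (not just an added offset) is randomized between zero and the exponential cap. This is a sketch of the idea, not flashQ's built-in behavior, which uses the fixed formula above:

```javascript
// "Full jitter" backoff: pick a uniformly random delay between 0 and
// the exponential cap, bounded by an overall maximum.
function fullJitterDelay(base, attempt, cap = 30000) {
  const exp = Math.min(cap, base * 2 ** (attempt - 1));
  return Math.random() * exp;
}
```

Full jitter spreads retries across the whole window, which empirically reduces contention more than fixed backoff plus a small random offset.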
## Dead Letter Queue (DLQ)
Jobs that fail all retries are moved to the Dead Letter Queue for later analysis or manual intervention.
### Viewing DLQ Jobs
```javascript
// Get up to 100 jobs from the DLQ
const dlqJobs = await client.getDlq('process-order', 100);
console.log(`${dlqJobs.length} jobs in DLQ`);

dlqJobs.forEach(job => {
  console.log({
    id: job.id,
    attempts: job.attempts,
    error: job.error,
    data: job.data,
    failedAt: job.failedAt
  });
});
```
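When a DLQ fills up, grouping jobs by error message surfaces the most common failure causes first. A small triage helper, written as a pure function over the job shape shown above (`id`, `error`):

```javascript
// Group DLQ jobs by error string, largest group first.
// Returns [[errorString, [jobId, ...]], ...]
function groupByError(jobs) {
  const groups = new Map();
  for (const job of jobs) {
    const key = job.error ?? 'unknown';
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(job.id);
  }
  return [...groups.entries()].sort((a, b) => b[1].length - a[1].length);
}
```

Feeding it the result of `client.getDlq(...)` gives a quick ranked view of what is actually breaking.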
### Retrying DLQ Jobs
```javascript
// Retry all jobs in DLQ
const retried = await client.retryDlq('process-order');
console.log(`Retried ${retried} jobs`);

// Retry a specific job
await client.retryDlq('process-order', jobId);

// Selective retry based on error type
const failedJobs = await client.getDlq('my-queue', 100);
for (const job of failedJobs) {
  // Only retry rate-limit errors (now that the limit has lifted)
  if (job.error?.includes('429') || job.error?.includes('rate limit')) {
    await client.retryDlq('my-queue', job.id);
  }
}
```
### DLQ Webhooks
```javascript
// Get notified when jobs hit the DLQ
await client.push('critical-task', data, {
  max_attempts: 3,
  webhook: {
    url: 'https://your-api.com/dlq-alert',
    events: ['failed'], // Trigger on final failure
    secret: process.env.WEBHOOK_SECRET
  }
});

// Webhook handler (assumes an Express app and a Slack client)
app.post('/dlq-alert', (req, res) => {
  const { jobId, queue, error, attempts } = req.body;

  // Send alert to Slack
  slack.send({
    channel: '#alerts',
    text: `Job ${jobId} on queue ${queue} failed after ${attempts} attempts: ${error}`
  });

  res.status(200).send('OK');
});
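If flashQ signs webhook payloads with the configured `secret` (check your version's docs; the `x-flashq-signature` header name below is an assumption, not a documented API), verify the signature before trusting the payload:

```javascript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Verify an HMAC-SHA256 hex signature over the raw request body.
// Constant-time comparison avoids timing attacks on the signature check.
function verifySignature(rawBody, signatureHex, secret) {
  const expected = createHmac('sha256', secret).update(rawBody).digest();
  const received = Buffer.from(signatureHex, 'hex');
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

In the handler above, you would call `verifySignature(rawBody, req.get('x-flashq-signature'), process.env.WEBHOOK_SECRET)` and reject the request with a 401 if it returns false.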
## Circuit Breaker Pattern
Prevent cascading failures when external services are down:
```javascript
import CircuitBreaker from 'opossum';
import { Worker } from 'flashq';

// Create circuit breaker for the external API
const apiBreaker = new CircuitBreaker(callExternalAPI, {
  timeout: 10000,               // 10s timeout
  errorThresholdPercentage: 50, // Open after 50% failures
  resetTimeout: 30000,          // Try again after 30s
  volumeThreshold: 10           // Minimum 10 requests before tripping
});

// Event handlers for monitoring
apiBreaker.on('open', () => {
  console.warn('Circuit breaker OPENED - external API is down');
  alertOps('External API circuit breaker opened');
});

apiBreaker.on('close', () => {
  console.info('Circuit breaker CLOSED - API recovered');
});

// Worker with circuit breaker
const worker = new Worker('external-api', async (job) => {
  try {
    return await apiBreaker.fire(job.data);
  } catch (error) {
    if (apiBreaker.opened) {
      // Circuit is open - fail fast, retry later
      throw new Error('Circuit breaker open: service unavailable');
    }
    throw error;
  }
});
```
### Circuit Breaker States

```
┌─────────┐   Failures exceed threshold   ┌─────────┐
│ CLOSED  │ ────────────────────────────→ │  OPEN   │
│ (normal)│                               │ (fail   │
└────┬────┘                               │  fast)  │
     ↑                                    └────┬────┘
     │ Success                                 │
     │                       Reset timeout expires
     │                                         │
     │         ┌─────────────┐                 │
     └─────────│  HALF-OPEN  │ ←───────────────┘
               │   (test)    │
               └─────────────┘
```
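The state machine above can be sketched in a few lines. This is an illustration of the three states, not a replacement for a battle-tested library like opossum:

```javascript
// Minimal circuit breaker state machine: CLOSED → OPEN → HALF-OPEN → CLOSED.
// The injectable `now` clock exists only to make the sketch testable.
class MiniBreaker {
  constructor({ threshold = 5, resetTimeoutMs = 30000, now = Date.now } = {}) {
    this.state = 'CLOSED';
    this.failures = 0;
    this.threshold = threshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.openedAt = 0;
    this.now = now;
  }
  canRequest() {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'HALF-OPEN'; // reset timeout expired: allow one test request
    }
    return this.state !== 'OPEN';
  }
  recordSuccess() {
    this.failures = 0;
    this.state = 'CLOSED'; // test request succeeded: close the circuit
  }
  recordFailure() {
    this.failures++;
    if (this.state === 'HALF-OPEN' || this.failures >= this.threshold) {
      this.state = 'OPEN'; // trip: fail fast until the reset timeout expires
      this.openedAt = this.now();
    }
  }
}
```

Opossum adds the pieces a production breaker needs on top of this skeleton: rolling failure-percentage windows, volume thresholds, timeouts, and events.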
## Graceful Degradation
Return partial results when some operations fail:
```javascript
const worker = new Worker('enrich-data', async (job) => {
  const results = {
    primary: null,
    enrichments: {}
  };

  // Primary operation - must succeed
  results.primary = await fetchPrimaryData(job.data.id);

  // Optional enrichments - fail gracefully
  const enrichmentTasks = [
    { name: 'social', fn: fetchSocialData },
    { name: 'analytics', fn: fetchAnalytics },
    { name: 'recommendations', fn: fetchRecommendations }
  ];

  await Promise.all(
    enrichmentTasks.map(async (task) => {
      try {
        results.enrichments[task.name] = await task.fn(job.data.id);
      } catch (error) {
        console.warn(`Enrichment ${task.name} failed:`, error.message);
        results.enrichments[task.name] = null; // Graceful degradation
      }
    })
  );

  return results;
});
```
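The try/catch-per-task pattern above can also be written with `Promise.allSettled`, which never rejects and reports each task's outcome explicitly:

```javascript
// Same graceful-degradation idea using Promise.allSettled.
// Takes tasks shaped like the enrichmentTasks array above: { name, fn }.
async function runEnrichments(tasks, id) {
  const settled = await Promise.allSettled(tasks.map(task => task.fn(id)));
  const enrichments = {};
  settled.forEach((result, i) => {
    enrichments[tasks[i].name] = result.status === 'fulfilled' ? result.value : null;
  });
  return enrichments;
}
```

`allSettled` makes the "collect everything, tolerate individual failures" intent explicit and keeps the error-handling out of each task body.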
## Complete Error Handling Pattern
```javascript
import { FlashQ, Worker } from 'flashq';
import CircuitBreaker from 'opossum';

const client = new FlashQ({ host: 'localhost', port: 6789 });

// Circuit breaker for the external service
const breaker = new CircuitBreaker(callAPI, {
  timeout: 10000,
  errorThresholdPercentage: 50,
  resetTimeout: 30000
});

const worker = new Worker('resilient-queue', async (job) => {
  try {
    // 1. Validate input
    if (!job.data.userId) {
      await client.discard(job.id);
      return { error: 'Invalid data' };
    }

    // 2. Process with circuit breaker
    const result = await breaker.fire(job.data);

    // 3. Report progress for long jobs
    await client.progress(job.id, 100, 'Complete');

    return result;
  } catch (error) {
    // 4. Classify the error
    if (isPermanentError(error)) {
      await client.discard(job.id);
      return { error: error.message };
    }
    if (breaker.opened) {
      console.log('Circuit open, will retry later');
    }
    throw error; // Retry for transient errors
  }
}, { concurrency: 10 });
```
**Key Takeaways:** Classify errors, use exponential backoff, implement circuit breakers, and always have a DLQ strategy for failed jobs.