
Error Handling & Retry Patterns in flashQ

Build resilient job queues that gracefully handle failures. This guide covers retry strategies, exponential backoff, dead letter queues, circuit breakers, and more.

Understanding Error Types

Not all errors are equal. Understanding error types helps you choose the right handling strategy:

Error Type   | Examples                                             | Strategy
Transient    | Network timeout, 503 Service Unavailable, rate limit | Retry with backoff
Permanent    | 400 Bad Request, invalid data, missing resource      | Don't retry; send to DLQ
Recoverable  | Database connection lost, external service down      | Retry with circuit breaker
Bug          | TypeError, null reference, logic errors              | Log, alert, fix the code
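The table above can be turned into a small classifier that routes each error to a strategy. This is an illustrative sketch, not part of the flashQ API; the `status`/`statusCode`/`code` property names are common Node.js conventions and are assumptions here:

```javascript
// Hypothetical sketch: map an error to one of the four categories above.
// Property names (status, statusCode, code) are assumptions, not flashQ API.
function classifyError(error) {
  const status = error.status ?? error.statusCode;
  if ([408, 429, 500, 502, 503, 504].includes(status)) return 'transient';
  if ([400, 401, 403, 404, 422].includes(status)) return 'permanent';
  if (['ECONNREFUSED', 'ECONNRESET', 'ETIMEDOUT'].includes(error.code)) return 'recoverable';
  if (error instanceof TypeError || error instanceof RangeError) return 'bug';
  return 'transient'; // unknown: safest default is to retry with backoff
}
```

A worker can then switch on the returned category to decide whether to rethrow (retry), discard, or alert.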

Retry Strategies

Basic Retry Configuration

import { FlashQ } from 'flashq';

const client = new FlashQ({ host: 'localhost', port: 6789 });

// Configure retries when pushing a job
await client.push('process-order', orderData, {
  max_attempts: 5,    // Maximum retry attempts
  backoff: 1000,      // Initial backoff delay (ms)
  timeout: 30000      // Processing timeout per attempt
});

Retry Timeline Visualization

Attempt 1: Immediate
    ↓ (fail)
    └── Wait 1s (backoff)

Attempt 2: After 1s
    ↓ (fail)
    └── Wait 2s (backoff × 2)

Attempt 3: After 3s total
    ↓ (fail)
    └── Wait 4s (backoff × 4)

Attempt 4: After 7s total
    ↓ (fail)
    └── Wait 8s (backoff × 8)

Attempt 5: After 15s total
    ↓ (fail)
    └── Move to DLQ (max_attempts reached)

Custom Retry Logic in Worker

import { Worker } from 'flashq';

const worker = new Worker('process-order', async (job) => {
  try {
    await processOrder(job.data);
  } catch (error) {
    // Check if we should retry
    if (isTransientError(error) && job.attempts < 5) {
      throw error; // Throw to trigger automatic retry
    }

    // Permanent error - log and don't retry
    if (isPermanentError(error)) {
      console.error(`Permanent error for job ${job.id}:`, error);
      return { success: false, error: error.message };
    }

    throw error; // Unknown error - retry
  }
});

function isTransientError(error) {
  const transientCodes = [408, 429, 500, 502, 503, 504];
  return error.message.includes('ECONNREFUSED') ||
         error.message.includes('ETIMEDOUT') ||
         transientCodes.some(code => error.message.includes(String(code)));
}

function isPermanentError(error) {
  const permanentCodes = [400, 401, 403, 404, 422];
  return permanentCodes.some(code => error.message.includes(String(code)));
}

Exponential Backoff

flashQ implements exponential backoff automatically. The delay between retries doubles each time:

// Backoff formula: delay = backoff * (2 ^ (attempt - 1))

await client.push('api-call', data, {
  max_attempts: 6,
  backoff: 1000  // 1 second base
});

// Retry delays:
// Attempt 1 → 2: 1s
// Attempt 2 → 3: 2s
// Attempt 3 → 4: 4s
// Attempt 4 → 5: 8s
// Attempt 5 → 6: 16s
// Total wait time: 31 seconds
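The schedule above follows directly from the formula. A small helper (illustrative, not part of flashQ) makes the arithmetic checkable:

```javascript
// Sketch: compute the retry delays implied by the formula
// delay = backoff * (2 ^ (attempt - 1)), for attempts 1..max_attempts-1.
function retrySchedule(maxAttempts, backoff) {
  const delays = [];
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(backoff * 2 ** (attempt - 1));
  }
  return delays;
}

const delays = retrySchedule(6, 1000);
console.log(delays);                         // [1000, 2000, 4000, 8000, 16000]
console.log(delays.reduce((a, b) => a + b)); // 31000 ms total
```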

Backoff with Jitter

For high-concurrency scenarios, add random jitter to the backoff so that many jobs failing at the same moment don't all retry at the same instant (the thundering herd problem):

const worker = new Worker('high-concurrency', async (job) => {
  try {
    await processJob(job.data);
  } catch (error) {
    if (isTransientError(error)) {
      // Add random jitter before throwing
      const jitter = Math.random() * 1000; // 0-1s random delay
      await sleep(jitter);
      throw error;
    }
    throw error;
  }
});

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
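A stronger variant than a fixed 0-1s offset is "full jitter", where the entire backoff window is randomized. The function below is an illustrative sketch (the name and defaults are assumptions, not flashQ API):

```javascript
// Sketch: "full jitter" backoff - pick a delay uniformly at random in
// [0, min(cap, base * 2^(attempt-1))). Spreads retries across the window
// instead of clustering them near the deterministic delay.
function fullJitterDelay(attempt, base = 1000, cap = 30000) {
  const exp = Math.min(cap, base * 2 ** (attempt - 1));
  return Math.random() * exp;
}
```

Because the expected delay is half the deterministic one, you may want a slightly larger base to keep similar average spacing.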

Dead Letter Queue (DLQ)

Jobs that fail all retries are moved to the Dead Letter Queue for later analysis or manual intervention.

Viewing DLQ Jobs

// Get jobs from DLQ
const dlqJobs = await client.getDlq('process-order', 100);

console.log(`${dlqJobs.length} jobs in DLQ`);

dlqJobs.forEach(job => {
  console.log({
    id: job.id,
    attempts: job.attempts,
    error: job.error,
    data: job.data,
    failedAt: job.failedAt
  });
});

Retrying DLQ Jobs

// Retry all jobs in DLQ
const retried = await client.retryDlq('process-order');
console.log(`Retried ${retried} jobs`);

// Retry specific job
await client.retryDlq('process-order', jobId);

// Selective retry based on error type
const dlqJobs = await client.getDlq('my-queue', 100);

for (const job of dlqJobs) {
  // Only retry rate-limit errors (now that the limit has been lifted)
  if (job.error?.includes('429') || job.error?.includes('rate limit')) {
    await client.retryDlq('my-queue', job.id);
  }
}

DLQ Webhooks

// Get notified when jobs hit DLQ
await client.push('critical-task', data, {
  max_attempts: 3,
  webhook: {
    url: 'https://your-api.com/dlq-alert',
    events: ['failed'],  // Trigger on final failure
    secret: process.env.WEBHOOK_SECRET
  }
});

// Webhook handler
app.post('/dlq-alert', (req, res) => {
  const { jobId, queue, error, attempts } = req.body;

  // Send alert to Slack
  slack.send({
    channel: '#alerts',
    text: `Job ${jobId} failed after ${attempts} attempts: ${error}`
  });

  res.status(200).send('OK');
});
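The `secret` passed above can be used to authenticate webhook deliveries before acting on them. The exact header name and signing scheme aren't shown in this guide, so treat the `x-flashq-signature` header and hex-encoded HMAC-SHA256 below as assumptions to check against the webhook docs:

```javascript
import crypto from 'node:crypto';

// Assumed scheme: hex HMAC-SHA256 of the raw request body, delivered in an
// 'x-flashq-signature' header. Verify before trusting the payload.
function verifySignature(rawBody, signature, secret) {
  const expected = crypto
    .createHmac('sha256', secret)
    .update(rawBody)
    .digest('hex');
  return (
    typeof signature === 'string' &&
    signature.length === expected.length &&
    crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected))
  );
}
```

In the Express handler, call `verifySignature(rawBody, req.headers['x-flashq-signature'], process.env.WEBHOOK_SECRET)` and return 401 on mismatch. `timingSafeEqual` avoids leaking the expected signature through comparison timing.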

Circuit Breaker Pattern

Prevent cascading failures when external services are down:

import CircuitBreaker from 'opossum';
import { Worker } from 'flashq';

// Create circuit breaker for external API
const apiBreaker = new CircuitBreaker(callExternalAPI, {
  timeout: 10000,              // 10s timeout
  errorThresholdPercentage: 50, // Open after 50% failures
  resetTimeout: 30000,          // Try again after 30s
  volumeThreshold: 10           // Minimum 10 requests before tripping
});

// Event handlers for monitoring
apiBreaker.on('open', () => {
  console.warn('Circuit breaker OPENED - external API is down');
  alertOps('External API circuit breaker opened');
});

apiBreaker.on('close', () => {
  console.info('Circuit breaker CLOSED - API recovered');
});

// Worker with circuit breaker
const worker = new Worker('external-api', async (job) => {
  try {
    return await apiBreaker.fire(job.data);
  } catch (error) {
    if (apiBreaker.opened) {
      // Circuit is open - fail fast, retry later
      throw new Error('Circuit breaker open: service unavailable');
    }
    throw error;
  }
});

Circuit Breaker States

┌─────────┐    Failures exceed threshold    ┌─────────┐
│ CLOSED  │ ─────────────────────────────→ │  OPEN   │
│(normal) │                                 │ (fail   │
└────┬────┘                                 │  fast)  │
     │                                      └────┬────┘
     │                                           │
     │ Success                      Reset timeout expires
     │                                           │
     │         ┌─────────────┐                   │
     └─────────│  HALF-OPEN  │←──────────────────┘
               │   (test)    │
               └─────────────┘
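To make the diagram concrete, here is a minimal hand-rolled version of the same state machine. In production, prefer opossum (used above); this sketch uses a simple failure count rather than opossum's error-percentage window:

```javascript
// Minimal sketch of the CLOSED -> OPEN -> HALF-OPEN cycle shown above.
class TinyBreaker {
  constructor({ failureThreshold = 5, resetTimeoutMs = 30000 } = {}) {
    this.state = 'CLOSED';
    this.failures = 0;
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.openedAt = 0;
  }

  async fire(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.openedAt < this.resetTimeoutMs) {
        throw new Error('Circuit open: failing fast'); // OPEN: fail fast
      }
      this.state = 'HALF_OPEN'; // reset timeout expired: allow one test call
    }
    try {
      const result = await fn();
      this.state = 'CLOSED'; // success closes the circuit
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures += 1;
      if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
        this.state = 'OPEN'; // trip: failures exceeded threshold, or test call failed
        this.openedAt = Date.now();
      }
      throw err;
    }
  }
}
```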

Graceful Degradation

Return partial results when some operations fail:

const worker = new Worker('enrich-data', async (job) => {
  const results = {
    primary: null,
    enrichments: {}
  };

  // Primary operation - must succeed
  results.primary = await fetchPrimaryData(job.data.id);

  // Optional enrichments - fail gracefully
  const enrichmentTasks = [
    { name: 'social', fn: fetchSocialData },
    { name: 'analytics', fn: fetchAnalytics },
    { name: 'recommendations', fn: fetchRecommendations }
  ];

  await Promise.all(
    enrichmentTasks.map(async (task) => {
      try {
        results.enrichments[task.name] = await task.fn(job.data.id);
      } catch (error) {
        console.warn(`Enrichment ${task.name} failed:`, error.message);
        results.enrichments[task.name] = null; // Graceful degradation
      }
    })
  );

  return results;
});
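The per-task try/catch above can also be expressed with `Promise.allSettled`, which collects each outcome instead of rejecting on the first failure. An equivalent sketch (helper name is illustrative):

```javascript
// Equivalent sketch using Promise.allSettled: null out failed enrichments
// without wrapping each task in its own try/catch.
async function runEnrichments(tasks, id) {
  const settled = await Promise.allSettled(tasks.map(t => t.fn(id)));
  const enrichments = {};
  settled.forEach((outcome, i) => {
    enrichments[tasks[i].name] =
      outcome.status === 'fulfilled' ? outcome.value : null;
  });
  return enrichments;
}
```

Both forms behave the same here; `allSettled` just makes the "never throw on optional work" intent explicit.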

Complete Error Handling Pattern

import { FlashQ, Worker } from 'flashq';
import CircuitBreaker from 'opossum';

const client = new FlashQ({ host: 'localhost', port: 6789 });

// Circuit breaker for external service
const breaker = new CircuitBreaker(callAPI, {
  timeout: 10000,
  errorThresholdPercentage: 50,
  resetTimeout: 30000
});

const worker = new Worker('resilient-queue', async (job) => {
  try {
    // 1. Validate input
    if (!job.data.userId) {
      await client.discard(job.id);
      return { error: 'Invalid data' };
    }

    // 2. Process with circuit breaker
    const result = await breaker.fire(job.data);

    // 3. Report progress for long jobs
    await client.progress(job.id, 100, 'Complete');

    return result;

  } catch (error) {
    // 4. Classify error
    if (isPermanentError(error)) {
      await client.discard(job.id);
      return { error: error.message };
    }

    if (breaker.opened) {
      console.log('Circuit open, will retry later');
    }

    throw error; // Retry for transient errors
  }
}, { concurrency: 10 });

Key Takeaways: Classify errors, use exponential backoff, implement circuit breakers, and always have a DLQ strategy for failed jobs.
