Job Queue Patterns for AI Applications

Building reliable AI applications requires more than just calling APIs. You need patterns that handle failures gracefully, scale efficiently, and maintain data consistency. In this guide, we'll explore essential job queue patterns that every AI engineer should know.

1. Fan-Out Pattern

The fan-out pattern splits one unit of work into many independent jobs that workers process in parallel, and a final fan-in step collects the results. It suits batch workloads where no item depends on another.

┌─────────┐     ┌──────────┐     ┌──────────┐
│ Source  │ ──▶ │ Worker 1 │ ──▶ │ Results  │
│  Job    │ ──▶ │ Worker 2 │ ──▶ │ Collector│
│         │ ──▶ │ Worker 3 │ ──▶ │          │
└─────────┘     └──────────┘     └──────────┘

Use case: Processing 10,000 documents for embedding generation.

async function fanOutEmbeddings(documents) {
  const queue = new Queue('embeddings');

  // Fan out: Create a job for each document
  const jobs = await queue.addBulk(
    documents.map(doc => ({
      name: 'embed',
      data: { id: doc.id, text: doc.content }
    }))
  );

  // Fan in: Collect results
  const collector = await queue.add('collect', {
    jobIds: jobs.map(j => j.id),
    totalCount: documents.length
  }, {
    depends_on: jobs.map(j => j.id)
  });

  return queue.finished(collector.id);
}
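
The same fan-out/fan-in shape can be sketched without a queue, which makes the two phases easy to test in isolation. This is a minimal in-process sketch, not flashQ's API: `fanOut`, `worker`, and the `concurrency` parameter are hypothetical names introduced for illustration.

```javascript
// In-process sketch of fan-out / fan-in with a concurrency cap.
// `fanOut` and its parameters are illustrative names, not a flashQ API.
async function fanOut(items, worker, concurrency = 3) {
  const results = new Array(items.length);
  let next = 0;

  // Each runner claims the next unprocessed index until none remain.
  async function runner() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index]);
    }
  }

  // Fan out: start up to `concurrency` runners in parallel.
  const runners = Array.from(
    { length: Math.min(concurrency, items.length) },
    () => runner()
  );

  // Fan in: wait for every runner, then return results in input order.
  await Promise.all(runners);
  return results;
}
```

Results are written by index, so the output order matches the input order even when workers finish out of order. If any worker rejects, `Promise.all` rejects as well, which mirrors a collector job that cannot run until all of its dependencies succeed.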

2. Saga Pattern

The saga pattern manages distributed transactions by breaking them into steps, each with a compensating action for rollback.

┌───────┐     ┌───────┐     ┌───────┐
│Step 1 │ ──▶ │Step 2 │ ──▶ │Step 3 │
│       │     │       │     │       │
│Comp 1 │ ◀── │Comp 2 │ ◀── │Comp 3 │
└───────┘     └───────┘     └───────┘
  (rollback chain if any step fails)

Use case: AI-powered document processing with external API calls.

const saga = {
  steps: [
    {
      name: 'extract',
      execute: async (data) => await extractText(data.file),
      compensate: async (data) => await deleteExtraction(data.extractionId)
    },
    {
      name: 'analyze',
      execute: async (data) => await openai.chat.completions.create({...}),
      compensate: async (data) => { /* no-op, API call can't be undone */ }
    },
    {
      name: 'store',
      execute: async (data) => await db.insert(data.analysis),
      compensate: async (data) => await db.delete(data.recordId)
    }
  ]
};

// Execute saga with automatic rollback on failure
async function executeSaga(saga, initialData) {
  const completedSteps = [];

  try {
    let data = initialData;
    for (const step of saga.steps) {
      data = await step.execute(data);
      completedSteps.push({ step, data });
    }
    return data;
  } catch (error) {
    // Rollback in reverse order
    for (const { step, data } of completedSteps.reverse()) {
      await step.compensate(data);
    }
    throw error;
  }
}
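
One detail the executor above leaves open is how a later step, or a compensation, sees identifiers produced by earlier steps, and what happens when a compensation itself fails. A possible variant, sketched here under the assumption that each step returns a plain object, merges every step's output into a shared context and keeps rolling back even if one compensation throws. `runSagaWithContext` and the `compensationErrors` property are hypothetical names.

```javascript
// Variant saga runner: merges each step's output into one context object
// so compensations can read IDs created earlier. Names are illustrative.
async function runSagaWithContext(steps, initial = {}) {
  let ctx = { ...initial };
  const completed = [];

  try {
    for (const step of steps) {
      const output = await step.execute(ctx);
      ctx = { ...ctx, ...output };
      completed.push(step);
    }
    return ctx;
  } catch (error) {
    // Compensate completed steps in reverse order. A failing compensation
    // is recorded but does not stop the remaining rollbacks.
    const compensationErrors = [];
    for (const step of [...completed].reverse()) {
      try {
        await step.compensate(ctx);
      } catch (compError) {
        compensationErrors.push({ step: step.name, error: compError });
      }
    }
    const sagaError = new Error(
      `Saga rolled back after ${completed.length} completed step(s)`
    );
    sagaError.cause = error;
    sagaError.compensationErrors = compensationErrors;
    throw sagaError;
  }
}
```

The step that failed is not compensated, only the steps that completed before it. Callers can inspect `cause` for the original failure and `compensationErrors` for rollbacks that need manual attention.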

3. Circuit Breaker Pattern

The circuit breaker prevents cascading failures by stopping requests to a failing service.

        ┌────────────────────────────────────┐
        │          Circuit Breaker           │
        │  ┌───────┐ ┌───────┐ ┌───────────┐ │
        │  │Closed │→│ Open  │→│Half-Open  │ │
        │  └───────┘ └───────┘ └───────────┘ │
        └────────────────────────────────────┘

Use case: Protecting against OpenAI API outages.

class CircuitBreaker {
  constructor(threshold = 5, timeout = 60000) {
    this.failures = 0;
    this.threshold = threshold;
    this.timeout = timeout;
    this.state = 'CLOSED';
    this.nextAttempt = 0;
  }

  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error('Circuit breaker is OPEN');
      }
      this.state = 'HALF-OPEN';
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  onSuccess() {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  onFailure() {
    this.failures++;
    if (this.failures >= this.threshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.timeout;
    }
  }
}

// Usage in worker
const breaker = new CircuitBreaker();

new Worker('openai-calls', async (job) => {
  return breaker.execute(async () => {
    return openai.chat.completions.create(job.data);
  });
});
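
Testing a breaker with a 60-second timeout is awkward if it reads the real clock. One option, sketched below, is the same three-state logic with an injectable `now` function so a test can advance time by hand. `createBreaker` and its option names are hypothetical and only mirror the class above.

```javascript
// Sketch: the same three-state breaker with an injectable clock (`now`),
// so transitions can be exercised in tests without real waiting.
// `createBreaker` and its options are illustrative names.
function createBreaker({ threshold = 5, timeout = 60000, now = Date.now } = {}) {
  let failures = 0;
  let state = 'CLOSED';
  let nextAttempt = 0;

  return {
    get state() {
      return state;
    },
    async execute(fn) {
      if (state === 'OPEN') {
        if (now() < nextAttempt) {
          throw new Error('Circuit breaker is OPEN');
        }
        state = 'HALF-OPEN'; // timeout elapsed: let a trial call through
      }
      try {
        const result = await fn();
        failures = 0;
        state = 'CLOSED';
        return result;
      } catch (error) {
        failures++;
        // A failed trial call in HALF-OPEN reopens the circuit immediately.
        if (state === 'HALF-OPEN' || failures >= threshold) {
          state = 'OPEN';
          nextAttempt = now() + timeout;
        }
        throw error;
      }
    }
  };
}
```

In a test, passing `now: () => fakeTime` and then increasing `fakeTime` past the timeout moves the breaker from OPEN to HALF-OPEN on the next call.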

4. Priority Queue Pattern

Process important jobs first while still handling lower-priority work.

// Priority levels
const PRIORITY = {
  CRITICAL: 100,  // Processed immediately
  HIGH: 50,       // Next in line
  NORMAL: 10,     // Default
  LOW: 1          // When nothing else is pending
};

// Real-time user request (high priority)
await queue.add('generate', { prompt, userId }, {
  priority: PRIORITY.HIGH
});

// Background batch job (low priority)
await queue.add('generate', { prompt, batchId }, {
  priority: PRIORITY.LOW
});
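
The ordering this implies can be sketched in a few lines. The snippet below is an in-memory illustration of "higher number first, first-in-first-out within a level", which is the convention the constants above follow. `orderByPriority` is a hypothetical helper, not part of any queue library. Some queue libraries use the opposite convention (a lower number means higher priority), so it is worth confirming the direction before choosing values.

```javascript
// In-memory illustration of priority ordering: higher `priority` first,
// and arrival order preserved among jobs with equal priority.
// `orderByPriority` is an illustrative helper, not a queue API.
function orderByPriority(jobs) {
  return jobs
    .map((job, seq) => ({ job, seq }))
    .sort((a, b) => (b.job.priority - a.job.priority) || (a.seq - b.seq))
    .map(({ job }) => job);
}

const ordered = orderByPriority([
  { id: 'batch-1', priority: 1 },
  { id: 'user-1', priority: 50 },
  { id: 'user-2', priority: 50 },
  { id: 'alert-1', priority: 100 }
]);
// ordered ids: alert-1, user-1, user-2, batch-1
```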

5. Retry with Exponential Backoff

Handle transient failures by retrying with increasing delays.

// Delays double from the base: 1s, 2s, 4s, 8s
// (if `attempts` counts the first try, 5 attempts means at most 4 retries)
await queue.add('api-call', data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});

// Custom backoff with jitter (prevents thundering herd)
function calculateBackoff(attempt, baseDelay = 1000) {
  const exponentialDelay = baseDelay * Math.pow(2, attempt);
  const jitter = Math.random() * 1000;
  return exponentialDelay + jitter;
}
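
The fixed jitter of up to one second in `calculateBackoff` becomes negligible once the exponential term reaches tens of seconds, and the delay grows without bound. A commonly used alternative is "full jitter": draw the delay uniformly between zero and a capped exponential bound. The sketch below assumes attempts are numbered from 0. `fullJitterBackoff`, `maxDelay`, and the injectable `random` are hypothetical names.

```javascript
// Full-jitter backoff sketch: uniform delay in [0, min(maxDelay, base * 2^attempt)).
// `random` is injectable so the function can be tested deterministically.
function fullJitterBackoff(
  attempt,
  { baseDelay = 1000, maxDelay = 30000, random = Math.random } = {}
) {
  const bound = Math.min(maxDelay, baseDelay * 2 ** attempt);
  return Math.floor(random() * bound);
}

// Upper bounds with the defaults, attempts 0..5:
// 1000, 2000, 4000, 8000, 16000, 30000 (capped)
```

Spreading delays over the whole interval trades a slightly longer average wait for fewer synchronized retries when many jobs fail at the same moment.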

6. Dead Letter Queue (DLQ) Pattern

Capture failed jobs for investigation and reprocessing.

// Jobs move to DLQ after max attempts
await queue.add('process', data, {
  attempts: 3
});

// Monitor DLQ
const failedJobs = await queue.getDlq(100);

for (const job of failedJobs) {
  console.log(`Failed job ${job.id}:`, job.failedReason);

  // Decide action based on error
  if (job.failedReason.includes('rate limit')) {
    // Retry rate-limited jobs
    await queue.retryDlq(job.id);
  } else if (job.failedReason.includes('invalid')) {
    // Log and skip invalid jobs
    await logToSlack(`Invalid job: ${job.id}`);
  }
}

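// Alert on DLQ growth. getDlq(100) above returns at most 100 jobs
// (assuming its argument is a limit), so a full page signals a backlog.
if (failedJobs.length >= 100) {
  await sendAlert('DLQ has 100 or more jobs!');
}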
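
The string matching in the loop above can be pulled into a small pure function, which is easier to test and tolerates a missing failure reason. This is a sketch under assumptions: the category names and the extra `timeout` rule are illustrative, and `classifyFailure` is a hypothetical helper.

```javascript
// Maps a failure reason to an action. Categories and rules are illustrative.
function classifyFailure(failedReason) {
  // Normalize: the reason may be undefined, and casing varies by provider.
  const reason = String(failedReason ?? '').toLowerCase();

  if (reason.includes('rate limit') || reason.includes('timeout')) {
    return 'retry'; // transient: safe to move back to the main queue
  }
  if (reason.includes('invalid')) {
    return 'discard'; // permanent: retrying would fail the same way
  }
  return 'inspect'; // unknown: leave in the DLQ for a human to look at
}
```

Defaulting unknown errors to `inspect` keeps them in the DLQ instead of retrying or dropping them silently.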

7. Idempotency Pattern

Ensure a job can be retried safely: running it twice must not repeat its side effects, such as charging a customer a second time.

// Use custom job ID for idempotency
await queue.add('charge', { userId, amount }, {
  jobId: `charge-${userId}-${orderId}`  // Unique per operation
});

// Worker checks idempotency
new Worker('payments', async (job) => {
  // Check if already processed
  const existing = await db.payments.findOne({
    idempotencyKey: job.id
  });

  if (existing) {
    return existing.result; // Return cached result
  }

  // Process and store with idempotency key
  const result = await processPayment(job.data);
  await db.payments.insert({
    idempotencyKey: job.id,
    result
  });

  return result;
});
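
The check-then-insert sequence in the worker above has a gap: two workers that pick up the same key at the same moment can both pass the `findOne` check. In a database this is usually closed with a unique constraint on the idempotency key. The in-memory sketch below shows the same idea within a single process by storing the in-flight promise per key, so concurrent duplicates share one execution. `createIdempotentHandler` is a hypothetical helper, and the `Map` stands in for durable storage.

```javascript
// Single-process idempotency sketch: one execution per key, shared by
// concurrent callers. The Map is a stand-in for a durable store.
function createIdempotentHandler(operation) {
  const byKey = new Map(); // idempotencyKey -> Promise of the result

  return function handle(key, payload) {
    if (!byKey.has(key)) {
      const promise = Promise.resolve()
        .then(() => operation(payload))
        .catch((error) => {
          // Forget failed attempts so a later retry can run again.
          byKey.delete(key);
          throw error;
        });
      byKey.set(key, promise);
    }
    return byKey.get(key);
  };
}
```

A successful result stays cached under its key, so a retried job returns the original result instead of repeating the operation. A failed attempt is forgotten, which lets the retry actually run.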

8. Scheduled Job Pattern

Run jobs at specific times using cron expressions.

// Daily report at 9 AM
await queue.addCron('daily-report', {
  queue: 'reports',
  schedule: '0 9 * * *',
  data: { type: 'daily' }
});

// Hourly model retraining
await queue.addCron('retrain-model', {
  queue: 'ml-training',
  schedule: '0 * * * *',
  data: { modelId: 'recommendations' }
});

// Every 5 minutes - cache refresh
await queue.addCron('refresh-cache', {
  queue: 'maintenance',
  schedule: '*/5 * * * *',
  data: { cache: 'embeddings' }
});
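
The schedules above look like standard five-field cron expressions: minute, hour, day of month, month, day of week. Assuming that is the syntax in use, the hypothetical helper below splits an expression into named fields, which can serve as a sanity check before registering a schedule. It does not validate ranges or evaluate the schedule.

```javascript
// Splits a five-field cron expression into named fields.
// Assumes standard cron field order; does not validate values.
const CRON_FIELDS = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

function parseCronFields(expression) {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  return Object.fromEntries(CRON_FIELDS.map((name, i) => [name, parts[i]]));
}

// parseCronFields('0 9 * * *')
// → { minute: '0', hour: '9', dayOfMonth: '*', month: '*', dayOfWeek: '*' }
```

Which time zone "9 AM" refers to depends on how the scheduler is configured, so it is worth checking before relying on a wall-clock time.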

9. Batch Processing Pattern

Group multiple items into a single job for efficiency.

// Instead of 1000 individual jobs...
const BATCH_SIZE = 100;
const batches = [];

for (let i = 0; i < items.length; i += BATCH_SIZE) {
  batches.push(items.slice(i, i + BATCH_SIZE));
}

// Create 10 batch jobs instead
for (const batch of batches) {
  await queue.add('process-batch', {
    items: batch
  });
}

// Worker processes batch efficiently
new Worker('embeddings', async (job) => {
  const { items } = job.data;

  // OpenAI supports batch embeddings
  const response = await openai.embeddings.create({
    input: items.map(i => i.text),
    model: 'text-embedding-3-small'
  });

  return response.data;
});
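
Two small helpers make the batching step reusable and tie results back to their source items. Each entry in an OpenAI embeddings response carries an `index` into the input array, which the second helper uses. `chunk` and `attachEmbeddings` are hypothetical names, and the sketch assumes every item has an `id`.

```javascript
// Splits an array into consecutive batches of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Pairs each embedding with the id of the item it was computed from,
// using the `index` field of each response entry.
function attachEmbeddings(items, embeddingData) {
  return embeddingData.map((entry) => ({
    id: items[entry.index].id,
    embedding: entry.embedding
  }));
}
```

Returning `{ id, embedding }` pairs from the worker, instead of the raw response, means downstream code does not depend on the order of items within a batch.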

Choosing the Right Pattern

Scenario                               Pattern
-------------------------------------  -------------------
Processing many items independently    Fan-Out
Multi-step workflow with rollback      Saga
Unstable external API                  Circuit Breaker
User-facing vs batch jobs              Priority Queue
Transient failures                     Exponential Backoff
Debugging failures                     Dead Letter Queue
Payment/mutation safety                Idempotency
Recurring tasks                        Scheduled Jobs
High-volume processing                 Batch Processing

Conclusion

These patterns form the foundation of reliable AI applications. Start with the basics (retries, DLQ) and add complexity (sagas, circuit breakers) as your system grows. flashQ supports all these patterns out of the box.
