
Building AI Pipelines with flashQ: A Complete Guide

AI applications are rarely simple request-response systems. They involve complex pipelines: generating embeddings, searching vector databases, calling LLMs, post-processing results, and more. Each step can fail, needs retry logic, and often takes seconds to complete.

This is where job queues shine. In this guide, we'll build three real-world AI pipelines using flashQ:

  1. RAG Pipeline: Embed → Search → Generate
  2. Document Processing: Parse → Chunk → Embed → Store
  3. Batch Inference: Process millions of predictions

Why Use a Job Queue for AI?

Before diving into code, let's understand why job queues are essential for AI applications:

  • Rate Limiting: OpenAI, Anthropic, and other providers have strict rate limits. A queue lets you control throughput.
  • Reliability: API calls fail. Queues provide automatic retries with exponential backoff.
  • Cost Control: Track costs per job and pause queues when budgets are exceeded.
  • Long-Running Jobs: LLM calls can take 30+ seconds. Don't block your web server.
  • Orchestration: Chain multiple steps with dependencies.

Setup

First, let's set up flashQ:

# Install flashQ SDK
npm install flashq

# Start the server
docker run -d -p 6789:6789 flashq/flashq

And install the AI SDKs we'll use:

npm install openai @anthropic-ai/sdk

Pipeline 1: RAG (Retrieval-Augmented Generation)

RAG is one of the most common AI patterns. It involves:

  1. Converting a query to an embedding
  2. Searching a vector database for similar documents
  3. Generating a response using the retrieved context
┌─────────┐     ┌─────────┐     ┌──────────┐
│  Embed  │ ──▶ │ Search  │ ──▶ │ Generate │
└─────────┘     └─────────┘     └──────────┘

With flashQ, we can model this as a job flow with dependencies:

import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('rag-pipeline');

// Step 1: Embed the query
const embedJob = await queue.add('embed', {
  text: 'What is the capital of France?',
  model: 'text-embedding-3-small'
});

// Step 2: Search (depends on embed)
const searchJob = await queue.add('search', {
  parentId: embedJob.id, // lets the search worker fetch the embedding
  collection: 'knowledge-base',
  topK: 5
}, {
  depends_on: [embedJob.id]
});

// Step 3: Generate (depends on search)
const generateJob = await queue.add('generate', {
  parentId: searchJob.id, // lets the generate worker fetch the search results
  model: 'gpt-4',
  systemPrompt: 'Answer based on the provided context.'
}, {
  depends_on: [searchJob.id]
});

// Wait for the final result
const result = await queue.finished(generateJob.id);
console.log(result); // "The capital of France is Paris."

Now let's implement the worker. All three job types live on the same queue, so a single worker dispatches on the job name:

// One worker for the whole pipeline. Separate workers that return early
// for other job names could complete those jobs with an empty result.
new Worker('rag-pipeline', async (job) => {
  switch (job.name) {
    case 'embed': {
      const { text, model } = job.data;

      const response = await openai.embeddings.create({
        input: text,
        model: model
      });

      return {
        embedding: response.data[0].embedding,
        originalText: text
      };
    }

    case 'search': {
      // Get embedding from parent job
      const embedResult = await queue.getResult(job.data.parentId);

      // Search vector database (vectorDB is a placeholder for your preferred DB client)
      const results = await vectorDB.search({
        vector: embedResult.embedding,
        topK: job.data.topK
      });

      return {
        documents: results,
        query: embedResult.originalText
      };
    }

    case 'generate': {
      // Get retrieved documents from parent job
      const searchResult = await queue.getResult(job.data.parentId);

      const context = searchResult.documents
        .map(d => d.content)
        .join('\n\n');

      const response = await openai.chat.completions.create({
        model: job.data.model,
        messages: [
          { role: 'system', content: job.data.systemPrompt },
          { role: 'user', content: `Context:\n${context}\n\nQuestion: ${searchResult.query}` }
        ]
      });

      return response.choices[0].message.content;
    }

    default:
      throw new Error(`Unknown job name: ${job.name}`);
  }
});
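
The search step above calls vectorDB.search, which the guide leaves to your preferred database. For local testing, a minimal in-memory stand-in might look like the sketch below. The names cosineSimilarity, topKBySimilarity, and createInMemoryVectorDB are hypothetical and not part of flashQ or any vector database client, and a real store would use an approximate nearest-neighbor index rather than a linear scan.

```javascript
// Hypothetical in-memory stand-in for `vectorDB` (not a flashQ API).

// Cosine similarity between two equal-length numeric vectors.
function cosineSimilarity(a, b) {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0; // avoid dividing by zero
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Linear scan: score every document, sort descending, keep the best topK.
function topKBySimilarity(documents, vector, topK) {
  return documents
    .map(doc => ({
      content: doc.content,
      score: cosineSimilarity(doc.embedding, vector)
    }))
    .sort((x, y) => y.score - x.score)
    .slice(0, topK);
}

// Wraps the scan in the same call shape the search step uses:
// vectorDB.search({ vector, topK })
function createInMemoryVectorDB(documents) {
  return {
    search: async ({ vector, topK }) => topKBySimilarity(documents, vector, topK)
  };
}
```

Each result keeps a content field, which is what the generate step reads when it builds the context string.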

Pipeline 2: Document Processing

Processing documents for a knowledge base involves multiple steps:

┌───────┐     ┌───────┐     ┌───────┐     ┌───────┐
│ Parse │ ──▶ │ Chunk │ ──▶ │ Embed │ ──▶ │ Store │
└───────┘     └───────┘     └───────┘     └───────┘

Here's how to implement it with rate limiting to avoid hitting API quotas:

const queue = new Queue('document-processing');

// Set rate limit: 100 requests per minute for embeddings
await queue.setRateLimit(100);

// Process a document
async function processDocument(documentUrl) {
  // Step 1: Parse
  const parseJob = await queue.add('parse', {
    url: documentUrl
  });

  // Step 2: Chunk (depends on parse)
  const chunkJob = await queue.add('chunk', {
    chunkSize: 500,
    overlap: 50
  }, {
    depends_on: [parseJob.id]
  });

  // Wait for chunks to be ready
  const chunks = await queue.finished(chunkJob.id);

  // Step 3: Embed each chunk (rate limited!)
  const embedJobs = await Promise.all(
    chunks.map((chunk, i) =>
      queue.add('embed-chunk', {
        text: chunk.text,
        metadata: chunk.metadata,
        index: i
      })
    )
  );

  // Step 4: Store (depends on all embeddings)
  const storeJob = await queue.add('store', {
    documentUrl,
    totalChunks: chunks.length
  }, {
    depends_on: embedJobs.map(j => j.id)
  });

  return queue.finished(storeJob.id);
}
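
The chunk job above takes chunkSize: 500 and overlap: 50, but the guide does not show the worker behind it. Here is a minimal sketch of fixed-size chunking with overlap. It assumes sizes are measured in characters (the guide does not say whether they are characters or tokens) and returns the { text, metadata } shape that the embed step reads. chunkText is a hypothetical helper, not a flashQ function.

```javascript
// Hypothetical helper: split text into overlapping fixed-size windows.
function chunkText(text, chunkSize = 500, overlap = 50) {
  if (overlap >= chunkSize) {
    throw new Error('overlap must be smaller than chunkSize');
  }
  const step = chunkSize - overlap; // how far the window advances each time
  const chunks = [];
  for (let start = 0; start < text.length; start += step) {
    const end = Math.min(start + chunkSize, text.length);
    chunks.push({
      text: text.slice(start, end),
      metadata: { start, end } // character offsets into the source text
    });
    if (end === text.length) break; // the last window reached the end
  }
  return chunks;
}
```

The overlap means a sentence that straddles a boundary appears whole in at least one chunk, at the cost of embedding some text twice.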

Pipeline 3: Batch Inference

For batch processing millions of items, flashQ's high throughput really shines:

const queue = new Queue('batch-inference');

// Process 1 million items
async function batchProcess(items) {
  console.log(`Processing ${items.length} items...`);

  // Push all jobs in batches of 1000
  const batchSize = 1000;
  const jobIds = [];

  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);

    // Batch push for maximum throughput
    const jobs = await queue.addBulk(
      batch.map(item => ({
        name: 'predict',
        data: item
      }))
    );

    jobIds.push(...jobs.map(j => j.id));

    // Report progress
    console.log(`Queued ${Math.min(i + batchSize, items.length)} / ${items.length}`);
  }

  console.log('All jobs queued. Processing...');
  return jobIds; // so callers can track or await individual jobs
}

// Worker with concurrency control
new Worker('batch-inference', async (job) => {
  const prediction = await model.predict(job.data);

  // Update progress
  await job.updateProgress(100);

  return prediction;
}, {
  concurrency: 10 // Process 10 jobs in parallel
});

Handling Failures

AI API calls fail regularly, whether from rate limits, timeouts, or bad requests. Here's how to handle failures gracefully:

// Configure retries with exponential backoff
await queue.add('llm-call', { prompt }, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000  // base delay, doubled on each retry: 1s, 2s, 4s, 8s
  }
});

// Handle specific errors in worker
new Worker('llm-call', async (job) => {
  try {
    return await openai.chat.completions.create({...});
  } catch (error) {
    if (error.status === 429) {
      // Rate limited - throw to retry
      throw new Error('Rate limited, will retry');
    }
    if (error.status === 400) {
      // Bad request - don't retry
      return { error: 'Invalid request', skip: true };
    }
    throw error;
  }
});

// Monitor the dead letter queue
const failedJobs = await queue.getDlq(100);
console.log(`${failedJobs.length} jobs in DLQ`);

// Retry failed jobs
await queue.retryDlq();
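
To make the retry behavior above concrete, the sketch below computes an exponential backoff schedule and classifies HTTP statuses as retryable or not. It assumes attempts counts the first try, so 5 attempts leave at most 4 waits. flashQ's actual schedule may differ, and many implementations also add random jitter. backoffDelays and isRetryable are hypothetical helpers, not flashQ APIs.

```javascript
// Delay before each retry, doubling from a base delay.
// Assumes `attempts` includes the first try, so there are attempts - 1 waits.
function backoffDelays(attempts, baseDelayMs) {
  const delays = [];
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(baseDelayMs * 2 ** retry);
  }
  return delays;
}

// Decide whether an HTTP status is worth retrying.
function isRetryable(status) {
  if (status === 429) return true;                 // rate limited
  if (status >= 500 && status <= 599) return true; // server-side error
  return false; // other client errors (such as 400) will not improve on retry
}

console.log(backoffDelays(5, 1000)); // [ 1000, 2000, 4000, 8000 ]
```

In a worker, isRetryable(error.status) could decide between rethrowing (so the queue retries) and returning a skip result, as the 429 and 400 branches above do by hand.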

Monitoring and Observability

Track your pipelines in real-time:

// Get queue stats
const stats = await queue.getJobCounts();
console.log(stats);
// {
//   waiting: 1523,
//   active: 10,
//   completed: 45230,
//   failed: 12,
//   delayed: 0
// }

// Track progress of a specific job
const progress = await queue.getProgress(jobId);
console.log(`Job ${jobId}: ${progress.percent}% - ${progress.message}`);

// Listen to events
queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed`);
  metrics.increment('jobs.completed');
});

queue.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed: ${error.message}`);
  metrics.increment('jobs.failed');
});
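
Raw counts are easier to alert on once they are reduced to a couple of ratios. The sketch below derives a backlog size and a failure rate from an object shaped like the getJobCounts() output shown above. summarizeCounts is a hypothetical helper, and which thresholds to alert on is left to you.

```javascript
// Hypothetical helper: reduce job counts to two health indicators.
function summarizeCounts(counts) {
  const finished = counts.completed + counts.failed;
  return {
    // Jobs that still have to run
    backlog: counts.waiting + counts.delayed,
    // Share of finished jobs that failed (0 when nothing has finished yet)
    failureRate: finished === 0 ? 0 : counts.failed / finished
  };
}

const summary = summarizeCounts({
  waiting: 1523,
  active: 10,
  completed: 45230,
  failed: 12,
  delayed: 0
});
console.log(summary.backlog); // 1523
```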
💡 Pro Tip

flashQ exposes Prometheus metrics at /metrics/prometheus when running with HTTP enabled. Use Grafana to visualize queue health.

Best Practices

1. Use Job Dependencies for Complex Pipelines

Instead of polling for job completion, use depends_on to chain jobs. This is more efficient and lets flashQ optimize scheduling.
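
To illustrate what a dependency chain implies, the sketch below orders a set of jobs so that each one runs only after everything in its depends_on list. This is an illustration of the idea only, not flashQ's scheduler, and executionOrder is a hypothetical helper.

```javascript
// Illustrative only: order jobs so each runs after all of its dependencies.
// jobs: [{ id, depends_on: [ids] }]
function executionOrder(jobs) {
  // Map each job id to the set of dependencies it is still waiting on.
  const remaining = new Map(
    jobs.map(job => [job.id, new Set(job.depends_on || [])])
  );
  const order = [];
  while (remaining.size > 0) {
    // A job is ready once it has no unfinished dependencies.
    const ready = [...remaining.keys()].filter(
      id => remaining.get(id).size === 0
    );
    if (ready.length === 0) {
      throw new Error('Unresolvable dependencies (cycle or missing job)');
    }
    for (const id of ready) {
      order.push(id);
      remaining.delete(id);
    }
    // Mark the jobs that just ran as satisfied for everything still waiting.
    for (const deps of remaining.values()) {
      for (const id of ready) deps.delete(id);
    }
  }
  return order;
}

console.log(executionOrder([
  { id: 'generate', depends_on: ['search'] },
  { id: 'search', depends_on: ['embed'] },
  { id: 'embed', depends_on: [] }
])); // [ 'embed', 'search', 'generate' ]
```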

2. Set Appropriate Timeouts

LLM calls can be slow. Set timeouts that match your use case:

await queue.add('generate', data, {
  timeout: 60000  // 60 seconds for LLM calls
});

3. Use Rate Limiting to Control Costs

Rate limiting isn't just for API compliance; it's also for budget control:

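// Limit to roughly $10/hour in API calls
// GPT-4 ≈ $0.03 per 1K tokens, so at ~1K tokens per call, $10 buys ≈ 333 calls
// 333 calls per hour ≈ 5.5 per minute; round down to stay under budget
await queue.setRateLimit(5); // per-minute limit, matching the unit used in the document-processing example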
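
The budget arithmetic in the comments above can be written out as a small calculation. The sketch assumes each call uses about 1,000 tokens, which is the assumption the $0.03 figure implies. callsPerHourForBudget is a hypothetical helper, and the result still has to be converted to whatever time unit setRateLimit expects.

```javascript
// Hypothetical helper: how many calls per hour fit in an hourly budget.
function callsPerHourForBudget(budgetUsdPerHour, usdPer1kTokens, tokensPerCall) {
  const costPerCall = (tokensPerCall / 1000) * usdPer1kTokens;
  // Round down so the limit never exceeds the budget.
  return Math.floor(budgetUsdPerHour / costPerCall);
}

const perHour = callsPerHourForBudget(10, 0.03, 1000);
const perMinute = Math.floor(perHour / 60);

console.log(perHour);   // 333
console.log(perMinute); // 5
```

Rounding down at both steps keeps spending under the budget. Longer prompts raise the cost per call, so tokensPerCall is worth measuring rather than guessing.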

4. Track Costs Per Job

Store token usage in job results for cost tracking:

return {
  result: response.choices[0].message.content,
  usage: {
    promptTokens: response.usage.prompt_tokens,
    completionTokens: response.usage.completion_tokens,
    cost: calculateCost(response.usage)
  }
};
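
The snippet above calls calculateCost without defining it. One possible sketch follows, assuming the usage object carries prompt_tokens and completion_tokens as in the snippet. The prices are placeholders chosen for illustration, so substitute your provider's current rates.

```javascript
// Placeholder prices in USD per 1K tokens (assumptions, not current rates).
const PRICES_PER_1K = { prompt: 0.03, completion: 0.06 };

// Convert token usage into an estimated cost in USD.
function calculateCost(usage, prices = PRICES_PER_1K) {
  const promptCost = (usage.prompt_tokens / 1000) * prices.prompt;
  const completionCost = (usage.completion_tokens / 1000) * prices.completion;
  return promptCost + completionCost;
}
```

Passing prices as a parameter lets one worker serve several models by looking up the rate table for job.data.model.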

5. Use Concurrency Control

Match concurrency to your API limits and system resources:

new Worker('embeddings', processor, {
  concurrency: 5  // 5 parallel embedding calls
});

// Or set at queue level
await queue.setConcurrency(10);
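
A rough way to pick a concurrency value is Little's law: the number of calls in flight equals the request rate multiplied by the average latency. The sketch below applies that rule of thumb. The 3-second latency is an assumed figure for illustration, and concurrencyForRate is a hypothetical helper.

```javascript
// Hypothetical helper: concurrency needed to sustain a request rate,
// using Little's law (in-flight = rate x latency).
function concurrencyForRate(requestsPerMinute, avgLatencySeconds) {
  // Multiply before dividing to keep the arithmetic exact for whole numbers.
  return Math.ceil((requestsPerMinute * avgLatencySeconds) / 60);
}

// 100 requests per minute at an assumed 3 seconds per call
console.log(concurrencyForRate(100, 3)); // 5
```

Setting concurrency well above this value gains nothing once the rate limit is the bottleneck, while setting it below leaves part of the quota unused.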

Conclusion

Building reliable AI pipelines doesn't have to be complicated. With flashQ, you get:

  • Job dependencies for complex workflows
  • Rate limiting to control API costs
  • Automatic retries for transient failures
  • Progress tracking for long-running jobs
  • High throughput for batch processing

All without managing Redis or any external infrastructure.

Ready to build your own AI pipeline? Check out the documentation for the full API reference and more examples.
