Multi-Stage LLM Pipelines: From Prompt to Response

Modern AI applications rarely make just one LLM call. They orchestrate multiple steps: embed a query, search a knowledge base, format context, and generate a response. Each step depends on the output of the previous one.

flashQ's job dependencies let you model these pipelines naturally. Jobs wait for their dependencies to complete before executing.

Pipeline Overview

A typical retrieval-augmented generation (RAG) pipeline has three stages:

┌─────────┐     ┌─────────┐     ┌──────────┐
│  Embed  │ ──▶ │ Search  │ ──▶ │ Generate │
└─────────┘     └─────────┘     └──────────┘

  1. Embed: Convert the user query into a vector
  2. Search: Find relevant documents in the vector database
  3. Generate: Use the documents as context for the LLM
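
To make the data flow between the three stages concrete, here is a minimal sketch that uses synchronous stub functions in place of the real embedding, search, and LLM calls. The interface names and the stub logic are illustrative assumptions, not part of flashQ; only the field names (embedding, originalText, documents, query) mirror the handlers used in this post.

```typescript
// Hypothetical shapes for the data handed from one stage to the next.
interface EmbedOutput {
  embedding: number[];
  originalText: string;
}

interface SearchOutput {
  documents: { content: string }[];
  query: string;
}

// Stub stage 1: pretend the "embedding" is just the text length.
function embedStub(text: string): EmbedOutput {
  return { embedding: [text.length], originalText: text };
}

// Stub stage 2: return one fake document derived from the vector.
function searchStub(input: EmbedOutput): SearchOutput {
  return {
    documents: [{ content: `doc for vector ${input.embedding.join(',')}` }],
    query: input.originalText,
  };
}

// Stub stage 3: report what a real LLM call would have received.
function generateStub(input: SearchOutput): string {
  return `Answer to "${input.query}" using ${input.documents.length} document(s)`;
}

// Each stage consumes exactly what the previous stage returned.
const stubAnswer = generateStub(searchStub(embedStub('hello')));
console.log(stubAnswer);
```

Each stage's return value is the only input the next stage needs, which is what lets the stages run as separate jobs.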

Creating the Pipeline

Use dependsOn to chain jobs together:

import { Queue } from 'flashq';

const queue = new Queue('ai-pipeline');

async function runRAGPipeline(question: string) {
  // Stage 1: Embed the query
  const embedJob = await queue.add('embed', {
    text: question
  });

  // Stage 2: Search (waits for embedding)
  const searchJob = await queue.add('search', {
    topK: 5
  }, {
    dependsOn: [embedJob.id]
  });

  // Stage 3: Generate (waits for search)
  const generateJob = await queue.add('generate', {
    model: 'gpt-4',
    systemPrompt: 'Answer based on the provided context.'
  }, {
    dependsOn: [searchJob.id]
  });

  // Wait for the final result
  const response = await queue.finished(generateJob.id);
  return response;
}

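queue.finished() resolves when the job you pass it completes. Because generate depends on search, which in turn depends on embed, waiting on generateJob.id means waiting for the whole pipeline, and the value returned is the generate job's result.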
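
The ordering guarantee behind dependsOn can be illustrated with a small in-memory model. This is only a sketch of the idea (a job becomes ready once all of its dependencies are done); it is not flashQ's scheduler, and JobSpec and executionOrder are hypothetical names.

```typescript
// Minimal in-memory model of dependency ordering; not flashQ's scheduler.
interface JobSpec {
  id: string;
  dependsOn: string[];
}

// Returns job ids in an order where every job appears after its dependencies.
function executionOrder(jobs: JobSpec[]): string[] {
  const done = new Set<string>();
  const order: string[] = [];
  let remaining = [...jobs];

  while (remaining.length > 0) {
    // A job is ready once all of its dependencies have completed.
    const ready = remaining.filter((job) =>
      job.dependsOn.every((dep) => done.has(dep))
    );
    if (ready.length === 0) {
      throw new Error('Unresolvable dependencies (cycle or missing job)');
    }
    for (const job of ready) {
      done.add(job.id);
      order.push(job.id);
    }
    remaining = remaining.filter((job) => !done.has(job.id));
  }
  return order;
}

// Jobs are listed out of order on purpose; dependencies decide the order.
const ragOrder = executionOrder([
  { id: 'generate', dependsOn: ['search'] },
  { id: 'search', dependsOn: ['embed'] },
  { id: 'embed', dependsOn: [] },
]);
console.log(ragOrder); // [ 'embed', 'search', 'generate' ]
```

Since generate can only run last, waiting on it alone is enough to know that every earlier stage has finished.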

Setting Up the Worker

A single worker handles all job types using a switch on job.name:

import { Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();

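new Worker('ai-pipeline', async (job) => {
  switch (job.name) {
    case 'embed':
      return handleEmbed(job);
    case 'search':
      return handleSearch(job);
    case 'generate':
      return handleGenerate(job);
    default:
      // Fail loudly instead of silently completing an unknown job type
      throw new Error(`Unknown job type: ${job.name}`);
  }
});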
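
As the number of stages grows, a lookup table can be easier to maintain than a switch. The sketch below shows that alternative with stub handlers; PipelineJob is a hypothetical, simplified job shape and dispatch is an illustrative helper, not a flashQ API.

```typescript
// Hypothetical minimal job shape; the real job object carries more fields.
interface PipelineJob {
  name: string;
  data: Record<string, unknown>;
}

type Handler = (job: PipelineJob) => unknown;

// Stub handlers stand in for handleEmbed, handleSearch, and handleGenerate.
const handlers = new Map<string, Handler>([
  ['embed', (job) => `embedded:${String(job.data['text'])}`],
  ['search', () => 'searched'],
  ['generate', () => 'generated'],
]);

// Look up the handler by job name and fail loudly if none is registered.
function dispatch(job: PipelineJob): unknown {
  const handler = handlers.get(job.name);
  if (!handler) {
    throw new Error(`No handler registered for job "${job.name}"`);
  }
  return handler(job);
}

console.log(dispatch({ name: 'embed', data: { text: 'hi' } })); // embedded:hi
```

Adding a new stage then means registering one more entry rather than editing the worker body.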

Embed Handler

async function handleEmbed(job) {
  const { text } = job.data;

  const response = await openai.embeddings.create({
    input: text,
    model: 'text-embedding-3-small'
  });

  return {
    embedding: response.data[0].embedding,
    originalText: text
  };
}

Search Handler

async function handleSearch(job) {
  // Get result from parent job (embed)
  const parentResult = job.parentResult;

  // vectorDB is your vector database client (setup not shown here)
  const results = await vectorDB.search({
    vector: parentResult.embedding,
    topK: job.data.topK
  });

  return {
    documents: results,
    query: parentResult.originalText
  };
}
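
The vectorDB client above is left undefined in this post. For local experiments, a tiny in-memory top-K search by cosine similarity can play the same role. This is a sketch under that assumption; StoredDocument, cosineSimilarity, and searchTopK are hypothetical names, and a real vector database would use an index rather than a full scan.

```typescript
// Hypothetical in-memory document record.
interface StoredDocument {
  content: string;
  embedding: number[];
}

// Cosine similarity: dot product divided by the product of vector lengths.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) {
    throw new Error('Vectors must have the same length');
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    const x = a[i] ?? 0;
    const y = b[i] ?? 0;
    dot += x * y;
    normA += x * x;
    normB += y * y;
  }
  if (normA === 0 || normB === 0) return 0; // zero vector: no direction
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Score every document against the query and keep the topK best matches.
function searchTopK(
  query: number[],
  docs: StoredDocument[],
  topK: number
): StoredDocument[] {
  return docs
    .map((doc) => ({ doc, score: cosineSimilarity(query, doc.embedding) }))
    .sort((left, right) => right.score - left.score)
    .slice(0, topK)
    .map((entry) => entry.doc);
}

const sampleDocs: StoredDocument[] = [
  { content: 'x', embedding: [1, 0] },
  { content: 'y', embedding: [0, 1] },
  { content: 'xy', embedding: [1, 1] },
];
const topTwo = searchTopK([1, 0], sampleDocs, 2).map((doc) => doc.content);
console.log(topTwo); // [ 'x', 'xy' ]
```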

Generate Handler

async function handleGenerate(job) {
  const parentResult = job.parentResult;

  const context = parentResult.documents
    .map(doc => doc.content)
    .join('\n\n');

  const response = await openai.chat.completions.create({
    model: job.data.model,
    messages: [
      { role: 'system', content: job.data.systemPrompt },
      {
        role: 'user',
        content: `Context:\n${context}\n\nQuestion: ${parentResult.query}`
      }
    ]
  });

  return response.choices[0].message.content;
}
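
The generate handler joins every retrieved document into the prompt, which can exceed the model's context window when documents are long. One possible guard is to cap the context by size before building the prompt. The helper below is a sketch: buildUserPrompt and the 4000-character default are assumptions chosen for illustration, and a production version would more likely count tokens than characters.

```typescript
interface RetrievedDocument {
  content: string;
}

// Adds documents in order until the (hypothetical) character budget is used up,
// then formats the prompt the same way the generate handler does.
function buildUserPrompt(
  question: string,
  documents: RetrievedDocument[],
  maxContextChars = 4000
): string {
  const parts: string[] = [];
  let used = 0;
  for (const doc of documents) {
    // Stop at the first document that would overflow the budget.
    if (used + doc.content.length > maxContextChars) break;
    parts.push(doc.content);
    used += doc.content.length;
  }
  return `Context:\n${parts.join('\n\n')}\n\nQuestion: ${question}`;
}

// With a budget of 5 characters only the first document fits.
const cappedPrompt = buildUserPrompt('Q?', [{ content: 'aaa' }, { content: 'bbbb' }], 5);
console.log(cappedPrompt);
```

Because search results arrive ranked by relevance, dropping from the end keeps the most relevant documents.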

Use Case: Document Processing

Another common pipeline is document ingestion:

┌───────┐     ┌───────┐     ┌───────┐     ┌───────┐
│ Parse │ ──▶ │ Chunk │ ──▶ │ Embed │ ──▶ │ Store │
└───────┘     └───────┘     └───────┘     └───────┘

async function processDocument(url: string) {
  // Parse the document
  const parseJob = await queue.add('parse', { url });

  // Chunk into smaller pieces
  const chunkJob = await queue.add('chunk', {
    chunkSize: 500,
    overlap: 50
  }, {
    dependsOn: [parseJob.id]
  });

  // Embed all chunks
  const embedJob = await queue.add('embed-chunks', {}, {
    dependsOn: [chunkJob.id]
  });

  // Store in vector database
  const storeJob = await queue.add('store', { url }, {
    dependsOn: [embedJob.id]
  });

  return queue.finished(storeJob.id);
}
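
The chunk stage is configured with chunkSize and overlap, but its handler is not shown. A minimal sketch of what such a handler might do internally follows. It assumes both values are measured in characters (the post does not say whether they are characters or tokens), and chunkText is a hypothetical helper.

```typescript
// Splits text into windows of chunkSize characters, where each window
// starts (chunkSize - overlap) characters after the previous one.
function chunkText(text: string, chunkSize: number, overlap: number): string[] {
  if (chunkSize <= 0) {
    throw new Error('chunkSize must be positive');
  }
  if (overlap < 0 || overlap >= chunkSize) {
    throw new Error('overlap must be at least 0 and smaller than chunkSize');
  }

  const chunks: string[] = [];
  const step = chunkSize - overlap;
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + chunkSize));
    // Stop once this window has reached the end of the text.
    if (start + chunkSize >= text.length) break;
  }
  return chunks;
}

const sampleChunks = chunkText('abcdefghij', 4, 1);
console.log(sampleChunks); // [ 'abcd', 'defg', 'ghij' ]
```

The overlap repeats the tail of one chunk at the head of the next, so a sentence that straddles a boundary still appears intact in at least one chunk when it is shorter than the overlap.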

Use Case: Content Generation

Generate and refine content with multiple LLM calls:

async function generateBlogPost(topic: string) {
  // Generate outline
  const outlineJob = await queue.add('generate-outline', {
    topic,
    model: 'gpt-4'
  });

  // Generate draft based on outline
  const draftJob = await queue.add('generate-draft', {
    model: 'gpt-4'
  }, {
    dependsOn: [outlineJob.id]
  });

  // Edit and improve the draft
  const editJob = await queue.add('edit-draft', {
    model: 'gpt-4'
  }, {
    dependsOn: [draftJob.id]
  });

  return queue.finished(editJob.id);
}

Error Handling

When a job fails, dependent jobs won't execute. Configure retries so that transient failures, such as rate limits or timeouts, don't stop the whole pipeline:

const embedJob = await queue.add('embed', { text }, {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});
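
To get a feel for what these settings mean in wall-clock time, the sketch below computes a retry schedule. It assumes the common convention that the delay doubles after each failed attempt, starting from the configured delay; flashQ's exact formula (and whether it adds jitter) is not stated in this post, so treat the numbers as illustrative. backoffDelays is a hypothetical helper.

```typescript
// ASSUMPTION: delay doubles on each retry (delay, 2 * delay, 4 * delay, ...).
// The queue's actual backoff formula may differ.
function backoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt runs immediately, so there are attempts - 1 waits.
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(baseDelayMs * 2 ** retry);
  }
  return delays;
}

const retrySchedule = backoffDelays(3, 1000);
console.log(retrySchedule); // [ 1000, 2000 ]
```

Under this assumption, a job with attempts: 3 and delay: 1000 spends about three seconds waiting between attempts before it is given up on.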

If the embed job still fails after all 3 attempts, the pipeline stops: the search and generate jobs that depend on it never run. Check the dead letter queue for failed jobs:

const dlq = await queue.getDlq(100);
for (const job of dlq) {
  console.log(`Failed: ${job.name} - ${job.error}`);
}
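
When the dead letter queue holds many entries, grouping them by stage shows quickly where the pipeline is breaking. The sketch below works on plain objects with the two fields logged above (name and error); FailedJob and countFailuresByStage are hypothetical names, and the sample data is made up for illustration.

```typescript
// Only the two fields used in the logging loop above are assumed here.
interface FailedJob {
  name: string;
  error: string;
}

// Counts failed jobs per stage name.
function countFailuresByStage(jobs: FailedJob[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const job of jobs) {
    counts.set(job.name, (counts.get(job.name) ?? 0) + 1);
  }
  return counts;
}

// Made-up sample entries standing in for the result of queue.getDlq().
const failureCounts = countFailuresByStage([
  { name: 'embed', error: 'rate limited' },
  { name: 'generate', error: 'timeout' },
  { name: 'embed', error: 'rate limited' },
]);
console.log(failureCounts.get('embed')); // 2
```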

Monitoring Pipeline Progress

Fetch each job with queue.getJob() to see how far the pipeline has progressed:

const embedJob = await queue.add('embed', { text });
const searchJob = await queue.add('search', { topK: 5 }, { dependsOn: [embedJob.id] });
const generateJob = await queue.add('generate', { model: 'gpt-4' }, { dependsOn: [searchJob.id] });

// Check status of each stage
const embedStatus = await queue.getJob(embedJob.id);
const searchStatus = await queue.getJob(searchJob.id);
const generateStatus = await queue.getJob(generateJob.id);

console.log('Embed:', embedStatus.status);       // e.g. completed
console.log('Search:', searchStatus.status);     // e.g. active
console.log('Generate:', generateStatus.status); // e.g. waiting
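
For a progress indicator, the individual statuses can be folded into one line. The sketch below takes plain name/status pairs, such as those read from the jobs above, and summarizes them; StageSnapshot and summarizeProgress are hypothetical names, and only the status values shown in this post (completed, active, waiting) are assumed.

```typescript
interface StageSnapshot {
  name: string;
  status: string;
}

// Reports how many stages are completed and which stage comes next.
// Stages must be passed in pipeline order.
function summarizeProgress(stages: StageSnapshot[]): string {
  const completed = stages.filter((stage) => stage.status === 'completed').length;
  const current = stages.find((stage) => stage.status !== 'completed');
  const suffix = current ? `, next: ${current.name} (${current.status})` : ', done';
  return `${completed}/${stages.length} stages completed${suffix}`;
}

const progressLine = summarizeProgress([
  { name: 'embed', status: 'completed' },
  { name: 'search', status: 'active' },
  { name: 'generate', status: 'waiting' },
]);
console.log(progressLine); // 1/3 stages completed, next: search (active)
```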

Complete Example

import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('ai-pipeline');

// Worker (vectorDB is your vector database client, setup not shown here)
new Worker('ai-pipeline', async (job) => {
  switch (job.name) {
    // Braces give each case its own scope for its const declarations
    case 'embed': {
      const embed = await openai.embeddings.create({
        input: job.data.text,
        model: 'text-embedding-3-small'
      });
      return { embedding: embed.data[0].embedding, text: job.data.text };
    }

    case 'search': {
      const results = await vectorDB.search({
        vector: job.parentResult.embedding,
        topK: job.data.topK
      });
      return { documents: results, query: job.parentResult.text };
    }

    case 'generate': {
      const context = job.parentResult.documents.map(d => d.content).join('\n\n');
      const response = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [
          { role: 'system', content: 'Answer based on context.' },
          { role: 'user', content: `Context:\n${context}\n\nQuestion: ${job.parentResult.query}` }
        ]
      });
      return response.choices[0].message.content;
    }

    default:
      // Fail loudly instead of silently completing an unknown job type
      throw new Error(`Unknown job type: ${job.name}`);
  }
});

// Run pipeline
async function ask(question: string) {
  const embed = await queue.add('embed', { text: question });
  const search = await queue.add('search', { topK: 5 }, { dependsOn: [embed.id] });
  const generate = await queue.add('generate', {}, { dependsOn: [search.id] });

  return queue.finished(generate.id);
}

// Usage
const answer = await ask('What is the capital of France?');
console.log(answer);

Key Takeaways
  • Use dependsOn to chain jobs in a pipeline
  • Use queue.finished() to wait for the final result
  • Switch on job.name in the worker to handle different stages
  • Access parent results via job.parentResult

Build Your Pipeline

Start building multi-stage AI pipelines with flashQ.
