Modern AI applications rarely call a single LLM. They orchestrate multiple steps: embed a query, search a knowledge base, format context, and generate a response. Each step depends on the previous one.
flashQ's job dependencies let you model these pipelines naturally. Jobs wait for their dependencies to complete before executing.
## Pipeline Overview
A typical RAG pipeline has three stages:
```
┌─────────┐      ┌─────────┐      ┌──────────┐
│  Embed  │ ──▶  │ Search  │ ──▶  │ Generate │
└─────────┘      └─────────┘      └──────────┘
```
- Embed: Convert the user query into a vector
- Search: Find relevant documents in the vector database
- Generate: Use the documents as context for the LLM
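The contract between stages can be made explicit with plain TypeScript types (illustrative only — these type names are not part of flashQ; each stage's return value is delivered to the job that depends on it):

```typescript
// Illustrative payload types for each stage (assumed names, not flashQ API).
// A stage's return value becomes the input of its dependent job.
interface EmbedResult {
  embedding: number[];
  originalText: string;
}

interface SearchResult {
  documents: { content: string }[];
  query: string;
}

// The final stage returns the LLM's answer as a string.
type GenerateResult = string;
```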
## Creating the Pipeline
Use `dependsOn` to chain jobs together:
```typescript
import { Queue } from 'flashq';

const queue = new Queue('ai-pipeline');

async function runRAGPipeline(question: string) {
  // Stage 1: Embed the query
  const embedJob = await queue.add('embed', {
    text: question
  });

  // Stage 2: Search (waits for embedding)
  const searchJob = await queue.add('search', {
    topK: 5
  }, {
    dependsOn: [embedJob.id]
  });

  // Stage 3: Generate (waits for search)
  const generateJob = await queue.add('generate', {
    model: 'gpt-4',
    systemPrompt: 'Answer based on the provided context.'
  }, {
    dependsOn: [searchJob.id]
  });

  // Wait for the final result
  const response = await queue.finished(generateJob.id);
  return response;
}
```
When you call `queue.finished()`, flashQ waits for the entire pipeline to complete and returns the final result.
## Setting Up the Worker

A single worker handles all job types using a switch on `job.name`:
```typescript
import { Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();

new Worker('ai-pipeline', async (job) => {
  switch (job.name) {
    case 'embed':
      return handleEmbed(job);
    case 'search':
      return handleSearch(job);
    case 'generate':
      return handleGenerate(job);
  }
});
```
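Note that a switch silently returns `undefined` for an unrecognized job name. One defensive alternative (a sketch, not flashQ API) is a handler lookup table that fails loudly:

```typescript
// Map job names to handlers; unknown names raise instead of returning undefined.
type Handler = (data: any) => Promise<unknown> | unknown;

function dispatch(handlers: Record<string, Handler>, name: string, data: any) {
  const handler = handlers[name];
  if (!handler) throw new Error(`No handler registered for job type: ${name}`);
  return handler(data);
}
```

The worker callback then becomes `(job) => dispatch(handlers, job.name, job.data)`, and a misnamed job surfaces as a failed job instead of a silent no-op.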
### Embed Handler
```typescript
async function handleEmbed(job) {
  const { text } = job.data;

  const response = await openai.embeddings.create({
    input: text,
    model: 'text-embedding-3-small'
  });

  return {
    embedding: response.data[0].embedding,
    originalText: text
  };
}
```
### Search Handler
```typescript
async function handleSearch(job) {
  // Result from the parent job (embed)
  const parentResult = job.parentResult;

  // vectorDB is your vector database client (e.g. Pinecone, Qdrant, pgvector)
  const results = await vectorDB.search({
    vector: parentResult.embedding,
    topK: job.data.topK
  });

  return {
    documents: results,
    query: parentResult.originalText
  };
}
```
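Under the hood, a vector search ranks stored documents by similarity to the query embedding. A minimal in-memory sketch of what `vectorDB.search` does (your actual database handles this at scale, typically with approximate nearest-neighbor indexes):

```typescript
// Cosine similarity between two equal-length vectors.
function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0, normA = 0, normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Rank stored documents against the query vector and return the top-k matches.
function topK(query: number[], docs: { content: string; vector: number[] }[], k: number) {
  return docs
    .map((d) => ({ content: d.content, score: cosineSimilarity(query, d.vector) }))
    .sort((x, y) => y.score - x.score)
    .slice(0, k);
}
```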
### Generate Handler
```typescript
async function handleGenerate(job) {
  const parentResult = job.parentResult;

  const context = parentResult.documents
    .map(doc => doc.content)
    .join('\n\n');

  const response = await openai.chat.completions.create({
    model: job.data.model,
    messages: [
      { role: 'system', content: job.data.systemPrompt },
      {
        role: 'user',
        content: `Context:\n${context}\n\nQuestion: ${parentResult.query}`
      }
    ]
  });

  return response.choices[0].message.content;
}
```
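The handler above joins every retrieved document into the prompt, which can overflow the model's context window. A hypothetical guard (`buildContext` is illustrative, not part of flashQ) that stops adding documents once a character budget is exhausted:

```typescript
// Join document snippets until an approximate character budget runs out.
// A production version would count tokens rather than characters.
function buildContext(docs: { content: string }[], maxChars: number): string {
  const parts: string[] = [];
  let used = 0;
  for (const doc of docs) {
    if (used + doc.content.length > maxChars) break;
    parts.push(doc.content);
    used += doc.content.length + 2; // account for the '\n\n' separator
  }
  return parts.join('\n\n');
}
```

Because the search stage returns documents ranked by relevance, truncating from the tail drops the least relevant ones first.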
## Use Case: Document Processing
Another common pipeline is document ingestion:
```
┌───────┐      ┌───────┐      ┌───────┐      ┌───────┐
│ Parse │ ──▶  │ Chunk │ ──▶  │ Embed │ ──▶  │ Store │
└───────┘      └───────┘      └───────┘      └───────┘
```
```typescript
async function processDocument(url: string) {
  // Parse the document
  const parseJob = await queue.add('parse', { url });

  // Chunk into smaller pieces
  const chunkJob = await queue.add('chunk', {
    chunkSize: 500,
    overlap: 50
  }, {
    dependsOn: [parseJob.id]
  });

  // Embed all chunks
  const embedJob = await queue.add('embed-chunks', {}, {
    dependsOn: [chunkJob.id]
  });

  // Store in the vector database
  const storeJob = await queue.add('store', { url }, {
    dependsOn: [embedJob.id]
  });

  return queue.finished(storeJob.id);
}
```
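The `chunkSize` and `overlap` parameters describe a sliding window over the text. A minimal sketch of such a chunker (`chunkText` is a hypothetical handler helper; production chunkers usually split on sentence or token boundaries instead of raw characters):

```typescript
// Split text into windows of `chunkSize` characters, each overlapping the
// previous window by `overlap` characters so context isn't cut mid-thought.
function chunkText(text: string, chunkSize: number, overlap: number): string[] {
  if (overlap >= chunkSize) throw new Error('overlap must be smaller than chunkSize');
  const chunks: string[] = [];
  const step = chunkSize - overlap;
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + chunkSize));
    if (start + chunkSize >= text.length) break;
  }
  return chunks;
}
```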
## Use Case: Content Generation
Generate and refine content with multiple LLM calls:
```typescript
async function generateBlogPost(topic: string) {
  // Generate an outline
  const outlineJob = await queue.add('generate-outline', {
    topic,
    model: 'gpt-4'
  });

  // Generate a draft based on the outline
  const draftJob = await queue.add('generate-draft', {
    model: 'gpt-4'
  }, {
    dependsOn: [outlineJob.id]
  });

  // Edit and improve the draft
  const editJob = await queue.add('edit-draft', {
    model: 'gpt-4'
  }, {
    dependsOn: [draftJob.id]
  });

  return queue.finished(editJob.id);
}
```
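Inside the worker, the `generate-draft` handler would read the outline from its parent job's result. A hypothetical prompt builder for that stage (`draftPrompt` is illustrative, not flashQ API; it assumes the outline stage returns an array of section titles):

```typescript
// Turn the outline produced by the previous stage into the draft prompt.
function draftPrompt(topic: string, outline: string[]): string {
  const sections = outline.map((section, i) => `${i + 1}. ${section}`);
  return [
    `Write a blog post about "${topic}" following this outline:`,
    ...sections
  ].join('\n');
}
```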
## Error Handling
When a job fails, dependent jobs won't execute. Configure retries:
```typescript
const embedJob = await queue.add('embed', { text }, {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});
```
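With exponential backoff, each retry roughly doubles the wait. Assuming the common convention of `delay * 2^(attempt - 1)` (flashQ's exact formula may differ; check its docs), the three attempts above would wait 1s, 2s, and 4s:

```typescript
// Delay before retry number `attempt` (1-based), assuming the common
// exponential convention. flashQ's actual formula may differ.
function backoffDelay(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * 2 ** (attempt - 1);
}
```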
If the embed job fails after 3 attempts, the pipeline stops. Check the dead letter queue for failed jobs:
```typescript
const dlq = await queue.getDlq(100);

for (const job of dlq) {
  console.log(`Failed: ${job.name} - ${job.error}`);
}
```
## Monitoring Pipeline Progress
Track individual job status:
```typescript
const embedJob = await queue.add('embed', { text });
const searchJob = await queue.add('search', { topK: 5 }, { dependsOn: [embedJob.id] });
const generateJob = await queue.add('generate', { model: 'gpt-4' }, { dependsOn: [searchJob.id] });

// Check the status of each stage
const embedStatus = await queue.getJob(embedJob.id);
const searchStatus = await queue.getJob(searchJob.id);
const generateStatus = await queue.getJob(generateJob.id);

console.log('Embed:', embedStatus.status);       // completed
console.log('Search:', searchStatus.status);     // active
console.log('Generate:', generateStatus.status); // waiting
```
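The per-stage checks can be wrapped in a small summary helper. To keep the sketch self-contained it takes a status-lookup function; in a real app that function would wrap `queue.getJob(id)` (the `'failed'` status value is an assumption here):

```typescript
type Status = 'waiting' | 'active' | 'completed' | 'failed';

// Summarize a pipeline: done when every stage has completed, failed if any
// stage failed. `getStatus` would wrap queue.getJob(id).status in practice.
async function pipelineProgress(
  jobIds: string[],
  getStatus: (id: string) => Promise<Status>,
): Promise<{ done: boolean; failed: boolean; stages: Status[] }> {
  const stages = await Promise.all(jobIds.map(getStatus));
  return {
    done: stages.every((s) => s === 'completed'),
    failed: stages.some((s) => s === 'failed'),
    stages,
  };
}
```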
## Complete Example
```typescript
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('ai-pipeline');

// Worker (case bodies are braced so each can declare its own consts)
new Worker('ai-pipeline', async (job) => {
  switch (job.name) {
    case 'embed': {
      const embed = await openai.embeddings.create({
        input: job.data.text,
        model: 'text-embedding-3-small'
      });
      return { embedding: embed.data[0].embedding, text: job.data.text };
    }
    case 'search': {
      const results = await vectorDB.search({
        vector: job.parentResult.embedding,
        topK: job.data.topK
      });
      return { documents: results, query: job.parentResult.text };
    }
    case 'generate': {
      const context = job.parentResult.documents.map(d => d.content).join('\n\n');
      const response = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [
          { role: 'system', content: 'Answer based on context.' },
          { role: 'user', content: `Context:\n${context}\n\nQuestion: ${job.parentResult.query}` }
        ]
      });
      return response.choices[0].message.content;
    }
  }
});

// Run the pipeline
async function ask(question: string) {
  const embed = await queue.add('embed', { text: question });
  const search = await queue.add('search', { topK: 5 }, { dependsOn: [embed.id] });
  const generate = await queue.add('generate', {}, { dependsOn: [search.id] });
  return queue.finished(generate.id);
}

// Usage
const answer = await ask('What is the capital of France?');
console.log(answer);
```
- Use `dependsOn` to chain jobs in a pipeline
- Use `queue.finished()` to wait for the final result
- Switch on `job.name` in the worker to handle different stages
- Access parent results via `job.parentResult`