Retrieval-Augmented Generation (RAG) is one of the most effective patterns for building AI chatbots that can answer questions about your own data. Instead of relying solely on the LLM's training data, RAG retrieves relevant context from your knowledge base before generating a response.
In this tutorial, we'll build a production-ready RAG chatbot using:
- flashQ - Job orchestration for the pipeline
- OpenAI - Embeddings and text generation
- Pinecone - Vector database for semantic search
RAG Architecture
A RAG system has two main flows:
INDEXING FLOW (one-time or periodic)
Document Upload ──▶ Chunk / Split ──▶ Embed (OpenAI) ──▶ Store (Pinecone)
QUERY FLOW (per user message)
User Query ──▶ Embed Query ──▶ Search Context ──▶ Generate Response
flashQ orchestrates both flows, handling retries, rate limiting, and progress tracking.
Project Setup
# Initialize project
mkdir rag-chatbot && cd rag-chatbot
npm init -y
# Install dependencies
npm install flashq openai @pinecone-database/pinecone express
npm install -D typescript @types/node @types/express tsx
# .env
FLASHQ_HOST=localhost
FLASHQ_PORT=6789
OPENAI_API_KEY=sk-...
PINECONE_API_KEY=...
PINECONE_INDEX=knowledge-base
Document Indexing Pipeline
// src/indexing.ts
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';
import { Pinecone } from '@pinecone-database/pinecone';
const openai = new OpenAI();
const pinecone = new Pinecone();
const index = pinecone.index(process.env.PINECONE_INDEX!);
const queue = new Queue('rag-indexing');
// Rate limit to stay within OpenAI limits
await queue.setRateLimit(500); // 500 embeddings/minute
// Step 1: Chunk the document
async function chunkDocument(text: string, chunkSize = 500, overlap = 50) {
const chunks: string[] = [];
let start = 0;
while (start < text.length) {
const end = Math.min(start + chunkSize, text.length);
chunks.push(text.slice(start, end));
start += chunkSize - overlap;
}
return chunks;
}
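To make the overlap arithmetic concrete, here is a quick standalone run of the same chunking logic. A 1,200-character input with the default chunkSize of 500 and overlap of 50 produces chunks starting at offsets 0, 450, and 900:

```typescript
// Standalone copy of the chunking logic above, for a quick sanity check.
function chunkDocument(text: string, chunkSize = 500, overlap = 50): string[] {
  const chunks: string[] = [];
  let start = 0;
  while (start < text.length) {
    const end = Math.min(start + chunkSize, text.length);
    chunks.push(text.slice(start, end));
    // Each chunk repeats the last `overlap` characters of the previous one
    start += chunkSize - overlap;
  }
  return chunks;
}

const doc = 'x'.repeat(1200);
const chunks = chunkDocument(doc);
console.log(chunks.length);             // 3 chunks: offsets 0, 450, 900
console.log(chunks.map(c => c.length)); // [ 500, 500, 300 ]
```

The overlap means adjacent chunks share 50 characters, so a sentence that straddles a chunk boundary still appears whole in at least one chunk.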
// Main indexing function
export async function indexDocument(documentId: string, content: string, metadata: any) {
// Step 1: Chunk
const chunkJob = await queue.add('chunk', {
documentId,
content,
metadata,
});
// Wait for completion
const result = await queue.finished(chunkJob.id);
return result;
}
// Worker for chunking
new Worker('rag-indexing', async (job) => {
if (job.name === 'chunk') {
const { documentId, content, metadata } = job.data;
await job.updateProgress(10, 'Chunking document...');
const chunks = await chunkDocument(content);
// Create embedding jobs for each chunk
const embedJobs = await Promise.all(
chunks.map((chunk, i) =>
queue.add('embed', {
documentId,
chunkIndex: i,
text: chunk,
metadata: { ...metadata, chunkIndex: i },
})
)
);
// Create store job that depends on all embeddings
const storeJob = await queue.add('store', {
documentId,
totalChunks: chunks.length,
}, {
depends_on: embedJobs.map(j => j.id),
});
return { storeJobId: storeJob.id, totalChunks: chunks.length };
}
if (job.name === 'embed') {
const { text, documentId, chunkIndex, metadata } = job.data;
const response = await openai.embeddings.create({
model: 'text-embedding-3-small',
input: text,
});
return {
id: `${documentId}-${chunkIndex}`,
embedding: response.data[0].embedding,
text,
metadata,
};
}
if (job.name === 'store') {
const { documentId } = job.data;
// Collect each embedding result from the parent jobs this job depends on
const vectors = [];
for (const parentId of job.opts.depends_on) {
const result = await queue.getResult(parentId);
vectors.push({
id: result.id,
values: result.embedding,
metadata: { ...result.metadata, text: result.text },
});
}
// Upsert to Pinecone in batches
const batchSize = 100;
for (let i = 0; i < vectors.length; i += batchSize) {
await index.upsert(vectors.slice(i, i + batchSize));
}
return { indexed: vectors.length, documentId };
}
});
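The store step slices vectors into groups of 100 before upserting, since Pinecone upserts are typically sent in bounded batches. That slicing can be factored into a small generic helper, shown here as a sketch independent of flashQ or Pinecone:

```typescript
// Generic batching helper: splits an array into fixed-size slices.
function toBatches<T>(items: T[], batchSize: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// 250 vectors with batchSize 100 -> batches of 100, 100, and 50
const ids = Array.from({ length: 250 }, (_, i) => `vec-${i}`);
const batches = toBatches(ids, 100);
console.log(batches.map(b => b.length)); // [ 100, 100, 50 ]
```

With this helper, the upsert loop in the store worker becomes `for (const batch of toBatches(vectors, 100)) await index.upsert(batch);`.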
Query Pipeline
// src/query.ts
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';
import { Pinecone } from '@pinecone-database/pinecone';
const openai = new OpenAI();
const pinecone = new Pinecone();
const index = pinecone.index(process.env.PINECONE_INDEX!);
const queue = new Queue('rag-query');
// Main query function
export async function askQuestion(question: string, conversationHistory: any[] = []) {
// Step 1: Embed the question
const embedJob = await queue.add('embed-query', { question });
// Step 2: Search (depends on embed)
const searchJob = await queue.add('search', {
question,
topK: 5,
}, {
depends_on: [embedJob.id],
});
// Step 3: Generate response (depends on search)
const generateJob = await queue.add('generate', {
question,
conversationHistory,
}, {
depends_on: [searchJob.id],
});
// Wait for final result
return await queue.finished(generateJob.id);
}
// Query workers
new Worker('rag-query', async (job) => {
if (job.name === 'embed-query') {
const { question } = job.data;
const response = await openai.embeddings.create({
model: 'text-embedding-3-small',
input: question,
});
return { embedding: response.data[0].embedding };
}
if (job.name === 'search') {
const { question, topK } = job.data;
// Get embedding from parent job
const embedResult = await queue.getResult(job.opts.depends_on[0]);
// Search Pinecone
const results = await index.query({
vector: embedResult.embedding,
topK,
includeMetadata: true,
});
const context = results.matches
.map(m => m.metadata?.text)
.filter(Boolean)
.join('\n\n---\n\n');
return { context, sources: results.matches };
}
if (job.name === 'generate') {
const { question, conversationHistory } = job.data;
// Get context from search job
const searchResult = await queue.getResult(job.opts.depends_on[0]);
const systemPrompt = `You are a helpful assistant. Answer questions based on the provided context.
If you cannot find the answer in the context, say "I don't have information about that."
Always cite which part of the context you used.
Context:
${searchResult.context}`;
const messages = [
{ role: 'system', content: systemPrompt },
...conversationHistory,
{ role: 'user', content: question },
];
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages,
temperature: 0.7,
max_tokens: 1000,
});
return {
answer: response.choices[0].message.content,
sources: searchResult.sources,
usage: response.usage,
};
}
}, {
concurrency: 10,
});
REST API
// src/server.ts
import express from 'express';
import { indexDocument } from './indexing';
import { askQuestion } from './query';
const app = express();
app.use(express.json({ limit: '10mb' }));
// Index a document
app.post('/api/index', async (req, res) => {
try {
const { documentId, content, metadata } = req.body;
const result = await indexDocument(documentId, content, metadata);
res.json(result);
} catch (err) {
res.status(500).json({ error: (err as Error).message });
}
});
// Ask a question
app.post('/api/chat', async (req, res) => {
try {
const { question, history } = req.body;
const result = await askQuestion(question, history || []);
res.json(result);
} catch (err) {
res.status(500).json({ error: (err as Error).message });
}
});
app.listen(3000, () => {
console.log('RAG Chatbot API running on port 3000');
});
Using the Chatbot
# Index a document
curl -X POST http://localhost:3000/api/index \
-H "Content-Type: application/json" \
-d '{
"documentId": "doc-1",
"content": "flashQ is a high-performance job queue...",
"metadata": { "source": "docs", "title": "flashQ Overview" }
}'
# Ask a question
curl -X POST http://localhost:3000/api/chat \
-H "Content-Type: application/json" \
-d '{
"question": "What is flashQ and how fast is it?"
}'
# Response
{
"answer": "flashQ is a high-performance job queue that can process up to 1.9 million jobs per second...",
"sources": [{ "id": "doc-1-0", "score": 0.92 }],
"usage": { "prompt_tokens": 450, "completion_tokens": 120 }
}
Production Tips
1. Use text-embedding-3-large for better accuracy.
2. Implement caching for frequent queries.
3. Add conversation memory with a sliding window.
4. Monitor token usage for cost control.
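One of the tips above, sliding-window conversation memory, can be sketched as a small helper that keeps only the most recent turns while always preserving the system message. The message shape matches the OpenAI chat format used earlier; `maxMessages` is an assumed tuning knob, not part of the tutorial's API:

```typescript
type ChatMessage = { role: 'system' | 'user' | 'assistant'; content: string };

// Keep any system messages plus the last `maxMessages` non-system turns.
// `maxMessages` is a hypothetical tuning parameter for this sketch.
function slidingWindow(history: ChatMessage[], maxMessages = 10): ChatMessage[] {
  const system = history.filter(m => m.role === 'system');
  const rest = history.filter(m => m.role !== 'system');
  return [...system, ...rest.slice(-maxMessages)];
}

// 1 system message plus 20 user/assistant exchanges (40 messages total)
const history: ChatMessage[] = [
  { role: 'system', content: 'You are a helpful assistant.' },
  ...Array.from({ length: 20 }, (_, i): ChatMessage[] => [
    { role: 'user', content: `question ${i}` },
    { role: 'assistant', content: `answer ${i}` },
  ]).flat(),
];

const trimmed = slidingWindow(history, 6);
console.log(trimmed.length); // 7: the system message plus the last 6 turns
```

In the /api/chat handler, the incoming history could be passed through slidingWindow before calling askQuestion, bounding prompt size and token cost regardless of conversation length.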
Conclusion
You've built a production-ready RAG chatbot with:
- Document indexing with chunking and embeddings
- Semantic search using Pinecone
- Context-aware generation with GPT-4
- Job orchestration with flashQ for reliability
flashQ handles the complex orchestration (retries, rate limiting, progress tracking) so you can focus on building great AI experiences.
Build Your Own RAG Chatbot
Get started with flashQ and build intelligent AI applications.
Get Started →