
Building a RAG Chatbot with flashQ and OpenAI

Retrieval-Augmented Generation (RAG) is a widely used pattern for building AI chatbots that answer questions about your own data. Instead of relying solely on the LLM's training data, a RAG system retrieves relevant context from your knowledge base and passes it to the model before it generates a response.

In this tutorial, we'll build a production-ready RAG chatbot using:

  • flashQ - Job orchestration for the pipeline
  • OpenAI - Embeddings and text generation
  • Pinecone - Vector database for semantic search

RAG Architecture

A RAG system has two main flows:

INDEXING FLOW (one-time or periodic)
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ Document │───▶│  Chunk   │───▶│  Embed   │───▶│  Store   │
│  Upload  │    │  Split   │    │  (OpenAI)│    │(Pinecone)│
└──────────┘    └──────────┘    └──────────┘    └──────────┘

QUERY FLOW (per user message)
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│   User   │───▶│  Embed   │───▶│  Search  │───▶│ Generate │
│  Query   │    │  Query   │    │  Context │    │ Response │
└──────────┘    └──────────┘    └──────────┘    └──────────┘

flashQ orchestrates both flows, handling retries, rate limiting, and progress tracking.
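
To make the Search step of the query flow concrete, here is a minimal sketch of what a vector search does conceptually: score every stored chunk against the query embedding and keep the best matches. The names `StoredChunk` and `searchTopK` and the toy three-dimensional vectors are illustrative assumptions, not part of Pinecone or OpenAI. A real vector database uses an approximate index rather than this brute-force scan, and the similarity metric depends on how the index is configured.

```typescript
// Hypothetical in-memory stand-in for a vector store (illustration only).
interface StoredChunk {
  id: string;
  text: string;
  embedding: number[];
}

// Cosine similarity: dot product divided by the product of the vector norms.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) {
    throw new Error('Vectors must have the same dimension');
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0; // a zero vector has no direction
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Score every chunk against the query and keep the topK highest scores.
function searchTopK(query: number[], stored: StoredChunk[], topK: number) {
  return stored
    .map((chunk) => ({
      id: chunk.id,
      text: chunk.text,
      score: cosineSimilarity(query, chunk.embedding),
    }))
    .sort((x, y) => y.score - x.score)
    .slice(0, topK);
}

// Toy data: real embeddings have hundreds or thousands of dimensions.
const storedChunks: StoredChunk[] = [
  { id: 'doc-1-0', text: 'flashQ is a job queue', embedding: [1, 0, 0] },
  { id: 'doc-1-1', text: 'Pinecone stores vectors', embedding: [0, 1, 0] },
  { id: 'doc-1-2', text: 'Queues process jobs', embedding: [0.9, 0.1, 0] },
];

const hits = searchTopK([1, 0, 0], storedChunks, 2);
console.log(hits.map((h) => h.id));
```

The two chunks whose vectors point in nearly the same direction as the query rank first; the unrelated chunk is dropped by the `topK` cut.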

Project Setup

# Initialize project
mkdir rag-chatbot && cd rag-chatbot
npm init -y

# Install dependencies
npm install flashq openai @pinecone-database/pinecone express
npm install -D typescript @types/node @types/express tsx

# .env
FLASHQ_HOST=localhost
FLASHQ_PORT=6789

OPENAI_API_KEY=sk-...
PINECONE_API_KEY=...
PINECONE_INDEX=knowledge-base
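
Node does not read a `.env` file on its own, so the values above typically need a loader such as the `dotenv` package or Node's `--env-file` flag. Whichever is used, it can help to validate the variables once at startup. The sketch below is one way to do that; `AppConfig` and `loadConfig` are hypothetical names, and the defaults simply mirror the sample values above.

```typescript
// Hypothetical typed view of the settings used in this tutorial.
interface AppConfig {
  flashqHost: string;
  flashqPort: number;
  openaiApiKey: string;
  pineconeApiKey: string;
  pineconeIndex: string;
}

// Validate an environment-like record and fail fast on missing values.
function loadConfig(env: Record<string, string | undefined>): AppConfig {
  const required = ['OPENAI_API_KEY', 'PINECONE_API_KEY', 'PINECONE_INDEX'];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }

  // Defaults mirror the sample .env (an assumption, adjust as needed).
  const port = Number(env.FLASHQ_PORT ?? '6789');
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid FLASHQ_PORT: ${env.FLASHQ_PORT}`);
  }

  return {
    flashqHost: env.FLASHQ_HOST ?? 'localhost',
    flashqPort: port,
    openaiApiKey: env.OPENAI_API_KEY as string,
    pineconeApiKey: env.PINECONE_API_KEY as string,
    pineconeIndex: env.PINECONE_INDEX as string,
  };
}

// In the app this would be loadConfig(process.env); a literal is used here
// so the sketch runs on its own.
const sampleConfig = loadConfig({
  OPENAI_API_KEY: 'sk-example',
  PINECONE_API_KEY: 'pc-example',
  PINECONE_INDEX: 'knowledge-base',
});
console.log(sampleConfig.flashqHost, sampleConfig.flashqPort);
```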

Document Indexing Pipeline

// src/indexing.ts
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';
import { Pinecone } from '@pinecone-database/pinecone';

const openai = new OpenAI();
const pinecone = new Pinecone();
const index = pinecone.index(process.env.PINECONE_INDEX!);

const queue = new Queue('rag-indexing');

// Rate limit to stay within OpenAI limits
await queue.setRateLimit(500); // 500 embeddings/minute

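// Step 1: Chunk the document into overlapping character windows
async function chunkDocument(text: string, chunkSize = 500, overlap = 50) {
  // The loop below only advances when overlap is smaller than chunkSize
  if (overlap < 0 || overlap >= chunkSize) {
    throw new Error('overlap must be >= 0 and smaller than chunkSize');
  }

  const chunks: string[] = [];
  let start = 0;

  while (start < text.length) {
    const end = Math.min(start + chunkSize, text.length);
    chunks.push(text.slice(start, end));
    if (end === text.length) break; // avoid a trailing chunk that only repeats the overlap
    start += chunkSize - overlap;
  }

  return chunks;
}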

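// Main indexing function
export async function indexDocument(
  documentId: string,
  content: string,
  metadata: Record<string, unknown> = {},
) {
  // Step 1: Enqueue the chunk job, which fans out the embed and store jobs
  const chunkJob = await queue.add('chunk', {
    documentId,
    content,
    metadata,
  });

  // Wait for the chunk job only. Its result carries storeJobId and totalChunks;
  // embedding and storing continue in the background after this returns.
  const result = await queue.finished(chunkJob.id);
  return result;
}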

// Worker for chunking
new Worker('rag-indexing', async (job) => {
  if (job.name === 'chunk') {
    const { documentId, content, metadata } = job.data;

    await job.updateProgress(10, 'Chunking document...');
    const chunks = await chunkDocument(content);

    // Create embedding jobs for each chunk
    const embedJobs = await Promise.all(
      chunks.map((chunk, i) =>
        queue.add('embed', {
          documentId,
          chunkIndex: i,
          text: chunk,
          metadata: { ...metadata, chunkIndex: i },
        })
      )
    );

    // Create store job that depends on all embeddings
    const storeJob = await queue.add('store', {
      documentId,
      totalChunks: chunks.length,
    }, {
      depends_on: embedJobs.map(j => j.id),
    });

    return { storeJobId: storeJob.id, totalChunks: chunks.length };
  }

  if (job.name === 'embed') {
    const { text, documentId, chunkIndex, metadata } = job.data;

    const response = await openai.embeddings.create({
      model: 'text-embedding-3-small',
      input: text,
    });

    return {
      id: `${documentId}-${chunkIndex}`,
      embedding: response.data[0].embedding,
      text,
      metadata,
    };
  }

  if (job.name === 'store') {
    const { documentId } = job.data;

    // Read each embedding result from the jobs this store job depends on.
    // The embed jobs were never given ids of the form `${documentId}-embed-${i}`,
    // so look them up through job.opts.depends_on, as the query workers do.
    const vectors = [];
    for (const embedJobId of job.opts.depends_on) {
      const result = await queue.getResult(embedJobId);
      vectors.push({
        id: result.id,
        values: result.embedding,
        metadata: { ...result.metadata, text: result.text },
      });
    }

    // Upsert to Pinecone in batches
    const batchSize = 100;
    for (let i = 0; i < vectors.length; i += batchSize) {
      await index.upsert(vectors.slice(i, i + batchSize));
    }

    return { indexed: vectors.length, documentId };
  }
});
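
The chunking arithmetic is easy to get wrong, so a worked example may help. With `chunkSize = 500` and `overlap = 50`, each chunk starts 450 characters after the previous one. The sketch below computes only the `[start, end)` boundaries; `chunkRanges` is a hypothetical helper, not part of the pipeline above, and it stops as soon as a chunk reaches the end of the text so that it never emits a trailing chunk made only of overlap.

```typescript
// Compute [start, end) character ranges for overlapping chunks.
function chunkRanges(
  length: number,
  chunkSize = 500,
  overlap = 50,
): Array<[number, number]> {
  if (chunkSize <= 0 || overlap < 0 || overlap >= chunkSize) {
    throw new Error('Require chunkSize > 0 and 0 <= overlap < chunkSize');
  }

  const ranges: Array<[number, number]> = [];
  const stride = chunkSize - overlap; // distance between chunk starts

  for (let start = 0; start < length; start += stride) {
    const end = Math.min(start + chunkSize, length);
    ranges.push([start, end]);
    if (end === length) break; // the text is fully covered
  }

  return ranges;
}

// A 1200-character document yields three chunks:
// [0, 500], [450, 950], [900, 1200]
console.log(chunkRanges(1200));
```

Splitting on fixed character offsets can cut words and sentences in half. Splitting on paragraph or sentence boundaries is a common refinement, at the cost of uneven chunk sizes.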

Query Pipeline

// src/query.ts
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';
import { Pinecone } from '@pinecone-database/pinecone';

const openai = new OpenAI();
const pinecone = new Pinecone();
const index = pinecone.index(process.env.PINECONE_INDEX!);

const queue = new Queue('rag-query');

// Main query function
export async function askQuestion(question: string, conversationHistory: any[] = []) {
  // Step 1: Embed the question
  const embedJob = await queue.add('embed-query', { question });

  // Step 2: Search (depends on embed)
  const searchJob = await queue.add('search', {
    question,
    topK: 5,
  }, {
    depends_on: [embedJob.id],
  });

  // Step 3: Generate response (depends on search)
  const generateJob = await queue.add('generate', {
    question,
    conversationHistory,
  }, {
    depends_on: [searchJob.id],
  });

  // Wait for final result
  return await queue.finished(generateJob.id);
}

// Query workers
new Worker('rag-query', async (job) => {
  if (job.name === 'embed-query') {
    const { question } = job.data;

    const response = await openai.embeddings.create({
      model: 'text-embedding-3-small',
      input: question,
    });

    return { embedding: response.data[0].embedding };
  }

  if (job.name === 'search') {
    const { question, topK } = job.data;

    // Get embedding from parent job
    const embedResult = await queue.getResult(job.opts.depends_on[0]);

    // Search Pinecone
    const results = await index.query({
      vector: embedResult.embedding,
      topK,
      includeMetadata: true,
    });

    const context = results.matches
      .map(m => m.metadata?.text)
      .filter(Boolean)
      .join('\n\n---\n\n');

    return { context, sources: results.matches };
  }

  if (job.name === 'generate') {
    const { question, conversationHistory } = job.data;

    // Get context from search job
    const searchResult = await queue.getResult(job.opts.depends_on[0]);

    const systemPrompt = `You are a helpful assistant. Answer questions based on the provided context.
If you cannot find the answer in the context, say "I don't have information about that."
Always cite which part of the context you used.

Context:
${searchResult.context}`;

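    // Annotate the array so the role literals type-check against the SDK's message type
    const messages: OpenAI.Chat.ChatCompletionMessageParam[] = [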
      { role: 'system', content: systemPrompt },
      ...conversationHistory,
      { role: 'user', content: question },
    ];

    const response = await openai.chat.completions.create({
      model: 'gpt-4',
      messages,
      temperature: 0.7,
      max_tokens: 1000,
    });

    return {
      answer: response.choices[0].message.content,
      sources: searchResult.sources,
      usage: response.usage,
    };
  }
}, {
  concurrency: 10,
});
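
The search worker joins every match into the context, regardless of how relevant it is or how long the result becomes. A possible refinement, sketched below, drops weak matches and stops once a size budget is reached. `buildContext`, `minScore`, and `maxChars` are hypothetical names and values chosen for illustration; the sketch assumes matches arrive sorted best-first, and a character budget is only a rough proxy for tokens.

```typescript
// Minimal shape of a search match as used in this tutorial (assumed).
interface ContextMatch {
  id: string;
  score?: number;
  metadata?: { text?: string };
}

// Join match texts into one context string, skipping low-scoring matches
// and stopping before the character budget is exceeded.
function buildContext(
  matches: ContextMatch[],
  minScore = 0.5,
  maxChars = 4000,
): string {
  const separator = '\n\n---\n\n';
  const parts: string[] = [];
  let used = 0;

  for (const match of matches) {
    const text = match.metadata?.text;
    if (!text || (match.score ?? 0) < minScore) continue;

    // Count the separator that join() will add before this part.
    const cost = text.length + (parts.length > 0 ? separator.length : 0);
    if (used + cost > maxChars) break;

    parts.push(text);
    used += cost;
  }

  return parts.join(separator);
}

const exampleContext = buildContext([
  { id: 'doc-1-0', score: 0.92, metadata: { text: 'flashQ is a job queue.' } },
  { id: 'doc-2-3', score: 0.31, metadata: { text: 'Unrelated paragraph.' } },
  { id: 'doc-1-1', score: 0.8, metadata: { text: 'It supports job dependencies.' } },
]);
console.log(exampleContext);
```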

REST API

// src/server.ts
import express from 'express';
import { indexDocument } from './indexing';
import { askQuestion } from './query';

const app = express();
app.use(express.json({ limit: '10mb' }));

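// Index a document
app.post('/api/index', async (req, res) => {
  const { documentId, content, metadata } = req.body;
  if (typeof documentId !== 'string' || typeof content !== 'string') {
    res.status(400).json({ error: 'documentId and content must be strings' });
    return;
  }
  try {
    res.json(await indexDocument(documentId, content, metadata ?? {}));
  } catch (err) {
    // Express 4 does not catch rejections from async handlers
    console.error(err);
    res.status(500).json({ error: 'Indexing failed' });
  }
});

// Ask a question
app.post('/api/chat', async (req, res) => {
  const { question, history } = req.body;
  if (typeof question !== 'string' || question.trim() === '') {
    res.status(400).json({ error: 'question must be a non-empty string' });
    return;
  }
  try {
    res.json(await askQuestion(question, history || []));
  } catch (err) {
    console.error(err);
    res.status(500).json({ error: 'Failed to answer the question' });
  }
});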

app.listen(3000, () => {
  console.log('RAG Chatbot API running on port 3000');
});
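
Because `history` is spliced directly into the messages sent to the model, it is worth checking its shape before it reaches `askQuestion`. The sketch below is one way to validate a chat request body. `parseChatRequest` and its types are hypothetical, and restricting history roles to `user` and `assistant` is a design assumption that keeps clients from injecting their own system messages.

```typescript
type ChatRole = 'user' | 'assistant';

interface HistoryMessage {
  role: ChatRole;
  content: string;
}

interface ChatRequest {
  question: string;
  history: HistoryMessage[];
}

// Type guard: only user and assistant turns are accepted from clients.
function isChatRole(value: unknown): value is ChatRole {
  return value === 'user' || value === 'assistant';
}

// Validate an untrusted JSON body and return a typed request, or throw.
function parseChatRequest(body: unknown): ChatRequest {
  if (typeof body !== 'object' || body === null) {
    throw new Error('Body must be a JSON object');
  }
  const { question, history } = body as { question?: unknown; history?: unknown };

  if (typeof question !== 'string' || question.trim() === '') {
    throw new Error('"question" must be a non-empty string');
  }
  if (history !== undefined && !Array.isArray(history)) {
    throw new Error('"history" must be an array when provided');
  }

  const rawHistory: unknown[] = Array.isArray(history) ? history : [];
  const parsedHistory = rawHistory.map((item, i) => {
    const { role, content } = (item ?? {}) as { role?: unknown; content?: unknown };
    if (!isChatRole(role) || typeof content !== 'string') {
      throw new Error(`history[${i}] must have a user/assistant role and string content`);
    }
    return { role, content };
  });

  return { question: question.trim(), history: parsedHistory };
}

const exampleRequest = parseChatRequest({
  question: ' What is flashQ? ',
  history: [{ role: 'user', content: 'Hi' }, { role: 'assistant', content: 'Hello!' }],
});
console.log(exampleRequest.question, exampleRequest.history.length);
```

In the route handler, a thrown error from this function would map to a 400 response rather than a 500.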

Using the Chatbot

# Index a document
curl -X POST http://localhost:3000/api/index \
  -H "Content-Type: application/json" \
  -d '{
    "documentId": "doc-1",
    "content": "flashQ is a high-performance job queue...",
    "metadata": { "source": "docs", "title": "flashQ Overview" }
  }'

# Ask a question
curl -X POST http://localhost:3000/api/chat \
  -H "Content-Type: application/json" \
  -d '{
    "question": "What is flashQ and how fast is it?"
  }'

# Response
{
  "answer": "flashQ is a high-performance job queue that can process up to 1.9 million jobs per second...",
  "sources": [{ "id": "doc-1-0", "score": 0.92 }],
  "usage": { "prompt_tokens": 450, "completion_tokens": 120 }
}
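
The sample response lists only an `id` and a `score` per source, whereas the search worker as written returns the full match objects, including the stored chunk text. If the compact shape is what the API should expose, a small mapping step like the following sketch could be applied before responding. `toSources` is a hypothetical helper, and rounding to two decimals is an arbitrary presentation choice.

```typescript
// Minimal shape of a search match as used in this tutorial (assumed).
interface SearchMatch {
  id: string;
  score?: number;
  metadata?: Record<string, unknown>;
}

interface Source {
  id: string;
  score: number;
}

// Keep only the id and a rounded score for each match.
function toSources(matches: SearchMatch[]): Source[] {
  return matches.map((match) => ({
    id: match.id,
    score: Number((match.score ?? 0).toFixed(2)),
  }));
}

const exampleSources = toSources([
  { id: 'doc-1-0', score: 0.9234, metadata: { text: 'flashQ is a high-performance job queue...' } },
]);
console.log(JSON.stringify(exampleSources));
```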

🚀 Production Tips

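  1. Use text-embedding-3-large for better accuracy. Its vectors have 3072 dimensions by default, versus 1536 for text-embedding-3-small, so the Pinecone index dimension must match.
  2. Implement caching for frequent queries.
  3. Add conversation memory with a sliding window.
  4. Monitor token usage for cost control.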
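
Tips 2 and 3 can be sketched in a few lines. The code below is a minimal illustration, not a production implementation: `slidingWindow` and `TtlCache` are hypothetical names, the cache lives in one process's memory only, and caching answers is only safe for questions asked without conversation history, since the same question can deserve a different answer in a different conversation.

```typescript
interface MemoryMessage {
  role: 'user' | 'assistant';
  content: string;
}

// Tip 3: keep only the most recent messages of a conversation.
function slidingWindow(history: MemoryMessage[], maxMessages = 6): MemoryMessage[] {
  // slice(-0) would return the whole array, so handle 0 explicitly.
  return maxMessages > 0 ? history.slice(-maxMessages) : [];
}

// Tip 2: a small in-memory cache whose entries expire after ttlMs.
class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();
  private ttlMs: number;
  private now: () => number;

  // The clock is injectable so expiry can be tested without waiting.
  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Normalize questions so trivial differences share one cache entry.
  private key(question: string): string {
    return question.trim().toLowerCase().replace(/\s+/g, ' ');
  }

  get(question: string): V | undefined {
    const k = this.key(question);
    const entry = this.entries.get(k);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(k); // expired: drop it and report a miss
      return undefined;
    }
    return entry.value;
  }

  set(question: string, value: V): void {
    this.entries.set(this.key(question), {
      value,
      expiresAt: this.now() + this.ttlMs,
    });
  }
}

// Usage: cache answers for five minutes, keep the last four messages.
const answerCache = new TtlCache<string>(5 * 60 * 1000);
answerCache.set('What is flashQ?', 'flashQ is a job queue.');
console.log(answerCache.get('  what is   FLASHQ? '));

const recent = slidingWindow(
  [
    { role: 'user', content: 'one' },
    { role: 'assistant', content: 'two' },
    { role: 'user', content: 'three' },
    { role: 'assistant', content: 'four' },
    { role: 'user', content: 'five' },
  ],
  4,
);
console.log(recent.map((m) => m.content));
```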

Conclusion

You've built a production-ready RAG chatbot with:

  • Document indexing with chunking and embeddings
  • Semantic search using Pinecone
  • Context-aware generation with GPT-4
  • Job orchestration with flashQ for reliability

flashQ handles the complex orchestration (retries, rate limiting, progress tracking) so you can focus on building great AI experiences.
