Processing thousands of documents for a vector database is a common task in AI applications. Whether you're building a RAG system, semantic search, or recommendation engine, you need to convert documents into embeddings efficiently.
The challenge? OpenAI's embedding API has rate limits (typically 3,000 requests per minute). Process documents sequentially and you'll hit limits. Process them all at once and you'll get 429 errors.
flashQ solves this with built-in rate limiting and bulk operations.
## The Problem with Sequential Processing
Here's the naive approach:
```typescript
// Don't do this - slow and no rate limiting
for (const doc of documents) {
  const response = await openai.embeddings.create({
    input: doc.text,
    model: 'text-embedding-3-small'
  });
  // The vector lives at response.data[0].embedding, not on the response itself
  await db.insert({ id: doc.id, vector: response.data[0].embedding });
}
```
Problems with this approach:
- Sequential processing is slow
- No retry on failure
- No progress tracking
- Will hit rate limits at scale
## The flashQ Solution
flashQ handles rate limiting automatically. You queue your jobs and the worker respects API limits.
```typescript
import { Queue, Worker } from 'flashq';

const queue = new Queue('batch-embeddings');

// Queue 10,000 documents with addBulk
const documents = await loadDocuments(); // Your 10K docs
const jobs = documents.map(doc => ({
  name: 'embed',
  data: { id: doc.id, text: doc.text }
}));

await queue.addBulk(jobs);
console.log(`Queued ${jobs.length} documents`);
```
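`addBulk` accepts the whole array at once, but for very large document sets you may prefer to enqueue in smaller batches to keep each call's payload modest. A minimal sketch, assuming a generic `chunk` helper (the 1,000-job batch size is an arbitrary choice, not a flashQ requirement):

```typescript
// Split an array into fixed-size batches (plain helper, not part of flashQ).
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Usage sketch: enqueue 10K jobs in batches of 1,000.
// for (const batch of chunk(jobs, 1000)) {
//   await queue.addBulk(batch);
// }
```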
## Rate Limiting with the Worker
The worker processes jobs while respecting OpenAI's rate limits:
```typescript
const worker = new Worker('batch-embeddings', async (job) => {
  const { id, text } = job.data;

  // Call OpenAI
  const response = await openai.embeddings.create({
    input: text,
    model: 'text-embedding-3-small'
  });

  // Store in vector database
  await vectorDB.upsert({
    id: id,
    vector: response.data[0].embedding
  });

  // Update progress
  await job.updateProgress(100);

  return { id, dimensions: response.data[0].embedding.length };
}, {
  limiter: {
    max: 3000,      // OpenAI's rate limit
    duration: 60000 // per minute
  }
});
```
The `limiter` option caps the worker at 3,000 requests per minute, matching the OpenAI limit noted earlier; jobs beyond the limit wait in the queue until the window allows them.
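As a mental model for what `max` and `duration` mean, a sliding-window limiter tracks recent request timestamps and refuses new requests once the window is full. This is only an illustration, not flashQ's actual implementation:

```typescript
// Minimal sliding-window rate limiter (illustration only).
class SlidingWindowLimiter {
  private timestamps: number[] = [];

  constructor(private max: number, private durationMs: number) {}

  // Returns true if a request may proceed at time `now` (ms).
  tryAcquire(now: number): boolean {
    const cutoff = now - this.durationMs;
    // Drop timestamps that have fallen out of the window
    this.timestamps = this.timestamps.filter((t) => t > cutoff);
    if (this.timestamps.length >= this.max) return false;
    this.timestamps.push(now);
    return true;
  }
}
```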
## Progress Tracking
Track progress across all 10K documents:
```typescript
// Check queue status
const counts = await queue.getJobCounts();
console.log(counts);
// { waiting: 8500, active: 10, completed: 1490, failed: 0 }

// Calculate percentage
const total = counts.waiting + counts.active + counts.completed + counts.failed;
const progress = (counts.completed / total) * 100;
console.log(`Progress: ${progress.toFixed(1)}%`);
```
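The percentage calculation above is easy to wrap in a small helper. The `JobCounts` shape below mirrors the fields used in this article; it is an assumption, not a type exported by flashQ:

```typescript
interface JobCounts {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
}

// Completion percentage across all known jobs (0 when the queue is empty).
function progressPercent(counts: JobCounts): number {
  const total = counts.waiting + counts.active + counts.completed + counts.failed;
  return total === 0 ? 0 : (counts.completed / total) * 100;
}
```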
## Handling Failures
Configure automatic retries for transient failures:
```typescript
const jobs = documents.map(doc => ({
  name: 'embed',
  data: { id: doc.id, text: doc.text },
  opts: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    }
  }
}));

await queue.addBulk(jobs);
```
If a job fails, it retries with exponential backoff — each retry waits twice as long as the last, starting at 1 second — until the configured number of attempts is exhausted.
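The retry schedule follows the usual exponential formula, `delay * 2^(retry - 1)`. A small sketch, assuming BullMQ-style semantics (flashQ may compute it slightly differently):

```typescript
// Wait before the nth retry (1-based) under exponential backoff.
// With a base delay of 1000 ms the waits are 1s, 2s, 4s, ...
function backoffDelay(baseMs: number, retry: number): number {
  return baseMs * 2 ** (retry - 1);
}
```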
## Monitoring the Batch
Monitor your batch with worker events:
```typescript
let completed = 0;
const total = documents.length;

worker.on('completed', (job) => {
  completed++;
  if (completed % 100 === 0) {
    console.log(`Processed ${completed}/${total} (${(completed / total * 100).toFixed(1)}%)`);
  }
});

worker.on('failed', (job, error) => {
  console.error(`Failed: ${job.data.id} - ${error.message}`);
});
```
## Performance Comparison
Processing 10,000 documents:
| Approach | Outcome | Errors |
|---|---|---|
| Sequential (no limit) | Rate limited after ~3K | Many 429s |
| Promise.all (parallel) | Immediate 429 errors | Fails completely |
| flashQ with limiter | ~4 minutes | 0 errors |
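The ~4 minute figure follows from the rate limit itself: 10,000 requests at 3,000 per minute gives a lower bound of about 3.3 minutes, with the remainder being request latency and queue overhead. A trivial estimate:

```typescript
// Lower bound on batch duration implied by a rate limit,
// ignoring per-request latency and retries.
function minBatchMinutes(totalJobs: number, maxPerMinute: number): number {
  return totalJobs / maxPerMinute;
}

// 10,000 jobs at 3,000 req/min -> about 3.33 minutes minimum.
```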
## Complete Example
Here's a complete script putting it all together (it assumes `loadDocuments()` and a `vectorDB` client are defined elsewhere):
```typescript
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('batch-embeddings');

// 1. Queue documents
async function queueDocuments(documents: { id: string; text: string }[]) {
  const jobs = documents.map(doc => ({
    name: 'embed',
    data: doc,
    opts: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }
  }));
  await queue.addBulk(jobs);
  console.log(`Queued ${jobs.length} documents`);
}

// 2. Start worker
const worker = new Worker('batch-embeddings', async (job) => {
  const embedding = await openai.embeddings.create({
    input: job.data.text,
    model: 'text-embedding-3-small'
  });
  await vectorDB.upsert({
    id: job.data.id,
    vector: embedding.data[0].embedding
  });
  await job.updateProgress(100);
}, {
  limiter: { max: 3000, duration: 60000 }
});

// 3. Monitor progress
setInterval(async () => {
  const counts = await queue.getJobCounts();
  const done = counts.completed + counts.failed;
  const total = done + counts.waiting + counts.active;
  console.log(`Progress: ${done}/${total} (${counts.failed} failed)`);
}, 5000);

// Run
const docs = await loadDocuments();
await queueDocuments(docs);
```
- Use `addBulk()` to queue thousands of jobs efficiently
- Configure `limiter` to respect API rate limits
- Track progress with `getJobCounts()`
- Handle failures with automatic retries