Batch Embeddings at Scale: Processing 10K Documents

Processing thousands of documents for a vector database is a common task in AI applications. Whether you're building a RAG system, semantic search, or recommendation engine, you need to convert documents into embeddings efficiently.

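The challenge? OpenAI's embedding API enforces rate limits (3,000 requests per minute in this example; the actual limit depends on your account tier). Process documents sequentially with no throttling and you'll eventually hit those limits. Fire them all at once and you'll get 429 (Too Many Requests) errors right away.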

flashQ solves this with built-in rate limiting and bulk operations.

The Problem with Sequential Processing

Here's the naive approach:

// Don't do this - slow, with no retries and no rate limiting
for (const doc of documents) {
  const response = await openai.embeddings.create({
    input: doc.text,
    model: 'text-embedding-3-small'
  });
  // The vector is at response.data[0].embedding, not the response object itself
  await db.insert({ id: doc.id, vector: response.data[0].embedding });
}

Problems with this approach:

  • Sequential processing is slow
  • No retry on failure
  • No progress tracking
  • Will hit rate limits at scale

The flashQ Solution

flashQ handles rate limiting for you: queue your jobs, configure a limiter on the worker, and the worker stays within the API's limits.

import { Queue, Worker } from 'flashq';

const queue = new Queue('batch-embeddings');

// Queue 10,000 documents with addBulk
const documents = await loadDocuments(); // Your 10K docs

const jobs = documents.map(doc => ({
  name: 'embed',
  data: { id: doc.id, text: doc.text }
}));

await queue.addBulk(jobs);
console.log(`Queued ${jobs.length} documents`);
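For very large document sets, it may be worth queueing in smaller batches rather than one giant call. The article does not state whether addBulk() has a size limit, so the sketch below is a defensive assumption, not a flashQ requirement. The chunk() helper and the batch size of 1,000 are hypothetical.

```typescript
// Split an array into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Hypothetical usage with the queue and jobs from above
// (the batch size of 1,000 is an assumption, tune it for your setup):
// for (const batch of chunk(jobs, 1000)) {
//   await queue.addBulk(batch);
// }
```

Order is preserved, and the last chunk simply holds whatever is left over.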

Rate Limiting with Worker

The worker processes jobs while respecting OpenAI's rate limits:

const worker = new Worker('batch-embeddings', async (job) => {
  const { id, text } = job.data;

  // Call OpenAI
  const response = await openai.embeddings.create({
    input: text,
    model: 'text-embedding-3-small'
  });

  // Store in vector database
  await vectorDB.upsert({
    id: id,
    vector: response.data[0].embedding
  });

  // Update progress
  await job.updateProgress(100);

  return { id, dimensions: response.data[0].embedding.length };
}, {
  limiter: {
    max: 3000,      // OpenAI's rate limit
    duration: 60000 // per minute
  }
});

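The limiter option caps the worker at 3,000 jobs per minute. Each job makes one embeddings request, so this matches the 3,000 requests-per-minute limit assumed above. Set max to whatever your OpenAI account tier actually allows.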
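To make the max/duration idea concrete, here is a minimal fixed-window limiter. It is an illustration only: the article does not show how flashQ implements its limiter, and the FixedWindowLimiter class and its tryAcquire() method are hypothetical names.

```typescript
// Illustrative fixed-window limiter: at most `max` acquisitions per `duration` ms.
// This is NOT flashQ's implementation; it only models the max/duration idea.
class FixedWindowLimiter {
  private readonly max: number;
  private readonly duration: number;
  private readonly now: () => number;
  private windowStart: number;
  private used = 0;

  // `now` is injectable so the limiter can be exercised with a fake clock.
  constructor(max: number, duration: number, now: () => number = Date.now) {
    this.max = max;
    this.duration = duration;
    this.now = now;
    this.windowStart = now();
  }

  // Returns 0 if a request may start now, otherwise the number of
  // milliseconds to wait until the next window opens.
  tryAcquire(): number {
    const t = this.now();
    if (t - this.windowStart >= this.duration) {
      // A new window has begun: reset the counter.
      this.windowStart = t;
      this.used = 0;
    }
    if (this.used < this.max) {
      this.used++;
      return 0;
    }
    return this.windowStart + this.duration - t;
  }
}

// Same numbers as the worker config above: 3,000 per 60,000 ms.
const limiter = new FixedWindowLimiter(3000, 60000);
console.log(limiter.tryAcquire() === 0); // first request in a fresh window is allowed
```

A fixed window allows bursts at window boundaries. Token-bucket or sliding-window designs smooth that out, at the cost of a little more bookkeeping.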

Progress Tracking

Track progress across all 10K documents:

// Check queue status
const counts = await queue.getJobCounts();
console.log(counts);
// { waiting: 8500, active: 10, completed: 1490, failed: 0 }

// Calculate percentage
const total = counts.waiting + counts.active + counts.completed + counts.failed;
const progress = total > 0 ? (counts.completed / total) * 100 : 0; // avoid NaN on an empty queue
console.log(`Progress: ${progress.toFixed(1)}%`);

Handling Failures

Configure automatic retries for transient failures:

const jobs = documents.map(doc => ({
  name: 'embed',
  data: { id: doc.id, text: doc.text },
  opts: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    }
  }
}));

await queue.addBulk(jobs);

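With attempts: 3, a job is tried up to three times in total, so a failing job gets at most two retries. Assuming the delay doubles from the 1000 ms base, those retries wait roughly 1s and then 2s. A third wait of 4s would require attempts: 4.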
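The retry schedule can be worked out with a few lines of arithmetic. This sketch assumes that attempts includes the first try and that the delay doubles on each retry starting from the base delay. flashQ's exact formula is not shown in this article, so treat the exponentialBackoffDelays() helper as a hypothetical model, not the library's behavior.

```typescript
// Delays (ms) before each retry, assuming the delay doubles on every retry
// and that `attempts` includes the first try (so there are attempts - 1 retries).
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(baseDelayMs * 2 ** retry);
  }
  return delays;
}

console.log(exponentialBackoffDelays(3, 1000)); // [ 1000, 2000 ]
console.log(exponentialBackoffDelays(4, 1000)); // [ 1000, 2000, 4000 ]
```

Under these assumptions, a job that fails every attempt with attempts: 3 spends about 3 seconds waiting in total before it is marked as failed.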

Monitoring the Batch

Monitor your batch with worker events:

let completed = 0;
const total = documents.length;

worker.on('completed', (job) => {
  completed++;
  if (completed % 100 === 0) {
    console.log(`Processed ${completed}/${total} (${(completed/total*100).toFixed(1)}%)`);
  }
});

worker.on('failed', (job, error) => {
  console.error(`Failed: ${job.data.id} - ${error.message}`);
});
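A plain counter can drift if a job fails once and then succeeds on retry. The article does not say whether the 'failed' event fires on every failed attempt or only after the last one, so the sketch below tracks outcomes by document id and works either way. BatchTracker and its method names are hypothetical.

```typescript
// Tracks per-document outcomes by id, so a retry that eventually succeeds
// is not left in the failed list.
class BatchTracker {
  private readonly total: number;
  private readonly completedIds = new Set<string>();
  private readonly failedIds = new Set<string>();

  constructor(total: number) {
    this.total = total;
  }

  recordCompleted(id: string): void {
    this.failedIds.delete(id); // an earlier failed attempt no longer counts
    this.completedIds.add(id);
  }

  recordFailed(id: string): void {
    if (!this.completedIds.has(id)) {
      this.failedIds.add(id);
    }
  }

  summary(): { completed: number; failed: string[]; pending: number } {
    const completed = this.completedIds.size;
    const failed = Array.from(this.failedIds);
    return { completed, failed, pending: this.total - completed - failed.length };
  }
}

// Hypothetical wiring to the worker events shown above:
// const tracker = new BatchTracker(documents.length);
// worker.on('completed', (job) => tracker.recordCompleted(job.data.id));
// worker.on('failed', (job, error) => tracker.recordFailed(job.data.id));
```

At the end of the run, summary().failed gives the ids to inspect or re-queue.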

Performance Comparison

Processing 10,000 documents:

Approach               | Time                   | Errors
-----------------------+------------------------+-----------------
Sequential (no limit)  | Rate limited after ~3K | Many 429s
Promise.all (parallel) | Immediate 429 errors   | Fails completely
flashQ with limiter    | ~4 minutes             | 0 errors
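The flashQ figure is consistent with simple throughput arithmetic: at 3,000 jobs per minute, 10,000 jobs cannot finish in less than about 3.3 minutes, and API latency and retries push the real number higher. The minBatchMinutes() helper below is a hypothetical illustration of that lower bound, not a benchmark.

```typescript
// Lower bound on batch duration when throughput is capped at
// `max` jobs per `durationMs` milliseconds.
function minBatchMinutes(totalJobs: number, max: number, durationMs: number): number {
  const windowsNeeded = totalJobs / max;      // how many full rate-limit windows
  const minutesPerWindow = durationMs / 60000;
  return windowsNeeded * minutesPerWindow;
}

const minutes = minBatchMinutes(10000, 3000, 60000);
console.log(minutes.toFixed(2)); // 3.33
```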

Complete Example

Here's the complete script, combining the pieces above:

import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

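const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment
const queue = new Queue('batch-embeddings');
// vectorDB and loadDocuments() are placeholders: supply your own
// vector store client and document loader.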

// 1. Queue documents
async function queueDocuments(documents: { id: string; text: string }[]) {
  const jobs = documents.map(doc => ({
    name: 'embed',
    data: doc,
    opts: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }
  }));

  await queue.addBulk(jobs);
  console.log(`Queued ${jobs.length} documents`);
}

// 2. Start worker
const worker = new Worker('batch-embeddings', async (job) => {
  const embedding = await openai.embeddings.create({
    input: job.data.text,
    model: 'text-embedding-3-small'
  });

  await vectorDB.upsert({
    id: job.data.id,
    vector: embedding.data[0].embedding
  });

  await job.updateProgress(100);
}, {
  limiter: { max: 3000, duration: 60000 }
});

// 3. Monitor progress
setInterval(async () => {
  const counts = await queue.getJobCounts();
  const done = counts.completed + counts.failed;
  const total = done + counts.waiting + counts.active;
  console.log(`Progress: ${done}/${total} (${counts.failed} failed)`);
}, 5000);

// Run
const docs = await loadDocuments();
await queueDocuments(docs);

Key Takeaways

  • Use addBulk() to queue thousands of jobs efficiently
  • Configure limiter to respect API rate limits
  • Track progress with getJobCounts()
  • Handle failures with automatic retries

Start Processing

Get flashQ running and start processing your documents.
