Testing AI pipelines presents unique challenges: non-deterministic outputs, expensive API calls, and complex dependencies. This guide covers strategies for testing AI workloads with flashQ, from unit tests to production monitoring.
AI Testing Challenges
- Non-deterministic outputs: Same input, different results
- Expensive API calls: $0.01-$0.10 per test run adds up
- Latency: AI calls take seconds, not milliseconds
- Rate limits: Provider throttling breaks CI
- Complex dependencies: Embeddings, vector DBs, LLMs
Project Setup
# Install test dependencies
npm install -D vitest @vitest/coverage-v8 msw testcontainers
// vitest.config.ts
import { defineConfig } from 'vitest/config';

// AI calls take seconds rather than milliseconds, so tests and
// lifecycle hooks both get a generous 30s budget.
const AI_TIMEOUT_MS = 30_000;

export default defineConfig({
  test: {
    globals: true,
    environment: 'node',
    coverage: {
      provider: 'v8',
      reporter: ['text', 'json', 'html'],
      // Mock servers and canned fixtures are test scaffolding, not
      // product code — keep them out of the coverage numbers.
      exclude: ['**/mocks/**', '**/fixtures/**']
    },
    testTimeout: AI_TIMEOUT_MS,
    hookTimeout: AI_TIMEOUT_MS
  }
});
Mocking LLM Responses
Mock Service Worker (MSW)
// mocks/handlers.ts
import { http, HttpResponse } from 'msw';

/** Minimal shape of the OpenAI chat request fields this mock inspects. */
interface ChatRequestBody {
  messages: Array<{ role: string; content: string }>;
}

/**
 * Build a minimal OpenAI chat-completion payload with a fixed assistant
 * message and token counts. Keeps the three canned responses in one
 * place instead of three copies of the same response literal.
 */
function chatCompletion(content: string, promptTokens: number, completionTokens: number) {
  return HttpResponse.json({
    choices: [{
      message: {
        role: 'assistant',
        content
      }
    }],
    usage: { prompt_tokens: promptTokens, completion_tokens: completionTokens }
  });
}

export const handlers = [
  // Mock OpenAI Chat Completions: route on keywords in the last message
  // so tests get deterministic, input-dependent replies.
  http.post('https://api.openai.com/v1/chat/completions', async ({ request }) => {
    // request.json() is untyped; assert the narrow shape the mock reads.
    const body = (await request.json()) as ChatRequestBody;
    const messages = body.messages;
    const lastMessage = messages[messages.length - 1].content;
    if (lastMessage.includes('summarize')) {
      return chatCompletion('This is a mock summary of the document.', 100, 50);
    }
    if (lastMessage.includes('translate')) {
      return chatCompletion('Traducción del texto al español.', 80, 40);
    }
    // Default response for prompts matching no keyword.
    return chatCompletion('Mock response for testing.', 50, 25);
  }),
  // Mock OpenAI Embeddings: a fresh random 1536-dimension vector per call.
  http.post('https://api.openai.com/v1/embeddings', () => {
    return HttpResponse.json({
      data: [{ embedding: Array(1536).fill(0).map(() => Math.random()) }],
      usage: { prompt_tokens: 10 }
    });
  })
];
// mocks/server.ts
import { setupServer } from 'msw/node';
import { handlers } from './handlers';
// Node-side MSW server that intercepts outbound HTTP via the handlers above.
export const server = setupServer(...handlers);
// tests/setup.ts
import { server } from '../mocks/server';
// 'error' makes any request without a registered handler fail the test,
// so no test can silently hit a real (billable) AI endpoint.
beforeAll(() => server.listen({ onUnhandledRequest: 'error' }));
// Drop per-test handler overrides so tests stay independent.
afterEach(() => server.resetHandlers());
afterAll(() => server.close());
Response Fixtures
// fixtures/llm-responses.ts
// Canned input/expected pairs that drive deterministic LLM tests.

// Summaries at two input scales.
const summarization = {
  short: {
    input: 'Lorem ipsum dolor sit amet...',
    expected: 'Brief overview of the Latin placeholder text.'
  },
  long: {
    input: '[10000 word document]',
    expected: 'Comprehensive summary covering main points...'
  }
};

// Binary spam classification with expected labels and confidences.
const classification = {
  spam: {
    input: 'BUY NOW! Limited offer!',
    expected: { label: 'spam', confidence: 0.95 }
  },
  notSpam: {
    input: 'Meeting scheduled for tomorrow.',
    expected: { label: 'not_spam', confidence: 0.92 }
  }
};

// Structured entity extraction from free text.
const extraction = {
  contact: {
    input: 'Contact John at john@example.com or 555-1234',
    expected: {
      name: 'John',
      email: 'john@example.com',
      phone: '555-1234'
    }
  }
};

export const fixtures = { summarization, classification, extraction };
Unit Testing Workers
// tests/workers/summarization.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { summarizationProcessor } from '../../workers/summarization';
import { fixtures } from '../fixtures/llm-responses';

describe('Summarization Worker', () => {
  // Minimal stand-in for a flashQ job envelope around the given payload.
  function makeJob(data: any) {
    return {
      id: 'test-job-1',
      data,
      attempts: 1,
      timestamp: Date.now()
    };
  }

  it('should summarize short documents', async () => {
    const result = await summarizationProcessor(
      makeJob({ document: fixtures.summarization.short.input, maxLength: 100 })
    );
    expect(result).toBeDefined();
    expect(result.summary).toBeTypeOf('string');
    expect(result.summary.length).toBeLessThanOrEqual(500);
  });

  it('should handle empty documents', async () => {
    await expect(summarizationProcessor(makeJob({ document: '' })))
      .rejects.toThrow('Document cannot be empty');
  });

  it('should respect token limits', async () => {
    // 100k characters forces truncation against the 1000-token cap.
    const oversized = makeJob({ document: 'A'.repeat(100000), maxTokens: 1000 });
    const result = await summarizationProcessor(oversized);
    expect(result.truncated).toBe(true);
    expect(result.tokensUsed).toBeLessThanOrEqual(1000);
  });

  it('should track token usage', async () => {
    const result = await summarizationProcessor(
      makeJob({ document: fixtures.summarization.short.input })
    );
    expect(result.usage).toBeDefined();
    expect(result.usage.promptTokens).toBeGreaterThan(0);
    expect(result.usage.completionTokens).toBeGreaterThan(0);
  });
});
Integration Testing
Test Containers for flashQ
// tests/integration/setup.ts
import { GenericContainer, StartedTestContainer } from 'testcontainers';
import { FlashQ, Worker } from 'flashq';

let flashqContainer: StartedTestContainer;
let client: FlashQ;

/**
 * Boot a throwaway flashQ container and return a connected client.
 * testcontainers maps the exposed ports to random host ports, so
 * parallel CI jobs never collide on a fixed port.
 */
export async function setupTestEnvironment() {
  flashqContainer = await new GenericContainer('flashq/flashq:latest')
    .withExposedPorts(6789, 6790)
    .withEnvironment({ HTTP: '1' })
    .start();
  const host = flashqContainer.getHost();
  const port = flashqContainer.getMappedPort(6789);
  client = new FlashQ({ host, port });
  await client.connect();
  return { client, host, port };
}

/**
 * Close the client, then stop the container. The stop runs in `finally`
 * so a throwing client.close() can never leak a running container.
 */
export async function teardownTestEnvironment() {
  try {
    await client?.close();
  } finally {
    await flashqContainer?.stop();
  }
}
Full Pipeline Test
// tests/integration/ai-pipeline.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { setupTestEnvironment, teardownTestEnvironment } from './setup';
import { FlashQ, Worker } from 'flashq';

describe('AI Pipeline Integration', () => {
  let client: FlashQ;
  let workers: Worker[] = [];

  beforeAll(async () => {
    const env = await setupTestEnvironment();
    client = env.client;
    // Stub workers: deterministic outputs, no real model calls.
    workers.push(
      new Worker(client, 'embedding', async (job) => {
        return { embedding: Array(1536).fill(0.1) };
      }),
      new Worker(client, 'completion', async (job) => {
        return { response: 'Test completion response' };
      })
    );
  });

  afterAll(async () => {
    // Await every close — forEach discards the returned promises, which
    // let teardown race worker shutdown against the container stop.
    await Promise.all(workers.map(w => w.close()));
    await teardownTestEnvironment();
  });

  it('should process RAG pipeline end-to-end', async () => {
    // Step 1: Create embedding job
    const embeddingJob = await client.push('embedding', {
      text: 'What is flashQ?'
    });
    // Wait for embedding
    const embeddingResult = await client.finished(embeddingJob.id, 10000);
    expect(embeddingResult.embedding).toHaveLength(1536);

    // Step 2: Create completion job with context
    const completionJob = await client.push('completion', {
      query: 'What is flashQ?',
      context: ['flashQ is a high-performance job queue.']
    });
    const completionResult = await client.finished(completionJob.id, 10000);
    expect(completionResult.response).toBeDefined();
  });

  it('should handle job failures gracefully', async () => {
    // Create failing worker
    const failingWorker = new Worker(client, 'failing-task', async () => {
      throw new Error('Simulated API failure');
    }, { concurrency: 1 });

    const job = await client.push('failing-task', {}, {
      max_attempts: 3,
      backoff: 100
    });

    // Poll for the terminal state instead of sleeping a fixed 2s: the
    // fixed sleep was slow on fast runs and flaky on slow ones.
    const deadline = Date.now() + 5000;
    let state = await client.getState(job.id);
    while (state !== 'failed' && Date.now() < deadline) {
      await new Promise(resolve => setTimeout(resolve, 100));
      state = await client.getState(job.id);
    }
    expect(state).toBe('failed');

    const dlq = await client.getDlq('failing-task');
    expect(dlq.some(j => j.id === job.id)).toBe(true);

    await failingWorker.close();
  });

  it('should process batch jobs efficiently', async () => {
    const jobs = Array.from({ length: 100 }, (_, i) => ({
      data: { text: `Document ${i}` }
    }));
    const pushed = await client.pushBatch('embedding', jobs);
    expect(pushed).toHaveLength(100);

    // Wait for all to complete
    const results = await Promise.all(
      pushed.map(j => client.finished(j.id, 30000))
    );
    expect(results.every(r => r.embedding)).toBe(true);
  });
});
Output Quality Testing
// tests/quality/llm-output.test.ts
import { describe, it, expect } from 'vitest';

// Structural checks on LLM output: valid JSON, no invented fields,
// and length constraints actually honored.
describe('LLM Output Quality', () => {
  it('should return valid JSON for structured output', async () => {
    const result = await processStructuredTask({
      prompt: 'Extract contact info',
      input: 'Email: test@example.com',
      schema: { email: 'string' }
    });
    expect(() => JSON.parse(result.output)).not.toThrow();
    // Assert on the parsed value: result.output is a JSON *string*
    // (it was just passed to JSON.parse), so toHaveProperty on the
    // raw string could never pass.
    const parsed = JSON.parse(result.output);
    expect(parsed).toHaveProperty('email');
  });

  it('should not hallucinate information', async () => {
    const input = 'The weather is sunny today.';
    const result = await extractInfo(input);
    // Fields absent from the input must stay absent from the output.
    expect(result.temperature).toBeUndefined();
    expect(result.location).toBeUndefined();
  });

  it('should respect length constraints', async () => {
    const result = await generateSummary({
      text: longDocument,
      maxWords: 50
    });
    const wordCount = result.summary.split(/\s+/).length;
    expect(wordCount).toBeLessThanOrEqual(60); // Allow 20% buffer
  });
});
// Semantic similarity testing
import { cosineSimilarity } from '../utils/similarity';

describe('Semantic Consistency', () => {
  it('should produce consistent embeddings', async () => {
    const phrase = 'flashQ is a job queue';
    const first = await getEmbedding(phrase);
    const second = await getEmbedding(phrase);
    // The same input must map to (numerically) the same vector,
    // i.e. cosine similarity of 1 to 5 decimal places.
    expect(cosineSimilarity(first, second)).toBeCloseTo(1.0, 5);
  });

  it('should find semantically similar content', async () => {
    const query = await getEmbedding('fast job processing');
    const onTopic = await getEmbedding('high-speed task execution');
    const offTopic = await getEmbedding('chocolate cake recipe');
    // Related phrasing scores high; an unrelated topic scores low.
    expect(cosineSimilarity(query, onTopic)).toBeGreaterThan(0.7);
    expect(cosineSimilarity(query, offTopic)).toBeLessThan(0.3);
  });
});
Performance Testing
// tests/performance/throughput.test.ts
import { describe, it, expect } from 'vitest';

describe('Pipeline Performance', () => {
  it('should meet throughput SLA', async () => {
    const TARGET_THROUGHPUT = 100; // jobs/second
    const TEST_DURATION = 10000; // 10 seconds
    const start = Date.now();
    let completed = 0;
    // Push and fully drain batches of 10 until the window closes.
    while (Date.now() - start < TEST_DURATION) {
      const batch = Array.from({ length: 10 }, () => ({
        data: { text: 'test' }
      }));
      const pushed = await client.pushBatch('embedding', batch);
      await Promise.all(pushed.map(j => client.finished(j.id, 5000)));
      completed += 10;
    }
    const elapsedSeconds = (Date.now() - start) / 1000;
    expect(completed / elapsedSeconds).toBeGreaterThanOrEqual(TARGET_THROUGHPUT);
  });

  it('should meet latency P95 SLA', async () => {
    const TARGET_P95_MS = 500;
    const samples: number[] = [];
    // Measure 100 sequential push-to-finish round trips.
    for (let i = 0; i < 100; i++) {
      const begin = Date.now();
      const job = await client.push('embedding', { text: 'test' });
      await client.finished(job.id, 5000);
      samples.push(Date.now() - begin);
    }
    // Sort ascending and take the 95th-percentile sample.
    samples.sort((a, b) => a - b);
    const p95 = samples[Math.floor(samples.length * 0.95)];
    expect(p95).toBeLessThanOrEqual(TARGET_P95_MS);
  });

  it('should handle concurrent load', async () => {
    const CONCURRENT_JOBS = 50;
    const pushed = await Promise.all(
      Array.from({ length: CONCURRENT_JOBS }, () =>
        client.push('embedding', { text: 'concurrent test' })
      )
    );
    const results = await Promise.all(
      pushed.map(j => client.finished(j.id, 10000))
    );
    expect(results.filter(r => r.embedding)).toHaveLength(CONCURRENT_JOBS);
  });
});
CI/CD Integration
# .github/workflows/ai-tests.yml
name: AI Pipeline Tests
on:
  push:
    branches: [main]
  pull_request:
env:
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY_TEST }}
jobs:
  # Fast, deterministic tests — every LLM call is served by mocks.
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
          cache: 'npm'
      - name: Install dependencies
        run: npm ci
      - name: Run unit tests with mocks
        run: npm run test:unit
        env:
          USE_MOCKS: true
  # Tests against a real flashQ instance running as a service container.
  integration-tests:
    runs-on: ubuntu-latest
    services:
      flashq:
        image: flashq/flashq:latest
        ports:
          - 6789:6789
          - 6790:6790
        env:
          HTTP: '1'
    steps:
      - uses: actions/checkout@v4
      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
          cache: 'npm'
      - name: Install dependencies
        run: npm ci
      # Block until the HTTP health endpoint answers (max 30s) so tests
      # don't race the service container's startup.
      - name: Wait for flashQ
        run: |
          timeout 30 bash -c 'until curl -s http://localhost:6790/health; do sleep 1; done'
      - name: Run integration tests
        run: npm run test:integration
        env:
          FLASHQ_HOST: localhost
          FLASHQ_PORT: 6789
  # Quality tests hit the real API (USE_MOCKS: false), so they only run
  # on pushes to main — not on every PR — to bound cost.
  quality-tests:
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
      - name: Install dependencies
        run: npm ci
      - name: Run quality tests (real API)
        run: npm run test:quality
        env:
          USE_MOCKS: false
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY_TEST }}
Production Monitoring
// monitoring/ai-metrics.ts
import { flashq } from './client';

/** Aggregate health/cost snapshot across all AI-prefixed queues. */
interface AIMetrics {
  totalJobs: number;
  successRate: number;
  avgLatencyMs: number;
  tokenUsage: number;
  costEstimate: number;
  errorsByType: Record<string, number>;
}

/**
 * Roll every `ai:`-prefixed queue up into a single metrics snapshot.
 *
 * The ratio fields are guarded: with zero jobs the original divisions
 * produced NaN, and NaN fails every `<` comparison, so an idle system
 * would silently sail past the alert thresholds in checkHealthAndAlert.
 * An idle system now reports successRate 1 and avgLatencyMs 0.
 */
async function collectAIMetrics(): Promise<AIMetrics> {
  const metrics = await flashq.metrics();
  const aiQueues = Object.entries(metrics.queues)
    .filter(([name]) => name.startsWith('ai:'));

  let totalJobs = 0;
  let completed = 0;
  let totalLatency = 0;
  for (const [, queue] of aiQueues) {
    totalJobs += queue.completed + queue.failed;
    completed += queue.completed;
    // Weight each queue's average by its completion count.
    totalLatency += queue.avgProcessingTime * queue.completed;
  }

  return {
    totalJobs,
    successRate: totalJobs > 0 ? completed / totalJobs : 1,
    avgLatencyMs: completed > 0 ? totalLatency / completed : 0,
    tokenUsage: await getTokenUsage(),
    costEstimate: await estimateCost(),
    errorsByType: await getErrorBreakdown()
  };
}
// Alerting on anomalies
// Compare the latest snapshot against fixed thresholds and page on breach.
async function checkHealthAndAlert() {
  const { successRate, avgLatencyMs, costEstimate } = await collectAIMetrics();

  // Success rate below 95% — degraded but not yet an outage.
  if (successRate < 0.95) {
    await sendAlert({
      severity: 'warning',
      message: `AI success rate dropped to ${(successRate * 100).toFixed(1)}%`
    });
  }

  // Average processing latency above 5s.
  if (avgLatencyMs > 5000) {
    await sendAlert({
      severity: 'warning',
      message: `AI latency elevated: ${avgLatencyMs}ms`
    });
  }

  // Spend is on track to blow the daily budget — critical severity.
  if (costEstimate > DAILY_BUDGET) {
    await sendAlert({
      severity: 'critical',
      message: `AI cost exceeding budget: $${costEstimate.toFixed(2)}`
    });
  }
}
✓ Mock LLM APIs for fast, deterministic tests
✓ Use test containers for integration tests
✓ Test output quality and structure
✓ Benchmark performance against SLAs
✓ Run quality tests on real APIs (nightly)
✓ Monitor production metrics continuously
Conclusion
Testing AI pipelines requires a multi-layered approach: fast unit tests with mocks, integration tests with real infrastructure, quality tests for output validation, and performance tests for SLA verification. With flashQ's testing-friendly architecture, you can build confidence in your AI systems.