Features · Blog · Docs · GitHub · Get Started

Testing AI Pipelines with flashQ

Testing AI pipelines presents unique challenges: non-deterministic outputs, expensive API calls, and complex dependencies. This guide covers strategies for testing AI workloads with flashQ, from unit tests to production monitoring.

AI Testing Challenges

  • Non-deterministic outputs: Same input, different results
  • Expensive API calls: $0.01-$0.10 per test run adds up
  • Latency: AI calls take seconds, not milliseconds
  • Rate limits: Provider throttling breaks CI
  • Complex dependencies: Embeddings, vector DBs, LLMs

Project Setup

# Install test dependencies
npm install -D vitest @vitest/coverage-v8 msw testcontainers
// vitest.config.ts
import { defineConfig } from 'vitest/config';

// Shared timeout: AI-backed tests routinely exceed Vitest's 5s default.
const AI_TEST_TIMEOUT_MS = 30_000;

export default defineConfig({
  test: {
    globals: true,
    environment: 'node',
    coverage: {
      provider: 'v8',
      reporter: ['text', 'json', 'html'],
      // Mocks and fixtures are test scaffolding, not product code — keep
      // them out of the coverage report.
      exclude: ['**/mocks/**', '**/fixtures/**']
    },
    testTimeout: AI_TEST_TIMEOUT_MS,
    hookTimeout: AI_TEST_TIMEOUT_MS
  }
});

Mocking LLM Responses

Mock Service Worker (MSW)

// mocks/handlers.ts
import { http, HttpResponse } from 'msw';

/** Shape of the OpenAI chat-completions request fields these mocks inspect. */
type ChatCompletionRequest = {
  messages: Array<{ role: string; content: string }>;
};

/** Build a chat-completions payload with a single assistant message. */
const chatCompletion = (content: string, promptTokens: number, completionTokens: number) =>
  HttpResponse.json({
    choices: [{
      message: {
        role: 'assistant',
        content
      }
    }],
    usage: { prompt_tokens: promptTokens, completion_tokens: completionTokens }
  });

export const handlers = [
  // Mock OpenAI Chat Completions: deterministic replies keyed off the last message.
  http.post('https://api.openai.com/v1/chat/completions', async ({ request }) => {
    // request.json() is untyped — cast to the minimal shape we read so the
    // property accesses below are checked instead of implicitly `any`.
    const body = (await request.json()) as ChatCompletionRequest;

    // Guard against an empty messages array instead of crashing the handler.
    const lastMessage = body.messages.at(-1)?.content ?? '';

    if (lastMessage.includes('summarize')) {
      return chatCompletion('This is a mock summary of the document.', 100, 50);
    }

    if (lastMessage.includes('translate')) {
      return chatCompletion('Traducción del texto al español.', 80, 40);
    }

    // Default response
    return chatCompletion('Mock response for testing.', 50, 25);
  }),

  // Mock OpenAI Embeddings: random vector, 1536-dim to match ada-002-style models.
  http.post('https://api.openai.com/v1/embeddings', () => {
    return HttpResponse.json({
      data: [{ embedding: Array.from({ length: 1536 }, () => Math.random()) }],
      usage: { prompt_tokens: 10 }
    });
  })
];

// mocks/server.ts
import { setupServer } from 'msw/node';
import { handlers } from './handlers';

// Node-side MSW server built from the shared handlers; its lifecycle
// (listen/reset/close) is driven from tests/setup.ts.
export const server = setupServer(...handlers);

// tests/setup.ts
import { server } from '../mocks/server';

// Fail loudly on any request without a mock — catches accidental real API calls.
beforeAll(() => server.listen({ onUnhandledRequest: 'error' }));
// Drop per-test handler overrides so tests stay independent of each other.
afterEach(() => server.resetHandlers());
afterAll(() => server.close());

Response Fixtures

// fixtures/llm-responses.ts
// Canonical input/expected pairs shared across unit tests, so mocks and
// assertions stay in sync from a single source of truth.
export const fixtures = {
  // Document summarization cases.
  summarization: {
    short: {
      input: 'Lorem ipsum dolor sit amet...',
      expected: 'Brief overview of the Latin placeholder text.'
    },
    long: {
      // Placeholder — substitute a real long document when exercising chunking.
      input: '[10000 word document]',
      expected: 'Comprehensive summary covering main points...'
    }
  },

  // Binary spam classification with expected labels and confidence scores.
  classification: {
    spam: {
      input: 'BUY NOW! Limited offer!',
      expected: { label: 'spam', confidence: 0.95 }
    },
    notSpam: {
      input: 'Meeting scheduled for tomorrow.',
      expected: { label: 'not_spam', confidence: 0.92 }
    }
  },

  // Structured extraction: free text in, typed record out.
  extraction: {
    contact: {
      input: 'Contact John at john@example.com or 555-1234',
      expected: {
        name: 'John',
        email: 'john@example.com',
        phone: '555-1234'
      }
    }
  }
};

Unit Testing Workers

// tests/workers/summarization.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { summarizationProcessor } from '../../workers/summarization';
import { fixtures } from '../fixtures/llm-responses';

describe('Summarization Worker', () => {
  /**
   * Build a minimal job stub with only the fields the processor reads.
   * Typed as Record<string, unknown> rather than `any` so payload property
   * access stays type-checked.
   */
  const mockJob = (data: Record<string, unknown>) => ({
    id: 'test-job-1',
    data,
    attempts: 1,
    timestamp: Date.now()
  });

  it('should summarize short documents', async () => {
    const job = mockJob({
      document: fixtures.summarization.short.input,
      maxLength: 100
    });

    const result = await summarizationProcessor(job);

    expect(result).toBeDefined();
    expect(result.summary).toBeTypeOf('string');
    expect(result.summary.length).toBeLessThanOrEqual(500);
  });

  it('should handle empty documents', async () => {
    const job = mockJob({ document: '' });

    // The processor should validate input before spending any tokens.
    await expect(summarizationProcessor(job))
      .rejects.toThrow('Document cannot be empty');
  });

  it('should respect token limits', async () => {
    const job = mockJob({
      document: 'A'.repeat(100000), // Far beyond any context window
      maxTokens: 1000
    });

    const result = await summarizationProcessor(job);

    expect(result.truncated).toBe(true);
    expect(result.tokensUsed).toBeLessThanOrEqual(1000);
  });

  it('should track token usage', async () => {
    const job = mockJob({
      document: fixtures.summarization.short.input
    });

    const result = await summarizationProcessor(job);

    // Usage accounting is what cost monitoring is built on — assert it exists.
    expect(result.usage).toBeDefined();
    expect(result.usage.promptTokens).toBeGreaterThan(0);
    expect(result.usage.completionTokens).toBeGreaterThan(0);
  });
});

Integration Testing

Test Containers for flashQ

// tests/integration/setup.ts
import { GenericContainer, StartedTestContainer } from 'testcontainers';
import { FlashQ, Worker } from 'flashq';

let flashqContainer: StartedTestContainer;
let client: FlashQ;

/**
 * Boot a disposable flashQ broker in Docker and connect a client to it.
 * Returns the client plus the dynamically mapped host/port for direct use.
 */
export async function setupTestEnvironment() {
  // Container ports are mapped to random free host ports by testcontainers.
  const container = await new GenericContainer('flashq/flashq:latest')
    .withExposedPorts(6789, 6790)
    .withEnvironment({ HTTP: '1' })
    .start();
  flashqContainer = container;

  const host = container.getHost();
  const port = container.getMappedPort(6789);

  client = new FlashQ({ host, port });
  await client.connect();

  return { client, host, port };
}

/**
 * Tear down the test environment created by setupTestEnvironment().
 * The container stop runs in `finally` so a throwing client.close()
 * cannot leak the Docker container across test runs.
 */
export async function teardownTestEnvironment() {
  try {
    await client?.close();
  } finally {
    await flashqContainer?.stop();
  }
}

Full Pipeline Test

// tests/integration/ai-pipeline.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { setupTestEnvironment, teardownTestEnvironment } from './setup';
import { FlashQ, Worker } from 'flashq';

describe('AI Pipeline Integration', () => {
  let client: FlashQ;
  const workers: Worker[] = [];

  /** Poll a job's state until it matches `expected` or `timeoutMs` elapses. */
  async function waitForState(jobId: string, expected: string, timeoutMs: number) {
    const deadline = Date.now() + timeoutMs;
    let state = await client.getState(jobId);
    while (state !== expected && Date.now() < deadline) {
      await new Promise(resolve => setTimeout(resolve, 100));
      state = await client.getState(jobId);
    }
    return state;
  }

  beforeAll(async () => {
    const env = await setupTestEnvironment();
    client = env.client;

    // Start workers with deterministic stub processors — no real model calls.
    workers.push(
      new Worker(client, 'embedding', async (job) => {
        // Mock embedding generation
        return { embedding: Array(1536).fill(0.1) };
      }),
      new Worker(client, 'completion', async (job) => {
        return { response: 'Test completion response' };
      })
    );
  });

  afterAll(async () => {
    // Await worker shutdown: forEach ignores returned promises, which let
    // container teardown race with in-flight jobs in the original version.
    await Promise.all(workers.map(w => w.close()));
    await teardownTestEnvironment();
  });

  it('should process RAG pipeline end-to-end', async () => {
    // Step 1: Create embedding job
    const embeddingJob = await client.push('embedding', {
      text: 'What is flashQ?'
    });

    // Wait for embedding
    const embeddingResult = await client.finished(embeddingJob.id, 10000);
    expect(embeddingResult.embedding).toHaveLength(1536);

    // Step 2: Create completion job with context
    const completionJob = await client.push('completion', {
      query: 'What is flashQ?',
      context: ['flashQ is a high-performance job queue.']
    });

    const completionResult = await client.finished(completionJob.id, 10000);
    expect(completionResult.response).toBeDefined();
  });

  it('should handle job failures gracefully', async () => {
    // Worker that always fails, simulating a provider outage.
    const failingWorker = new Worker(client, 'failing-task', async () => {
      throw new Error('Simulated API failure');
    }, { concurrency: 1 });

    try {
      const job = await client.push('failing-task', {}, {
        max_attempts: 3,
        backoff: 100
      });

      // Poll for the terminal state instead of a fixed sleep — retry timing
      // varies with backoff and machine load, so a hard-coded 2s is flaky.
      const state = await waitForState(job.id, 'failed', 10000);
      expect(state).toBe('failed');

      const dlq = await client.getDlq('failing-task');
      expect(dlq.some(j => j.id === job.id)).toBe(true);
    } finally {
      // Always close the extra worker, even when an assertion above fails.
      await failingWorker.close();
    }
  });

  it('should process batch jobs efficiently', async () => {
    const jobs = Array.from({ length: 100 }, (_, i) => ({
      data: { text: `Document ${i}` }
    }));

    const pushed = await client.pushBatch('embedding', jobs);
    expect(pushed).toHaveLength(100);

    // Wait for all to complete
    const results = await Promise.all(
      pushed.map(j => client.finished(j.id, 30000))
    );

    expect(results.every(r => r.embedding)).toBe(true);
  });
});

Output Quality Testing

// tests/quality/llm-output.test.ts
import { describe, it, expect } from 'vitest';

// Test output structure
describe('LLM Output Quality', () => {
  it('should return valid JSON for structured output', async () => {
    const result = await processStructuredTask({
      prompt: 'Extract contact info',
      input: 'Email: test@example.com',
      schema: { email: 'string' }
    });

    expect(() => JSON.parse(result.output)).not.toThrow();
    expect(result.output).toHaveProperty('email');
  });

  it('should not hallucinate information', async () => {
    const input = 'The weather is sunny today.';
    const result = await extractInfo(input);

    // Should not invent details not in input
    expect(result.temperature).toBeUndefined();
    expect(result.location).toBeUndefined();
  });

  it('should respect length constraints', async () => {
    const result = await generateSummary({
      text: longDocument,
      maxWords: 50
    });

    const wordCount = result.summary.split(/\s+/).length;
    expect(wordCount).toBeLessThanOrEqual(60); // Allow 20% buffer
  });
});

// Semantic similarity testing
import { cosineSimilarity } from '../utils/similarity';

describe('Semantic Consistency', () => {
  it('should produce consistent embeddings', async () => {
    const phrase = 'flashQ is a job queue';

    const first = await getEmbedding(phrase);
    const second = await getEmbedding(phrase);

    // Deterministic embeddings: identical input must yield identical vectors.
    expect(cosineSimilarity(first, second)).toBeCloseTo(1.0, 5);
  });

  it('should find semantically similar content', async () => {
    const queryVec = await getEmbedding('fast job processing');
    const relatedVec = await getEmbedding('high-speed task execution');
    const unrelatedVec = await getEmbedding('chocolate cake recipe');

    // Related phrasing scores high; an off-topic phrase scores low.
    expect(cosineSimilarity(queryVec, relatedVec)).toBeGreaterThan(0.7);
    expect(cosineSimilarity(queryVec, unrelatedVec)).toBeLessThan(0.3);
  });
});

Performance Testing

// tests/performance/throughput.test.ts
import { describe, it, expect } from 'vitest';

describe('Pipeline Performance', () => {
  it('should meet throughput SLA', async () => {
    const TARGET_THROUGHPUT = 100; // jobs/second
    const TEST_DURATION = 10000; // 10 seconds
    const BATCH_SIZE = 10;

    let completed = 0;
    const start = Date.now();

    // Push small batches back-to-back until the test window closes.
    while (Date.now() - start < TEST_DURATION) {
      const jobs = Array.from({ length: BATCH_SIZE }, () => ({
        data: { text: 'test' }
      }));

      const results = await client.pushBatch('embedding', jobs);
      await Promise.all(results.map(j => client.finished(j.id, 5000)));
      // Count what the broker actually accepted rather than assuming BATCH_SIZE.
      completed += results.length;
    }

    const duration = (Date.now() - start) / 1000;
    const throughput = completed / duration;

    expect(throughput).toBeGreaterThanOrEqual(TARGET_THROUGHPUT);
  });

  it('should meet latency P95 SLA', async () => {
    const TARGET_P95_MS = 500;
    const SAMPLES = 100;
    const latencies: number[] = [];

    for (let i = 0; i < SAMPLES; i++) {
      const start = Date.now();
      const job = await client.push('embedding', { text: 'test' });
      await client.finished(job.id, 5000);
      latencies.push(Date.now() - start);
    }

    latencies.sort((a, b) => a - b);
    // Nearest-rank P95 is ceil(0.95 * n) - 1. The original floor(n * 0.95)
    // indexed one element too high (the 96th of 100 samples, i.e. ~P96).
    const p95 = latencies[Math.ceil(latencies.length * 0.95) - 1];

    expect(p95).toBeLessThanOrEqual(TARGET_P95_MS);
  });

  it('should handle concurrent load', async () => {
    const CONCURRENT_JOBS = 50;

    // Fire all pushes at once to exercise queue contention.
    const jobs = await Promise.all(
      Array.from({ length: CONCURRENT_JOBS }, () =>
        client.push('embedding', { text: 'concurrent test' })
      )
    );

    const results = await Promise.all(
      jobs.map(j => client.finished(j.id, 10000))
    );

    expect(results.filter(r => r.embedding)).toHaveLength(CONCURRENT_JOBS);
  });
});

CI/CD Integration

# .github/workflows/ai-tests.yml
name: AI Pipeline Tests

# Run on every push to main and on all pull requests.
on:
  push:
    branches: [main]
  pull_request:

# Test-scoped key — never reuse the production OpenAI key in CI.
env:
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY_TEST }}

jobs:
  # Fast, deterministic tests: all LLM calls go through MSW mocks.
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
          cache: 'npm'

      - name: Install dependencies
        run: npm ci

      - name: Run unit tests with mocks
        run: npm run test:unit
        env:
          USE_MOCKS: true

  # Runs against a real flashQ broker started as a service container.
  integration-tests:
    runs-on: ubuntu-latest
    services:
      flashq:
        image: flashq/flashq:latest
        ports:
          - 6789:6789 # client protocol port
          - 6790:6790 # HTTP port (health endpoint)
        env:
          HTTP: '1'

    steps:
      - uses: actions/checkout@v4

      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
          cache: 'npm'

      - name: Install dependencies
        run: npm ci

      # Block until the broker's health endpoint answers (max 30s).
      - name: Wait for flashQ
        run: |
          timeout 30 bash -c 'until curl -s http://localhost:6790/health; do sleep 1; done'

      - name: Run integration tests
        run: npm run test:integration
        env:
          FLASHQ_HOST: localhost
          FLASHQ_PORT: 6789

  # Real-API quality tests: restricted to pushes on main to control API spend.
  quality-tests:
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'

      - name: Install dependencies
        run: npm ci

      - name: Run quality tests (real API)
        run: npm run test:quality
        env:
          USE_MOCKS: false
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY_TEST }}

Production Monitoring

// monitoring/ai-metrics.ts
import { flashq } from './client';

// Aggregated snapshot of all `ai:`-prefixed queues, used by alerting below.
interface AIMetrics {
  totalJobs: number; // completed + failed across all AI queues
  successRate: number; // completed / totalJobs, in [0, 1]
  avgLatencyMs: number; // mean processing time, weighted by per-queue volume
  tokenUsage: number; // total tokens consumed — presumably prompt + completion; verify getTokenUsage()
  costEstimate: number; // estimated spend — assumed USD per the alert message; confirm estimateCost()
  errorsByType: Record<string, number>; // error category -> occurrence count
}

/**
 * Aggregate per-queue flashQ metrics into a single AIMetrics snapshot.
 * Only queues following the `ai:` naming convention are counted.
 */
async function collectAIMetrics(): Promise<AIMetrics> {
  const metrics = await flashq.metrics();
  const aiQueues = Object.entries(metrics.queues)
    .filter(([name]) => name.startsWith('ai:'));

  let totalJobs = 0;
  let completed = 0;
  let totalLatency = 0;

  for (const [, queue] of aiQueues) {
    totalJobs += queue.completed + queue.failed;
    completed += queue.completed;
    // Weight each queue's average by its completed count for an overall mean.
    totalLatency += queue.avgProcessingTime * queue.completed;
  }

  return {
    totalJobs,
    // Guard against division by zero when no AI jobs have run yet: the
    // original produced NaN, which silently disabled every threshold alert.
    successRate: totalJobs > 0 ? completed / totalJobs : 1,
    avgLatencyMs: completed > 0 ? totalLatency / completed : 0,
    tokenUsage: await getTokenUsage(),
    costEstimate: await estimateCost(),
    errorsByType: await getErrorBreakdown()
  };
}

// Alerting on anomalies
/** Compare the current metrics snapshot to alert thresholds and notify on breaches. */
async function checkHealthAndAlert() {
  const current = await collectAIMetrics();

  // Success rate below 95% suggests provider errors or bad deploys.
  if (current.successRate < 0.95) {
    const pct = (current.successRate * 100).toFixed(1);
    await sendAlert({
      severity: 'warning',
      message: `AI success rate dropped to ${pct}%`
    });
  }

  // Sustained latency above 5s points at throttling or queue backlog.
  if (current.avgLatencyMs > 5000) {
    await sendAlert({
      severity: 'warning',
      message: `AI latency elevated: ${current.avgLatencyMs}ms`
    });
  }

  // Budget overrun is critical — runaway spend needs immediate attention.
  if (current.costEstimate > DAILY_BUDGET) {
    await sendAlert({
      severity: 'critical',
      message: `AI cost exceeding budget: $${current.costEstimate.toFixed(2)}`
    });
  }
}
Testing Checklist

✓ Mock LLM APIs for fast, deterministic tests
✓ Use test containers for integration tests
✓ Test output quality and structure
✓ Benchmark performance against SLAs
✓ Run quality tests on real APIs (nightly)
✓ Monitor production metrics continuously

Conclusion

Testing AI pipelines requires a multi-layered approach: fast unit tests with mocks, integration tests with real infrastructure, quality tests for output validation, and performance tests for SLA verification. With flashQ's testing-friendly architecture, you can build confidence in your AI systems.

Start Testing Today

Build reliable AI pipelines with comprehensive testing.

Get Started →
ESC