
Monitoring AI Pipelines: Metrics, Alerts, and Dashboards

Running AI pipelines in production without proper monitoring is like flying blind. You need visibility into throughput, latency, error rates, and costs. In this guide, we'll set up comprehensive monitoring for flashQ using Prometheus and Grafana.

Why Monitor AI Pipelines?

AI workloads have unique monitoring challenges:

- Highly variable latency: a single LLM call can take milliseconds or minutes
- Per-request API costs that grow silently without visibility
- Transient failures from rate limits and provider outages, which inflate retries
- Bursty throughput that can quietly back up queues

flashQ Metrics Overview

flashQ exposes Prometheus metrics at /metrics/prometheus when running with HTTP enabled:

# Start flashQ with HTTP API
HTTP=1 HTTP_PORT=6790 ./flashq-server

Available metrics include:

Metric                        Type       Description
flashq_jobs_total             Counter    Total jobs by queue and status
flashq_jobs_active            Gauge      Currently processing jobs
flashq_jobs_waiting           Gauge      Jobs waiting in queue
flashq_jobs_failed            Counter    Failed jobs (sent to DLQ)
flashq_job_duration_seconds   Histogram  Job processing duration
flashq_queue_depth            Gauge      Queue depth by queue name
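
These metrics come back in the Prometheus text exposition format. As a quick sanity check that your scrape endpoint is healthy, here is a minimal TypeScript sketch that parses simple gauge/counter lines; the sample payload is illustrative, not actual flashQ output:

```typescript
// Parse simple metric lines ("name{labels} value") from a Prometheus scrape.
type Sample = { name: string; labels: Record<string, string>; value: number };

function parseMetrics(text: string): Sample[] {
  const samples: Sample[] = [];
  for (const line of text.split('\n')) {
    if (!line || line.startsWith('#')) continue; // skip comments and blanks
    const m = line.match(/^(\w+)(?:\{([^}]*)\})?\s+([-\d.eE+]+)$/);
    if (!m) continue;
    const labels: Record<string, string> = {};
    for (const pair of (m[2] ?? '').split(',').filter(Boolean)) {
      const [k, v] = pair.split('=');
      labels[k] = v.replace(/^"|"$/g, ''); // strip surrounding quotes
    }
    samples.push({ name: m[1], labels, value: Number(m[3]) });
  }
  return samples;
}

// Illustrative scrape payload (real flashQ output may differ)
const scrape = [
  '# TYPE flashq_jobs_waiting gauge',
  'flashq_jobs_waiting{queue="llm-calls"} 42',
  'flashq_jobs_waiting{queue="embeddings"} 7',
].join('\n');

console.log(parseMetrics(scrape));
```

This only handles the simple cases (no escaped quotes in label values), but it is enough to eyeball queue depths from a raw scrape.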

Setting Up Prometheus

Create a prometheus.yml configuration:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'flashq'
    static_configs:
      - targets: ['localhost:6790']
    metrics_path: '/metrics/prometheus'

Run Prometheus with Docker:

# Note: -v requires an absolute host path, hence $(pwd)
docker run -d \
  --name prometheus \
  -p 9090:9090 \
  -v "$(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml" \
  prom/prometheus

Essential Grafana Dashboards

1. Queue Health Dashboard

Monitor the overall health of your queues:

# Jobs per second (throughput)
rate(flashq_jobs_total[5m])

# Queue depth (waiting jobs)
flashq_jobs_waiting

# Active jobs
flashq_jobs_active

# Error rate (percentage of jobs failing, per queue; aggregated so the
# status label on flashq_jobs_total doesn't break vector matching)
sum by (queue) (rate(flashq_jobs_failed[5m])) / sum by (queue) (rate(flashq_jobs_total[5m])) * 100
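
These same expressions can be pulled programmatically through Prometheus's HTTP API (`/api/v1/query`), which is handy for custom status pages. A small sketch; the Prometheus host is an assumption for illustration:

```typescript
// Query Prometheus's HTTP API for an instant vector result.
// PROM_URL is an assumed local Prometheus instance.
const PROM_URL = 'http://localhost:9090';

function queryUrl(expr: string): string {
  return `${PROM_URL}/api/v1/query?query=${encodeURIComponent(expr)}`;
}

async function throughput(): Promise<number> {
  const res = await fetch(queryUrl('sum(rate(flashq_jobs_total[5m]))'));
  const body = await res.json();
  // Instant-vector results arrive as [timestamp, "value"] pairs
  return Number(body.data.result[0]?.value[1] ?? 0);
}
```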

2. Latency Dashboard

Track job processing times:

# P50 latency
histogram_quantile(0.5, rate(flashq_job_duration_seconds_bucket[5m]))

# P95 latency
histogram_quantile(0.95, rate(flashq_job_duration_seconds_bucket[5m]))

# P99 latency
histogram_quantile(0.99, rate(flashq_job_duration_seconds_bucket[5m]))
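
As a mental model for what histogram_quantile is doing: it finds the bucket containing the target rank, then linearly interpolates within it. Roughly the same computation in TypeScript (simplified, ignoring the +Inf bucket edge cases):

```typescript
// Estimate a quantile from cumulative histogram buckets, the way
// histogram_quantile does (simplified linear interpolation).
type Bucket = { le: number; count: number }; // cumulative count of obs <= le

function quantile(q: number, buckets: Bucket[]): number {
  const total = buckets[buckets.length - 1].count;
  const rank = q * total; // target cumulative count
  let prevLe = 0;
  let prevCount = 0;
  for (const b of buckets) {
    if (b.count >= rank) {
      // Interpolate linearly within the bucket containing the rank
      const fraction = (rank - prevCount) / (b.count - prevCount);
      return prevLe + (b.le - prevLe) * fraction;
    }
    prevLe = b.le;
    prevCount = b.count;
  }
  return buckets[buckets.length - 1].le;
}

// 100 observations: 50 under 1s, 90 under 5s, all under 30s
const buckets: Bucket[] = [
  { le: 1, count: 50 },
  { le: 5, count: 90 },
  { le: 30, count: 100 },
];
console.log(quantile(0.95, buckets)); // → 17.5, interpolated in the 5–30s bucket
```

This is also why bucket boundaries matter: a P95 that lands in a wide bucket (5–30s here) is only a coarse estimate.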

3. AI Cost Dashboard

Track API costs with custom metrics:

import { Counter } from 'prom-client';
import { Worker } from 'flashq'; // assumed package name
import OpenAI from 'openai';

const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment

// Custom metrics for AI costs
const tokenCounter = new Counter({
  name: 'ai_tokens_total',
  help: 'Total tokens used',
  labelNames: ['model', 'type']  // type: input/output
});

const costCounter = new Counter({
  name: 'ai_cost_dollars',
  help: 'Total API cost in dollars',
  labelNames: ['model', 'queue']
});

// Track in worker
new Worker('llm-calls', async (job) => {
  const response = await openai.chat.completions.create(job.data);

  // Record metrics
  tokenCounter.inc(
    { model: job.data.model, type: 'input' },
    response.usage.prompt_tokens
  );
  tokenCounter.inc(
    { model: job.data.model, type: 'output' },
    response.usage.completion_tokens
  );

  const cost = calculateCost(job.data.model, response.usage);
  costCounter.inc({ model: job.data.model, queue: 'llm-calls' }, cost);

  return response;
});
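
The calculateCost helper above is left undefined. A minimal sketch, with illustrative per-token prices that are NOT real or current; look up your provider's actual rates:

```typescript
// Illustrative prices in dollars per 1M tokens; NOT real pricing.
const PRICES: Record<string, { input: number; output: number }> = {
  'gpt-4o': { input: 2.5, output: 10 },
  'gpt-4o-mini': { input: 0.15, output: 0.6 },
};

type Usage = { prompt_tokens: number; completion_tokens: number };

function calculateCost(model: string, usage: Usage): number {
  const p = PRICES[model];
  if (!p) return 0; // unknown model: record zero rather than guess
  return (
    (usage.prompt_tokens / 1_000_000) * p.input +
    (usage.completion_tokens / 1_000_000) * p.output
  );
}
```

Keeping the price table in code (or config) means a pricing change is one diff away, and the `cost_dollars` counter stays honest.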

Setting Up Alerts

Create alerting rules for critical conditions:

groups:
  - name: flashq_alerts
    rules:
      # High error rate (more than 10% of jobs failing; aggregated so the
      # status label on flashq_jobs_total doesn't break vector matching)
      - alert: HighErrorRate
        expr: sum(rate(flashq_jobs_failed[5m])) / sum(rate(flashq_jobs_total[5m])) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High job failure rate"

      # Queue backing up
      - alert: QueueBacklog
        expr: flashq_jobs_waiting > 10000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Queue has large backlog"

      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(flashq_job_duration_seconds_bucket[5m])) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P95 latency above 30 seconds"

      # Budget exceeded
      - alert: DailyBudgetExceeded
        expr: increase(ai_cost_dollars[24h]) > 100
        labels:
          severity: critical
        annotations:
          summary: "Daily AI spend exceeded $100"
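
These rules only fire inside Prometheus; delivering them anywhere requires Alertmanager. A minimal example routing everything to a Slack webhook; the webhook URL and channel name are placeholders:

```yaml
route:
  receiver: 'slack-alerts'
  group_by: ['alertname']
  routes:
    - match:
        severity: critical
      receiver: 'slack-alerts'
      repeat_interval: 1h

receivers:
  - name: 'slack-alerts'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/XXX/YYY/ZZZ'  # placeholder
        channel: '#ai-pipeline-alerts'
        title: '{{ .CommonAnnotations.summary }}'
```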

Application-Level Monitoring

Beyond infrastructure metrics, monitor application behavior:

import { Queue } from 'flashq'; // assumed package name

const queue = new Queue('ai-pipeline');

// Log all job completions
queue.on('completed', (job, result) => {
  console.log(JSON.stringify({
    event: 'job_completed',
    jobId: job.id,
    queue: 'ai-pipeline',
    duration: Date.now() - job.timestamp,
    resultSize: JSON.stringify(result).length
  }));
});

// Track failures with context
queue.on('failed', (job, error) => {
  console.error(JSON.stringify({
    event: 'job_failed',
    jobId: job.id,
    queue: 'ai-pipeline',
    error: error.message,
    attempt: job.attemptsMade,
    data: job.data
  }));
});

// Monitor progress for long jobs
queue.on('progress', (job, progress) => {
  console.log(JSON.stringify({
    event: 'job_progress',
    jobId: job.id,
    progress: progress.percent,
    message: progress.message
  }));
});

Real-Time Dashboard with WebSocket

flashQ supports WebSocket for real-time updates:

const ws = new WebSocket('ws://localhost:6790/ws?token=YOUR_TOKEN');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'job:completed':
      updateCompletedCount(data.queue);
      break;
    case 'job:failed':
      showErrorAlert(data.jobId, data.error);
      break;
    case 'queue:stats':
      updateDashboard(data.stats);
      break;
  }
};
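
Browser WebSocket connections drop, so a dashboard needs automatic reconnection. A sketch with capped exponential backoff, wrapping the same endpoint shown above:

```typescript
// Capped exponential backoff: 1s, 2s, 4s, ... up to 30s.
function backoffMs(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

function connect(url: string, onMessage: (data: unknown) => void, attempt = 0): void {
  const ws = new WebSocket(url);
  ws.onopen = () => { attempt = 0; }; // reset backoff once connected
  ws.onmessage = (event) => onMessage(JSON.parse(event.data));
  ws.onclose = () => {
    // Reconnect after a delay that grows with consecutive failures
    setTimeout(() => connect(url, onMessage, attempt + 1), backoffMs(attempt));
  };
}
```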

Monitoring Checklist

Before going to production, ensure you have:

- Prometheus scraping flashQ's /metrics/prometheus endpoint
- Dashboards for throughput, queue depth, error rate, and latency percentiles
- Alerts on error rate, queue backlog, P95 latency, and daily AI spend
- Token and cost counters in every worker that calls a paid API
- Structured logs for job completions, failures, and progress

πŸ’‘ Pro Tip

Set up a daily report that summarizes jobs processed, errors, and total API spend. This helps catch trends before they become problems.
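
One way to build that report is to aggregate the structured log events from the earlier section. A sketch, assuming the logs were collected as JSON lines:

```typescript
// Summarize a day's worth of structured job events (JSON lines, as emitted
// by the logging handlers earlier in this post).
type LogEvent = { event: string; queue?: string; duration?: number };

function dailyReport(lines: string[]): { completed: number; failed: number; avgMs: number } {
  const events = lines.map((l) => JSON.parse(l) as LogEvent);
  const completed = events.filter((e) => e.event === 'job_completed');
  const failed = events.filter((e) => e.event === 'job_failed');
  const totalMs = completed.reduce((sum, e) => sum + (e.duration ?? 0), 0);
  return {
    completed: completed.length,
    failed: failed.length,
    avgMs: completed.length ? totalMs / completed.length : 0,
  };
}

const logs = [
  '{"event":"job_completed","queue":"ai-pipeline","duration":1200}',
  '{"event":"job_completed","queue":"ai-pipeline","duration":800}',
  '{"event":"job_failed","queue":"ai-pipeline","error":"rate limit"}',
];
console.log(dailyReport(logs)); // { completed: 2, failed: 1, avgMs: 1000 }
```

Pipe the result into email or Slack on a cron schedule and trends (rising failure counts, creeping durations) surface before they page you.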

Conclusion

Good monitoring is the foundation of reliable AI systems. With flashQ's built-in Prometheus metrics and the patterns in this guide, you'll have full visibility into your AI pipelines. Start with the basics and iterate based on what you learn from your data.
