Running AI pipelines in production without proper monitoring is like flying blind. You need visibility into throughput, latency, error rates, and costs. In this guide, we'll set up comprehensive monitoring for flashQ using Prometheus and Grafana.
## Why Monitor AI Pipelines?
AI workloads have unique monitoring challenges:
- **Variable latency**: LLM calls can take anywhere from 100 ms to 60 s depending on prompt length
- **Cost visibility**: API calls cost real money, so you need to track spend in real time
- **Failure patterns**: rate limits, timeouts, and model errors each need different handling
- **Throughput planning**: understanding job flow informs capacity planning
## flashQ Metrics Overview

flashQ exposes Prometheus metrics at `/metrics/prometheus` when running with HTTP enabled:

```bash
# Start flashQ with HTTP API
HTTP=1 HTTP_PORT=6790 ./flashq-server
```
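Before wiring up Prometheus, it's worth confirming the endpoint actually responds. A quick sanity check, assuming Node 18+ (where `fetch` is global) and the port configured above:

```typescript
// Quick check that the metrics endpoint is up and serving text.
const res = await fetch('http://localhost:6790/metrics/prometheus');
console.log(res.status, (await res.text()).slice(0, 200)); // first few metric lines
```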
Available metrics include:
| Metric | Type | Description |
|---|---|---|
| `flashq_jobs_total` | Counter | Total jobs by queue and status |
| `flashq_jobs_active` | Gauge | Currently processing jobs |
| `flashq_jobs_waiting` | Gauge | Jobs waiting in queue |
| `flashq_jobs_failed` | Counter | Failed jobs (sent to DLQ) |
| `flashq_job_duration_seconds` | Histogram | Job processing duration |
| `flashq_queue_depth` | Gauge | Queue depth by queue name |
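A raw scrape of the endpoint returns Prometheus' plain-text exposition format. Illustratively, it looks roughly like this (label names follow the table above; the values are made up):

```text
# HELP flashq_jobs_total Total jobs by queue and status
# TYPE flashq_jobs_total counter
flashq_jobs_total{queue="llm-calls",status="completed"} 1523
flashq_jobs_total{queue="llm-calls",status="failed"} 12

# HELP flashq_jobs_waiting Jobs waiting in queue
# TYPE flashq_jobs_waiting gauge
flashq_jobs_waiting{queue="llm-calls"} 47
```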
## Setting Up Prometheus

Create a `prometheus.yml` configuration:

```yaml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'flashq'
    static_configs:
      - targets: ['localhost:6790']
    metrics_path: '/metrics/prometheus'
```
Run Prometheus with Docker (bind mounts need an absolute host path, hence `$(pwd)`):

```bash
docker run -d \
  --name prometheus \
  -p 9090:9090 \
  -v "$(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml" \
  prom/prometheus
```
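Once the container is running, confirm Prometheus actually discovered the `flashq` job: open http://localhost:9090/targets in a browser, or query Prometheus' standard `/api/v1/targets` API:

```typescript
// List scrape targets and their health via Prometheus' HTTP API.
const res = await fetch('http://localhost:9090/api/v1/targets');
const { data } = await res.json();
for (const t of data.activeTargets) {
  console.log(`${t.labels.job}: ${t.health}`); // expect "flashq: up"
}
```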
## Essential Grafana Dashboards

### 1. Queue Health Dashboard
Monitor the overall health of your queues:
```promql
# Jobs per second (throughput)
rate(flashq_jobs_total[5m])

# Queue depth (waiting jobs)
flashq_jobs_waiting

# Active jobs
flashq_jobs_active

# Error rate (% of jobs failing)
rate(flashq_jobs_failed[5m]) / rate(flashq_jobs_total[5m]) * 100
```
### 2. Latency Dashboard
Track job processing times:
```promql
# P50 latency
histogram_quantile(0.5, rate(flashq_job_duration_seconds_bucket[5m]))

# P95 latency
histogram_quantile(0.95, rate(flashq_job_duration_seconds_bucket[5m]))

# P99 latency
histogram_quantile(0.99, rate(flashq_job_duration_seconds_bucket[5m]))
```
### 3. AI Cost Dashboard
Track API costs with custom metrics:
```typescript
import { Counter } from 'prom-client';

// Custom metrics for AI costs
const tokenCounter = new Counter({
  name: 'ai_tokens_total',
  help: 'Total tokens used',
  labelNames: ['model', 'type'] // type: input/output
});

const costCounter = new Counter({
  name: 'ai_cost_dollars',
  help: 'Total API cost in dollars',
  labelNames: ['model', 'queue']
});

// Track in worker (Worker comes from flashQ's client SDK; the openai
// client is assumed to be configured elsewhere)
new Worker('llm-calls', async (job) => {
  const response = await openai.chat.completions.create(job.data);

  // Record token usage by direction
  tokenCounter.inc(
    { model: job.data.model, type: 'input' },
    response.usage.prompt_tokens
  );
  tokenCounter.inc(
    { model: job.data.model, type: 'output' },
    response.usage.completion_tokens
  );

  // Convert usage into dollars (see the pricing sketch below)
  const cost = calculateCost(job.data.model, response.usage);
  costCounter.inc({ model: job.data.model, queue: 'llm-calls' }, cost);

  return response;
});
```
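The worker above leans on a `calculateCost` helper, and prom-client metrics aren't scrapeable until you expose them over HTTP. Here's a minimal sketch of both; the per-token prices are illustrative placeholders rather than real rates, and port 9464 is an arbitrary choice you'd add as a second scrape target:

```typescript
import http from 'node:http';
import { register } from 'prom-client';

// Illustrative per-1K-token prices; substitute your provider's real rates.
const PRICES: Record<string, { input: number; output: number }> = {
  'gpt-4o': { input: 0.005, output: 0.015 },        // hypothetical
  'gpt-4o-mini': { input: 0.00015, output: 0.0006 } // hypothetical
};

function calculateCost(
  model: string,
  usage: { prompt_tokens: number; completion_tokens: number }
): number {
  const price = PRICES[model] ?? { input: 0, output: 0 };
  return (
    (usage.prompt_tokens / 1000) * price.input +
    (usage.completion_tokens / 1000) * price.output
  );
}

// Expose the default prom-client registry so Prometheus can scrape it.
http
  .createServer(async (req, res) => {
    if (req.url === '/metrics') {
      res.setHeader('Content-Type', register.contentType);
      res.end(await register.metrics());
    } else {
      res.statusCode = 404;
      res.end();
    }
  })
  .listen(9464); // arbitrary port; add it to prometheus.yml as a target
```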
## Setting Up Alerts
Create alerting rules for critical conditions:
```yaml
groups:
  - name: flashq_alerts
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(flashq_jobs_failed[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High job failure rate"

      # Queue backing up
      - alert: QueueBacklog
        expr: flashq_jobs_waiting > 10000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Queue has large backlog"

      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(flashq_job_duration_seconds_bucket[5m])) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P95 latency above 30 seconds"

      # Budget exceeded
      - alert: DailyBudgetExceeded
        expr: increase(ai_cost_dollars[24h]) > 100
        labels:
          severity: critical
        annotations:
          summary: "Daily AI spend exceeded $100"
```
## Application-Level Monitoring
Beyond infrastructure metrics, monitor application behavior:
```typescript
const queue = new Queue('ai-pipeline');

// Log all job completions
queue.on('completed', (job, result) => {
  console.log(JSON.stringify({
    event: 'job_completed',
    jobId: job.id,
    queue: 'ai-pipeline',
    duration: Date.now() - job.timestamp,
    resultSize: JSON.stringify(result).length
  }));
});

// Track failures with context
queue.on('failed', (job, error) => {
  console.error(JSON.stringify({
    event: 'job_failed',
    jobId: job.id,
    queue: 'ai-pipeline',
    error: error.message,
    attempt: job.attemptsMade,
    data: job.data
  }));
});

// Monitor progress for long jobs
queue.on('progress', (job, progress) => {
  console.log(JSON.stringify({
    event: 'job_progress',
    jobId: job.id,
    progress: progress.percent,
    message: progress.message
  }));
});
```
## Real-Time Dashboard with WebSocket
flashQ supports WebSocket for real-time updates:
```typescript
const ws = new WebSocket('ws://localhost:6790/ws?token=YOUR_TOKEN');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'job:completed':
      updateCompletedCount(data.queue);
      break;
    case 'job:failed':
      showErrorAlert(data.jobId, data.error);
      break;
    case 'queue:stats':
      updateDashboard(data.stats);
      break;
  }
};
```
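WebSocket connections drop in practice (deploys, network blips), so a live dashboard should reconnect on its own. A small sketch with exponential backoff, reusing the same URL and message dispatch as above:

```typescript
// Reconnecting wrapper with capped exponential backoff (illustrative sketch).
function connect(url: string, onMessage: (data: any) => void, attempt = 0) {
  const ws = new WebSocket(url);

  ws.onopen = () => { attempt = 0; }; // reset backoff once connected
  ws.onmessage = (event) => onMessage(JSON.parse(event.data));
  ws.onclose = () => {
    const delay = Math.min(30_000, 1000 * 2 ** attempt); // cap at 30s
    setTimeout(() => connect(url, onMessage, attempt + 1), delay);
  };
}

connect('ws://localhost:6790/ws?token=YOUR_TOKEN', (data) => {
  // Same dispatch as above: job:completed, job:failed, queue:stats
  console.log(data.type);
});
```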
## Monitoring Checklist
Before going to production, ensure you have:
- ✅ Prometheus scraping flashQ metrics
- ✅ Grafana dashboards for queues, latency, and costs
- ✅ Alerts for high error rate, queue backlog, and budget
- ✅ Structured logging for job events
- ✅ DLQ monitoring and alerting
- ✅ Cost tracking per model and queue
Set up a daily report that summarizes jobs processed, errors, and total API spend. This helps catch trends before they become problems.
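One way to generate that report is a scheduled script against Prometheus' query API. A sketch, assuming Prometheus at localhost:9090 and the `ai_cost_dollars` counter from earlier; run it from cron or any scheduler:

```typescript
// Hypothetical daily summary: pull 24h aggregates from Prometheus' query API.
async function query(promql: string): Promise<string> {
  const url = `http://localhost:9090/api/v1/query?query=${encodeURIComponent(promql)}`;
  const body = await (await fetch(url)).json();
  // Sum across returned series; 0 if the series is absent.
  return body.data.result
    .reduce((sum: number, r: any) => sum + parseFloat(r.value[1]), 0)
    .toFixed(2);
}

const processed = await query('sum(increase(flashq_jobs_total[24h]))');
const failed = await query('sum(increase(flashq_jobs_failed[24h]))');
const spend = await query('sum(increase(ai_cost_dollars[24h]))');

console.log(`Daily report: ${processed} jobs, ${failed} failures, $${spend} API spend`);
```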
## Conclusion
Good monitoring is the foundation of reliable AI systems. With flashQ's built-in Prometheus metrics and the patterns in this guide, you'll have full visibility into your AI pipelines. Start with the basics and iterate based on what you learn from your data.