Security is critical for any job queue system handling sensitive data. This guide covers authentication, encryption, input validation, network security, and compliance best practices for flashQ deployments.
Never deploy flashQ to production without enabling authentication and reviewing these security practices. An unsecured queue can expose sensitive data and lets anyone inject jobs that your workers will then execute — effectively remote code execution against your worker fleet.
Authentication
Token-Based Authentication
flashQ uses bearer token authentication. Configure tokens via environment variables.
# Server configuration
# Comma-separated list of accepted bearer tokens (no spaces around commas);
# multiple entries exist to support zero-downtime rotation.
AUTH_TOKENS=token1,token2,token3
# Or a single secure token
# 32 random bytes, hex-encoded (64 chars) — suitable for production.
AUTH_TOKENS=$(openssl rand -hex 32)
// Client authentication
import { FlashQ } from 'flashq';
// Read the token from the environment so it never lands in source control
// or container images.
const client = new FlashQ({
host: 'flashq.internal',
port: 6789,
token: process.env.FLASHQ_TOKEN // Never hardcode!
});
await client.connect();
Token Rotation
// Implement token rotation without downtime
// 1. Add new token while keeping old ones
AUTH_TOKENS=old_token,new_token
// 2. Update all clients to use new token
//    (roll out gradually — the server accepts both tokens during the transition)
const client = new FlashQ({
token: process.env.FLASHQ_TOKEN_NEW
});
// 3. After all clients updated, remove old token
AUTH_TOKENS=new_token
Per-Queue Authorization
// Implement role-based queue access
// Each token maps to the queue-name patterns and operations it may use.
interface TokenPermissions {
token: string;
queues: string[]; // Allowed queue patterns
operations: string[]; // push, pull, admin, etc.
}
const permissions: TokenPermissions[] = [
{
// Workers: consume and report on job/task queues, but cannot produce.
token: 'worker-token-xxx',
queues: ['jobs:*', 'tasks:*'],
operations: ['pull', 'ack', 'fail', 'progress']
},
{
// Producers: enqueue and manage their own jobs, but cannot consume.
token: 'producer-token-xxx',
queues: ['jobs:*'],
operations: ['push', 'getJob', 'cancel']
},
{
// Admin: unrestricted — keep this token out of application deployments.
token: 'admin-token-xxx',
queues: ['*'],
operations: ['*']
}
];
// Middleware to check permissions
/**
 * True when `token` is known, is granted operation `op` (or the '*'
 * wildcard), and at least one of its queue patterns matches `queue`.
 */
function checkPermission(token: string, queue: string, op: string): boolean {
  const grant = permissions.find((entry) => entry.token === token);
  if (grant === undefined) {
    return false; // unknown token: deny by default
  }
  if (!grant.operations.includes('*') && !grant.operations.includes(op)) {
    return false;
  }
  return grant.queues.some((pattern) => matchPattern(pattern, queue));
}
Encryption
TLS/SSL for Transport
# Enable TLS with nginx reverse proxy
# nginx.conf
server {
listen 443 ssl;
server_name flashq.example.com;
ssl_certificate /etc/ssl/flashq.crt;
ssl_certificate_key /etc/ssl/flashq.key;
# TLS 1.2+ only; cipher list is AEAD-only (GCM) with forward secrecy (ECDHE).
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
location / {
# Forward to flashQ's HTTP port; the Upgrade/Connection headers keep
# WebSocket connections working through the proxy.
proxy_pass http://flashq:6790;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
# TCP passthrough for binary protocol
# The `stream` block terminates TLS for the raw TCP protocol on 6789.
stream {
server {
listen 6789 ssl;
proxy_pass flashq:6789;
ssl_certificate /etc/ssl/flashq.crt;
ssl_certificate_key /etc/ssl/flashq.key;
}
}
Encrypting Job Data
// encryption/job-encryption.ts
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';

// AES-256-GCM: authenticated encryption — tampering is detected at decrypt time.
const ALGORITHM = 'aes-256-gcm';
// NIST SP 800-38D recommends a 96-bit (12-byte) IV for GCM.
const IV_LENGTH = 12;

// Read and validate the key lazily: the old module-level
// `Buffer.from(process.env.ENCRYPTION_KEY!, 'hex')` crashed at import time
// when the variable was unset and silently accepted wrong-length keys.
function getKey(): Buffer {
  const hex = process.env.ENCRYPTION_KEY;
  if (!hex) {
    throw new Error('ENCRYPTION_KEY environment variable is not set');
  }
  const key = Buffer.from(hex, 'hex');
  if (key.length !== 32) {
    throw new Error('ENCRYPTION_KEY must be 32 bytes (64 hex characters)');
  }
  return key;
}

interface EncryptedData {
  iv: string;   // base64 IV (12 bytes for new payloads; legacy 16-byte IVs still decrypt)
  tag: string;  // base64 GCM authentication tag
  data: string; // base64 ciphertext of the JSON-serialized payload
}

/**
 * Encrypt an arbitrary JSON-serializable value with AES-256-GCM.
 * A fresh random IV is generated per call — never reuse an IV with GCM.
 */
function encryptJobData(data: any): EncryptedData {
  const iv = randomBytes(IV_LENGTH);
  const cipher = createCipheriv(ALGORITHM, getKey(), iv);
  const plaintext = JSON.stringify(data);
  let encrypted = cipher.update(plaintext, 'utf8', 'base64');
  encrypted += cipher.final('base64');
  return {
    iv: iv.toString('base64'),
    tag: cipher.getAuthTag().toString('base64'),
    data: encrypted
  };
}

/**
 * Decrypt a payload produced by encryptJobData.
 * Throws if the authentication tag does not verify (tampered data or wrong key).
 * The IV is taken from the stored payload, so data encrypted under the older
 * 16-byte-IV scheme remains readable.
 */
function decryptJobData(encrypted: EncryptedData): any {
  const iv = Buffer.from(encrypted.iv, 'base64');
  const tag = Buffer.from(encrypted.tag, 'base64');
  const decipher = createDecipheriv(ALGORITHM, getKey(), iv);
  decipher.setAuthTag(tag);
  let decrypted = decipher.update(encrypted.data, 'base64', 'utf8');
  decrypted += decipher.final('utf8');
  return JSON.parse(decrypted);
}
// Usage with flashQ
// Producer side: only ciphertext ever reaches the queue / storage.
async function pushEncryptedJob(queue: string, data: any) {
const encrypted = encryptJobData(data);
return flashq.push(queue, { encrypted });
}
// Worker decryption
// Plaintext exists only in worker memory while the handler runs.
const worker = new Worker(flashq, 'secure-jobs', async (job) => {
const data = decryptJobData(job.data.encrypted);
// Process decrypted data
return processJob(data);
});
Field-Level Encryption
// Encrypt only sensitive fields
const SENSITIVE_FIELDS = ['ssn', 'creditCard', 'password', 'apiKey'];

/** Key prefix marking a field whose value has been encrypted in place. */
const ENCRYPTED_PREFIX = '_encrypted_';

/**
 * Shallow-copy `data`, replacing each known sensitive field with an
 * `_encrypted_<field>` entry holding its encrypted value.
 * Non-sensitive fields pass through untouched.
 */
function encryptSensitiveFields(data: Record<string, any>): Record<string, any> {
  const output: Record<string, any> = { ...data };
  SENSITIVE_FIELDS.filter((name) => name in output).forEach((name) => {
    output[`${ENCRYPTED_PREFIX}${name}`] = encryptJobData(output[name]);
    delete output[name];
  });
  return output;
}

/**
 * Inverse of encryptSensitiveFields: restores every `_encrypted_*` entry
 * to its original field name with the decrypted value.
 */
function decryptSensitiveFields(data: Record<string, any>): Record<string, any> {
  const output: Record<string, any> = { ...data };
  for (const key of Object.keys(output)) {
    if (!key.startsWith(ENCRYPTED_PREFIX)) continue;
    const original = key.replace(ENCRYPTED_PREFIX, '');
    output[original] = decryptJobData(output[key]);
    delete output[key];
  }
  return output;
}
Input Validation
Schema Validation with Zod
// validation/job-schemas.ts
import { z } from 'zod';
// Define strict schemas for each job type
// Every field is bounded — unbounded strings/arrays are a memory-DoS vector.
const EmailJobSchema = z.object({
to: z.string().email().max(254),
subject: z.string().max(998), // RFC 5322 max line length
body: z.string().max(1048576), // 1MB max
attachments: z.array(z.object({
// Whitelist regex also blocks path-traversal characters in filenames.
filename: z.string().max(255).regex(/^[a-zA-Z0-9._-]+$/),
contentType: z.string().max(100),
size: z.number().max(10485760) // 10MB max
})).max(10).optional()
});
const PaymentJobSchema = z.object({
orderId: z.string().uuid(),
amount: z.number().positive().max(999999.99),
currency: z.enum(['USD', 'EUR', 'GBP']),
customerId: z.string().uuid()
});
const ImageProcessingSchema = z.object({
// https-only URL reduces the SSRF surface of whatever fetches the image.
imageUrl: z.string().url().startsWith('https://'),
operations: z.array(z.enum([
'resize', 'crop', 'rotate', 'compress'
])).max(10),
outputFormat: z.enum(['jpeg', 'png', 'webp'])
});
// Validate before pushing
/**
 * Parse `data` against `schema`, then push the validated value.
 * Throws ZodError when validation fails — nothing reaches the queue.
 */
async function pushValidatedJob<T>(
  queue: string,
  schema: z.Schema<T>,
  data: unknown
) {
  return flashq.push(queue, schema.parse(data));
}

// Usage
await pushValidatedJob('emails', EmailJobSchema, {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'Content here'
});
Sanitization
// validation/sanitize.ts
import DOMPurify from 'isomorphic-dompurify';
// Sanitize HTML content
function sanitizeHtml(html: string): string {
return DOMPurify.sanitize(html, {
ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'p', 'br'],
ALLOWED_ATTR: []
});
}
// Remove potential injection characters
const DANGEROUS_CHARS = /[<>'"&]/g;
const MAX_STRING_LENGTH = 10000;

/** Strip markup/quote characters, trim whitespace, and cap the length. */
function sanitizeString(str: string): string {
  const stripped = str.replace(DANGEROUS_CHARS, '');
  return stripped.trim().slice(0, MAX_STRING_LENGTH);
}
// Sanitize file paths (prevent path traversal)
/**
 * Reduce a user-supplied path to a safe relative path.
 *
 * Fixes two traversal bypasses in the naive single-pass version:
 *  - ".." removal now loops to a fixed point, so inputs like "...."
 *    cannot leave a ".." behind after one pass;
 *  - the character whitelist runs FIRST, so an input like ".*./" can no
 *    longer turn into "../" after the dot-dot removal has already run.
 */
function sanitizePath(path: string): string {
  // Whitelist first: stripping disallowed characters must not be able to
  // manufacture new ".." pairs after the traversal check.
  let safe = path.replace(/[^a-zA-Z0-9._/-]/g, '');
  // Remove parent-directory references until none remain.
  let previous: string;
  do {
    previous = safe;
    safe = safe.replace(/\.\./g, '');
  } while (safe !== previous);
  // Force a relative path: drop all leading slashes (the old version only
  // removed one, so "//etc" stayed absolute).
  return safe.replace(/^\/+/, '');
}
// Sanitize SQL (for dynamic queries, prefer parameterized)
/** Letters, digits and underscore only; must not start with a digit. */
const SQL_IDENTIFIER = /^[a-zA-Z_][a-zA-Z0-9_]*$/;

// Validates rather than rewrites: an identifier is either safe or rejected.
function sanitizeSqlIdentifier(identifier: string): string {
  if (SQL_IDENTIFIER.test(identifier)) {
    return identifier;
  }
  throw new Error('Invalid SQL identifier');
}
Rate Limiting
Queue-Level Rate Limits
// Set rate limits per queue
// Server-side limit: at most `max` jobs admitted per `duration` ms window.
await flashq.setRateLimit('api-calls', {
max: 100, // 100 jobs
duration: 60000 // per minute
});
// Different limits for different queues
const rateLimits = {
'emails': { max: 1000, duration: 3600000 }, // 1000/hour
'payments': { max: 100, duration: 60000 }, // 100/minute
'notifications': { max: 10000, duration: 60000 } // 10000/minute
};
for (const [queue, limit] of Object.entries(rateLimits)) {
await flashq.setRateLimit(queue, limit);
}
Client-Level Rate Limiting
// Implement client-side rate limiting
import { RateLimiter } from 'limiter';

/**
 * Wraps a FlashQ client and enforces a local 100-jobs-per-minute budget
 * per queue before any push reaches the server.
 */
class RateLimitedClient {
  private client: FlashQ;
  private limiters: Map<string, RateLimiter> = new Map();

  constructor(client: FlashQ) {
    this.client = client;
  }

  // Lazily create one token bucket per queue name.
  private getLimiter(queue: string): RateLimiter {
    let limiter = this.limiters.get(queue);
    if (limiter === undefined) {
      limiter = new RateLimiter({
        tokensPerInterval: 100,
        interval: 'minute'
      });
      this.limiters.set(queue, limiter);
    }
    return limiter;
  }

  /** Throws locally when the per-queue budget is exhausted. */
  async push(queue: string, data: any, options?: any) {
    if (!this.getLimiter(queue).tryRemoveTokens(1)) {
      throw new Error(`Rate limit exceeded for queue: ${queue}`);
    }
    return this.client.push(queue, data, options);
  }
}
Network Security
Firewall Configuration
# Only allow internal network access to flashQ
# iptables rules
# Accept queue traffic (6789) only from the 10.0.0.0/8 internal range...
iptables -A INPUT -p tcp --dport 6789 -s 10.0.0.0/8 -j ACCEPT
# ...and drop everything else reaching that port.
iptables -A INPUT -p tcp --dport 6789 -j DROP
# Same pattern for the HTTP/metrics port (6790).
iptables -A INPUT -p tcp --dport 6790 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 6790 -j DROP
Kubernetes Network Policy
# k8s/network-policy.yaml
# Default-deny stance: once this policy selects the flashq pods, only the
# ingress/egress listed below is permitted.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: flashq-network-policy
namespace: flashq
spec:
podSelector:
matchLabels:
app: flashq
policyTypes:
- Ingress
- Egress
ingress:
# Only allow from workers namespace
- from:
- namespaceSelector:
matchLabels:
name: workers
- podSelector:
matchLabels:
role: worker
ports:
- port: 6789
- port: 6790
# Allow from monitoring
- from:
- namespaceSelector:
matchLabels:
name: monitoring
ports:
- port: 6790 # Metrics only
egress:
# Only allow PostgreSQL
# (limits exfiltration: flashq pods can reach nothing but the database)
- to:
- namespaceSelector:
matchLabels:
name: databases
ports:
- port: 5432
Private VPC Deployment
# terraform/vpc.tf
resource "aws_vpc" "flashq" {
cidr_block = "10.0.0.0/16"
enable_dns_hostnames = true
tags = {
Name = "flashq-vpc"
}
}
# Private subnet for the queue servers; no route table entry to an
# internet gateway is ever created.
resource "aws_subnet" "private" {
vpc_id = aws_vpc.flashq.id
cidr_block = "10.0.1.0/24"
availability_zone = "us-east-1a"
tags = {
Name = "flashq-private"
}
}
# No internet gateway - truly private
# Ingress: queue + metrics ports (6789-6790) from inside the VPC only.
# Egress: PostgreSQL (5432) to the database subnet only.
resource "aws_security_group" "flashq" {
vpc_id = aws_vpc.flashq.id
ingress {
from_port = 6789
to_port = 6790
protocol = "tcp"
cidr_blocks = ["10.0.0.0/16"]
}
egress {
from_port = 5432
to_port = 5432
protocol = "tcp"
cidr_blocks = ["10.0.2.0/24"] # Database subnet
}
}
Audit Logging
// audit/logger.ts
// Shape of one audit record; serialized as a single JSON line.
interface AuditLog {
  timestamp: string; // ISO-8601, set by the logger itself
  action: string;    // e.g. 'push', 'pull', 'cancel'
  queue: string;
  jobId?: string;
  userId?: string;
  ip?: string;
  success: boolean;
  details?: Record<string, any>;
}

/**
 * Emits audit records to stdout (for log aggregation) and mirrors them
 * into a dedicated audit queue for 90-day persistence.
 *
 * Changes vs. the previous version: the unused `logs` array (dead code
 * that suggested in-memory retention that never happened) is removed, and
 * the fire-and-forget queue push now handles rejection instead of raising
 * an unhandled promise rejection when the queue is unreachable.
 */
class AuditLogger {
  log(entry: Omit<AuditLog, 'timestamp'>) {
    const auditEntry: AuditLog = {
      ...entry,
      timestamp: new Date().toISOString()
    };
    // Log to stdout (for log aggregation)
    console.log(JSON.stringify(auditEntry));
    // Also push to audit queue for persistence. Deliberately not awaited —
    // auditing must not block the operation — but the rejection is handled
    // so an audit-queue outage cannot crash the process.
    void flashq.push('audit:logs', auditEntry, {
      ttl: 86400000 * 90 // Keep for 90 days
    }).catch((err: unknown) => {
      console.error('audit queue push failed', err);
    });
  }
}
const audit = new AuditLogger();
// Wrap flashQ operations with audit logging
/**
 * Decorates a FlashQ client so every push is audit-logged with the acting
 * user and source IP, on both success and failure. The original error is
 * always rethrown after logging.
 */
class AuditedFlashQ {
  private client: FlashQ;
  private userId: string;
  private ip: string;

  constructor(client: FlashQ, userId: string, ip: string) {
    this.client = client;
    this.userId = userId;
    this.ip = ip;
  }

  async push(queue: string, data: any, options?: any) {
    try {
      const result = await this.client.push(queue, data, options);
      audit.log({
        action: 'push',
        queue,
        jobId: result.id,
        userId: this.userId,
        ip: this.ip,
        success: true
      });
      return result;
    } catch (error) {
      // Fix: `error` is `unknown` under strict mode, and non-Error throws
      // have no `.message` — narrow before reading it.
      const message = error instanceof Error ? error.message : String(error);
      audit.log({
        action: 'push',
        queue,
        userId: this.userId,
        ip: this.ip,
        success: false,
        details: { error: message }
      });
      throw error;
    }
  }
}
Secrets Management
// Never store secrets in code or environment directly
// Use a secrets manager

// AWS Secrets Manager
import { SecretsManager } from '@aws-sdk/client-secrets-manager';

/**
 * Fetch the flashQ auth token from AWS Secrets Manager.
 * `SecretString` is undefined for binary secrets — check explicitly and
 * fail with a clear message instead of using a non-null assertion that
 * would surface later as an unrelated type error.
 */
async function getFlashQToken(): Promise<string> {
  const client = new SecretsManager({ region: 'us-east-1' });
  const response = await client.getSecretValue({
    SecretId: 'flashq/production/token'
  });
  if (response.SecretString === undefined) {
    throw new Error('Secret flashq/production/token has no string value');
  }
  return response.SecretString;
}
// HashiCorp Vault
import Vault from 'node-vault';

/** Authenticate via AppRole, then read the flashQ secrets from KV v2. */
async function getSecretsFromVault() {
  const client = Vault({
    apiVersion: 'v1',
    endpoint: process.env.VAULT_ADDR
  });
  // AppRole login: role_id identifies the app, secret_id proves it.
  await client.approleLogin({
    role_id: process.env.VAULT_ROLE_ID,
    secret_id: process.env.VAULT_SECRET_ID
  });
  // KV v2 responses nest the payload under data.data.
  const response = await client.read('secret/data/flashq');
  return response.data.data;
}
// Kubernetes secrets
// Mount as files, not environment variables
// (file mounts don't leak through `ps e`, crash dumps, or child-process
// inheritance the way env vars can; trim() strips the trailing newline)
// NOTE(review): requires `import { readFileSync } from 'fs'` — not shown here.
const token = readFileSync('/etc/secrets/flashq-token', 'utf8').trim();
Compliance Considerations
GDPR / Data Privacy
// Implement data retention policies
/**
 * GDPR erasure: scan completed jobs in every queue and redact the
 * personal fields of any job belonging to `userId`.
 */
async function cleanupUserData(userId: string) {
  // Find all jobs for user
  for (const queue of await flashq.listQueues()) {
    const completed = await flashq.getJobs(queue, 'completed', 10000);
    const userJobs = completed.filter((job) => job.data?.userId === userId);
    for (const job of userJobs) {
      // Anonymize or delete
      await flashq.update(job.id, {
        ...job.data,
        email: '[REDACTED]',
        name: '[REDACTED]',
        personalData: null
      });
    }
  }
}
// Auto-cleanup old completed jobs
await flashq.clean('*', 86400000 * 30, 'completed'); // 30 days
PCI-DSS (Payment Data)
// Never store raw card data in jobs
// Only store tokenized references
interface PaymentJob {
paymentTokenId: string; // Token from payment processor
orderId: string;
amount: number;
// NO: cardNumber, cvv, expiryDate
}
// Worker fetches actual payment data from secure vault
// NOTE(review): `amount` is passed straight to Stripe, which expects the
// smallest currency unit (cents for USD) — confirm producers push cents.
// `orderId` is destructured but unused here — presumably intended for an
// idempotency key; verify.
const worker = new Worker(flashq, 'payments', async (job) => {
const { paymentTokenId, orderId, amount } = job.data;
// Retrieve from payment processor
const result = await stripe.paymentIntents.create({
payment_method: paymentTokenId,
amount,
currency: 'usd',
confirm: true
});
return { success: true, transactionId: result.id };
});
✓ Enable authentication tokens
✓ Use TLS for all connections
✓ Encrypt sensitive job data
✓ Validate all inputs
✓ Set rate limits
✓ Configure network policies
✓ Enable audit logging
✓ Use secrets manager
✓ Implement data retention
Conclusion
Security requires a defense-in-depth approach. By implementing authentication, encryption, validation, and monitoring, you can confidently run flashQ in production with sensitive workloads.