FeaturesBlogDocs GitHub Get Started

flashQ Security Best Practices

Security is critical for any job queue system handling sensitive data. This guide covers authentication, encryption, input validation, network security, and compliance best practices for flashQ deployments.

Security First

Never deploy flashQ to production without enabling authentication and reviewing these security practices. An unsecured queue can expose sensitive data and allow arbitrary code execution.

Authentication

Token-Based Authentication

flashQ uses bearer token authentication. Configure tokens via environment variables.

# Server configuration
AUTH_TOKENS=token1,token2,token3

# Or a single secure token
AUTH_TOKENS=$(openssl rand -hex 32)
// Client authentication
import { FlashQ } from 'flashq';

const client = new FlashQ({
  host: 'flashq.internal',
  port: 6789,
  token: process.env.FLASHQ_TOKEN // Never hardcode!
});

await client.connect();

Token Rotation

// Implement token rotation without downtime
// 1. Add new token while keeping old ones
AUTH_TOKENS=old_token,new_token

// 2. Update all clients to use new token
const client = new FlashQ({
  token: process.env.FLASHQ_TOKEN_NEW
});

// 3. After all clients updated, remove old token
AUTH_TOKENS=new_token

Per-Queue Authorization

// Implement role-based queue access
interface TokenPermissions {
  token: string;
  queues: string[];        // Allowed queue patterns
  operations: string[];    // push, pull, admin, etc.
}

const permissions: TokenPermissions[] = [
  {
    token: 'worker-token-xxx',
    queues: ['jobs:*', 'tasks:*'],
    operations: ['pull', 'ack', 'fail', 'progress']
  },
  {
    token: 'producer-token-xxx',
    queues: ['jobs:*'],
    operations: ['push', 'getJob', 'cancel']
  },
  {
    token: 'admin-token-xxx',
    queues: ['*'],
    operations: ['*']
  }
];

// Middleware to check permissions
function checkPermission(token: string, queue: string, op: string): boolean {
  const perm = permissions.find(p => p.token === token);
  if (!perm) return false;

  const queueAllowed = perm.queues.some(pattern =>
    matchPattern(pattern, queue)
  );
  const opAllowed = perm.operations.includes('*') ||
                    perm.operations.includes(op);

  return queueAllowed && opAllowed;
}

Encryption

TLS/SSL for Transport

# Enable TLS with nginx reverse proxy
# nginx.conf
server {
    listen 443 ssl;
    server_name flashq.example.com;

    ssl_certificate /etc/ssl/flashq.crt;
    ssl_certificate_key /etc/ssl/flashq.key;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;

    location / {
        proxy_pass http://flashq:6790;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

# TCP passthrough for binary protocol
stream {
    server {
        listen 6789 ssl;
        proxy_pass flashq:6789;
        ssl_certificate /etc/ssl/flashq.crt;
        ssl_certificate_key /etc/ssl/flashq.key;
    }
}

Encrypting Job Data

// encryption/job-encryption.ts
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';

const ALGORITHM = 'aes-256-gcm';
const KEY = Buffer.from(process.env.ENCRYPTION_KEY!, 'hex'); // 32 bytes

interface EncryptedData {
  iv: string;
  tag: string;
  data: string;
}

function encryptJobData(data: any): EncryptedData {
  const iv = randomBytes(16);
  const cipher = createCipheriv(ALGORITHM, KEY, iv);

  const plaintext = JSON.stringify(data);
  let encrypted = cipher.update(plaintext, 'utf8', 'base64');
  encrypted += cipher.final('base64');

  return {
    iv: iv.toString('base64'),
    tag: cipher.getAuthTag().toString('base64'),
    data: encrypted
  };
}

function decryptJobData(encrypted: EncryptedData): any {
  const iv = Buffer.from(encrypted.iv, 'base64');
  const tag = Buffer.from(encrypted.tag, 'base64');
  const decipher = createDecipheriv(ALGORITHM, KEY, iv);
  decipher.setAuthTag(tag);

  let decrypted = decipher.update(encrypted.data, 'base64', 'utf8');
  decrypted += decipher.final('utf8');

  return JSON.parse(decrypted);
}

// Usage with flashQ
async function pushEncryptedJob(queue: string, data: any) {
  const encrypted = encryptJobData(data);
  return flashq.push(queue, { encrypted });
}

// Worker decryption
const worker = new Worker(flashq, 'secure-jobs', async (job) => {
  const data = decryptJobData(job.data.encrypted);
  // Process decrypted data
  return processJob(data);
});

Field-Level Encryption

// Encrypt only sensitive fields
const SENSITIVE_FIELDS = ['ssn', 'creditCard', 'password', 'apiKey'];

function encryptSensitiveFields(data: Record<string, any>): Record<string, any> {
  const result = { ...data };

  for (const field of SENSITIVE_FIELDS) {
    if (field in result) {
      result[`_encrypted_${field}`] = encryptJobData(result[field]);
      delete result[field];
    }
  }

  return result;
}

function decryptSensitiveFields(data: Record<string, any>): Record<string, any> {
  const result = { ...data };

  for (const key of Object.keys(result)) {
    if (key.startsWith('_encrypted_')) {
      const field = key.replace('_encrypted_', '');
      result[field] = decryptJobData(result[key]);
      delete result[key];
    }
  }

  return result;
}

Input Validation

Schema Validation with Zod

// validation/job-schemas.ts
import { z } from 'zod';

// Define strict schemas for each job type
const EmailJobSchema = z.object({
  to: z.string().email().max(254),
  subject: z.string().max(998), // RFC 5322 limit
  body: z.string().max(1048576), // 1MB max
  attachments: z.array(z.object({
    filename: z.string().max(255).regex(/^[a-zA-Z0-9._-]+$/),
    contentType: z.string().max(100),
    size: z.number().max(10485760) // 10MB max
  })).max(10).optional()
});

const PaymentJobSchema = z.object({
  orderId: z.string().uuid(),
  amount: z.number().positive().max(999999.99),
  currency: z.enum(['USD', 'EUR', 'GBP']),
  customerId: z.string().uuid()
});

const ImageProcessingSchema = z.object({
  imageUrl: z.string().url().startsWith('https://'),
  operations: z.array(z.enum([
    'resize', 'crop', 'rotate', 'compress'
  ])).max(10),
  outputFormat: z.enum(['jpeg', 'png', 'webp'])
});

// Validate before pushing
async function pushValidatedJob<T>(
  queue: string,
  schema: z.Schema<T>,
  data: unknown
) {
  const validated = schema.parse(data);
  return flashq.push(queue, validated);
}

// Usage
await pushValidatedJob('emails', EmailJobSchema, {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'Content here'
});

Sanitization

// validation/sanitize.ts
import DOMPurify from 'isomorphic-dompurify';

// Sanitize HTML content
function sanitizeHtml(html: string): string {
  return DOMPurify.sanitize(html, {
    ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'p', 'br'],
    ALLOWED_ATTR: []
  });
}

// Remove potential injection characters
function sanitizeString(str: string): string {
  return str
    .replace(/[<>'"&]/g, '') // Remove dangerous chars
    .trim()
    .slice(0, 10000); // Limit length
}

// Sanitize file paths (prevent path traversal)
function sanitizePath(path: string): string {
  return path
    .replace(/\.\./g, '') // Remove parent directory refs
    .replace(/^\//, '')   // Remove leading slash
    .replace(/[^a-zA-Z0-9._/-]/g, ''); // Whitelist chars
}

// Sanitize SQL (for dynamic queries, prefer parameterized)
function sanitizeSqlIdentifier(identifier: string): string {
  if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(identifier)) {
    throw new Error('Invalid SQL identifier');
  }
  return identifier;
}

Rate Limiting

Queue-Level Rate Limits

// Set rate limits per queue
await flashq.setRateLimit('api-calls', {
  max: 100,        // 100 jobs
  duration: 60000 // per minute
});

// Different limits for different queues
const rateLimits = {
  'emails': { max: 1000, duration: 3600000 },    // 1000/hour
  'payments': { max: 100, duration: 60000 },       // 100/minute
  'notifications': { max: 10000, duration: 60000 } // 10000/minute
};

for (const [queue, limit] of Object.entries(rateLimits)) {
  await flashq.setRateLimit(queue, limit);
}

Client-Level Rate Limiting

// Implement client-side rate limiting
import { RateLimiter } from 'limiter';

class RateLimitedClient {
  private client: FlashQ;
  private limiters: Map<string, RateLimiter> = new Map();

  constructor(client: FlashQ) {
    this.client = client;
  }

  private getLimiter(queue: string): RateLimiter {
    if (!this.limiters.has(queue)) {
      this.limiters.set(queue, new RateLimiter({
        tokensPerInterval: 100,
        interval: 'minute'
      }));
    }
    return this.limiters.get(queue)!;
  }

  async push(queue: string, data: any, options?: any) {
    const limiter = this.getLimiter(queue);

    if (!limiter.tryRemoveTokens(1)) {
      throw new Error(`Rate limit exceeded for queue: ${queue}`);
    }

    return this.client.push(queue, data, options);
  }
}

Network Security

Firewall Configuration

# Only allow internal network access to flashQ
# iptables rules
iptables -A INPUT -p tcp --dport 6789 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 6789 -j DROP

iptables -A INPUT -p tcp --dport 6790 -s 10.0.0.0/8 -j ACCEPT
iptables -A INPUT -p tcp --dport 6790 -j DROP

Kubernetes Network Policy

# k8s/network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: flashq-network-policy
  namespace: flashq
spec:
  podSelector:
    matchLabels:
      app: flashq
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Only allow from workers namespace
    - from:
        - namespaceSelector:
            matchLabels:
              name: workers
        - podSelector:
            matchLabels:
              role: worker
      ports:
        - port: 6789
        - port: 6790
    # Allow from monitoring
    - from:
        - namespaceSelector:
            matchLabels:
              name: monitoring
      ports:
        - port: 6790 # Metrics only
  egress:
    # Only allow PostgreSQL
    - to:
        - namespaceSelector:
            matchLabels:
              name: databases
      ports:
        - port: 5432

Private VPC Deployment

# terraform/vpc.tf
resource "aws_vpc" "flashq" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true

  tags = {
    Name = "flashq-vpc"
  }
}

resource "aws_subnet" "private" {
  vpc_id            = aws_vpc.flashq.id
  cidr_block        = "10.0.1.0/24"
  availability_zone = "us-east-1a"

  tags = {
    Name = "flashq-private"
  }
}

# No internet gateway - truly private
resource "aws_security_group" "flashq" {
  vpc_id = aws_vpc.flashq.id

  ingress {
    from_port   = 6789
    to_port     = 6790
    protocol    = "tcp"
    cidr_blocks = ["10.0.0.0/16"]
  }

  egress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = ["10.0.2.0/24"] # Database subnet
  }
}

Audit Logging

// audit/logger.ts
interface AuditLog {
  timestamp: string;
  action: string;
  queue: string;
  jobId?: string;
  userId?: string;
  ip?: string;
  success: boolean;
  details?: Record<string, any>;
}

class AuditLogger {
  private logs: AuditLog[] = [];

  log(entry: Omit<AuditLog, 'timestamp'>) {
    const auditEntry: AuditLog = {
      ...entry,
      timestamp: new Date().toISOString()
    };

    // Log to stdout (for log aggregation)
    console.log(JSON.stringify(auditEntry));

    // Also push to audit queue for persistence
    flashq.push('audit:logs', auditEntry, {
      ttl: 86400000 * 90 // Keep for 90 days
    });
  }
}

const audit = new AuditLogger();

// Wrap flashQ operations with audit logging
class AuditedFlashQ {
  private client: FlashQ;
  private userId: string;
  private ip: string;

  constructor(client: FlashQ, userId: string, ip: string) {
    this.client = client;
    this.userId = userId;
    this.ip = ip;
  }

  async push(queue: string, data: any, options?: any) {
    try {
      const result = await this.client.push(queue, data, options);
      audit.log({
        action: 'push',
        queue,
        jobId: result.id,
        userId: this.userId,
        ip: this.ip,
        success: true
      });
      return result;
    } catch (error) {
      audit.log({
        action: 'push',
        queue,
        userId: this.userId,
        ip: this.ip,
        success: false,
        details: { error: error.message }
      });
      throw error;
    }
  }
}

Secrets Management

// Never store secrets in code or environment directly
// Use a secrets manager

// AWS Secrets Manager
import { SecretsManager } from '@aws-sdk/client-secrets-manager';

async function getFlashQToken(): Promise<string> {
  const client = new SecretsManager({ region: 'us-east-1' });
  const response = await client.getSecretValue({
    SecretId: 'flashq/production/token'
  });
  return response.SecretString!;
}

// HashiCorp Vault
import Vault from 'node-vault';

async function getSecretsFromVault() {
  const vault = Vault({
    apiVersion: 'v1',
    endpoint: process.env.VAULT_ADDR
  });

  await vault.approleLogin({
    role_id: process.env.VAULT_ROLE_ID,
    secret_id: process.env.VAULT_SECRET_ID
  });

  const secrets = await vault.read('secret/data/flashq');
  return secrets.data.data;
}

// Kubernetes secrets
// Mount as files, not environment variables
const token = readFileSync('/etc/secrets/flashq-token', 'utf8').trim();

Compliance Considerations

GDPR / Data Privacy

// Implement data retention policies
async function cleanupUserData(userId: string) {
  // Find all jobs for user
  const queues = await flashq.listQueues();

  for (const queue of queues) {
    const jobs = await flashq.getJobs(queue, 'completed', 10000);

    for (const job of jobs) {
      if (job.data?.userId === userId) {
        // Anonymize or delete
        await flashq.update(job.id, {
          ...job.data,
          email: '[REDACTED]',
          name: '[REDACTED]',
          personalData: null
        });
      }
    }
  }
}

// Auto-cleanup old completed jobs
await flashq.clean('*', 86400000 * 30, 'completed'); // 30 days

PCI-DSS (Payment Data)

// Never store raw card data in jobs
// Only store tokenized references

interface PaymentJob {
  paymentTokenId: string;  // Token from payment processor
  orderId: string;
  amount: number;
  // NO: cardNumber, cvv, expiryDate
}

// Worker fetches actual payment data from secure vault
const worker = new Worker(flashq, 'payments', async (job) => {
  const { paymentTokenId, orderId, amount } = job.data;

  // Retrieve from payment processor
  const result = await stripe.paymentIntents.create({
    payment_method: paymentTokenId,
    amount,
    currency: 'usd',
    confirm: true
  });

  return { success: true, transactionId: result.id };
});
Security Checklist

✓ Enable authentication tokens
✓ Use TLS for all connections
✓ Encrypt sensitive job data
✓ Validate all inputs
✓ Set rate limits
✓ Configure network policies
✓ Enable audit logging
✓ Use secrets manager
✓ Implement data retention

Conclusion

Security requires a defense-in-depth approach. By implementing authentication, encryption, validation, and monitoring, you can confidently run flashQ in production with sensitive workloads.

Secure Your Queue

Start with our security-first deployment guide.

Get Started →
ESC