AI agents are autonomous systems that can plan, reason, and execute complex tasks. By combining LangChain's agent framework with flashQ's reliable job processing, you can build production-ready agents that scale. This guide covers everything from basic tool execution to multi-agent orchestration.
What Are AI Agents?
Unlike simple chatbots, AI agents can:
- Plan: Break down complex tasks into steps
- Execute: Use tools to accomplish subtasks
- React: Adapt based on tool results
- Remember: Maintain context across interactions
AI agents often execute long-running tasks (web searches, API calls, computations). flashQ ensures reliable execution with retries, timeouts, and progress tracking.
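The reliability semantics that matter here (bounded retries with backoff between attempts) can be sketched independently of any queue. The sketch below is illustrative only: `backoffDelay` and `withRetries` are hypothetical helpers, not part of flashQ's API.

```typescript
// Exponential backoff schedule: base * 2^(attempt - 1). Illustrates the
// kind of delay a job queue applies between retry attempts.
export function backoffDelay(attempt: number, baseMs = 1000): number {
  return baseMs * 2 ** (attempt - 1);
}

// Generic retry wrapper: run `fn` up to `maxAttempts` times, waiting
// between failures. Hypothetical helper for illustration only.
export async function withRetries<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseMs = 1000
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        await new Promise((r) => setTimeout(r, backoffDelay(attempt, baseMs)));
      }
    }
  }
  throw lastError;
}
```

With `maxAttempts: 3` and a 1-second base, a task that keeps failing waits 1s, then 2s, before the final attempt — the same shape of schedule you get from a queue's `max_attempts` and backoff settings.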
Agent Architecture
// Architecture Overview
┌─────────────────────────────────────────────────────────┐
│ User Request │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Agent Controller │
│ • Parse intent │
│ • Create execution plan │
│ • Coordinate tools │
└─────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Search │ │ Code │ │ Database │
│ Tool │ │ Executor │ │ Tool │
│ (Queue) │ │ (Queue) │ │ (Queue) │
└────────────┘ └────────────┘ └────────────┘
│ │ │
└───────────────┼───────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ flashQ Workers │
│ • Execute tools async │
│ • Handle retries │
│ • Track progress │
└─────────────────────────────────────────────────────────┘
Project Setup
# Install dependencies
npm install flashq langchain @langchain/openai @langchain/community zod
// agent-config.ts
import { FlashQ, Worker } from 'flashq';
import { ChatOpenAI } from '@langchain/openai';
export const flashq = new FlashQ({
host: process.env.FLASHQ_HOST || 'localhost',
port: 6789,
token: process.env.FLASHQ_TOKEN
});
export const llm = new ChatOpenAI({
modelName: 'gpt-4-turbo',
temperature: 0,
openAIApiKey: process.env.OPENAI_API_KEY
});
// Tool queue names
export const QUEUES = {
AGENT_TASKS: 'agent:tasks',
TOOL_SEARCH: 'tool:search',
TOOL_CODE: 'tool:code',
TOOL_DATABASE: 'tool:database',
TOOL_EMAIL: 'tool:email'
};
Creating LangChain Tools with flashQ
Define Async Tools
// tools/search-tool.ts
import { DynamicStructuredTool } from 'langchain/tools';
import { z } from 'zod';
import { flashq, QUEUES } from '../agent-config';
export const searchTool = new DynamicStructuredTool({
name: 'web_search',
description: 'Search the web for current information. Use for questions about recent events, facts, or research.',
schema: z.object({
query: z.string().describe('The search query'),
maxResults: z.number().default(5).describe('Maximum results to return')
}),
func: async ({ query, maxResults }) => {
// Push to flashQ for async execution
const job = await flashq.push(QUEUES.TOOL_SEARCH, {
query,
maxResults,
timestamp: Date.now()
}, {
timeout: 30000,
max_attempts: 3
});
// Wait for result (blocks until complete)
const result = await flashq.finished(job.id, 60000);
return JSON.stringify(result.data);
}
});
// tools/code-executor.ts
import { DynamicStructuredTool } from 'langchain/tools';
import { z } from 'zod';
import { flashq, QUEUES } from '../agent-config';
export const codeExecutorTool = new DynamicStructuredTool({
name: 'execute_code',
description: 'Execute Python code in a sandboxed environment. Use for calculations, data analysis, or code testing.',
schema: z.object({
code: z.string().describe('Python code to execute'),
timeout: z.number().default(30).describe('Execution timeout in seconds')
}),
func: async ({ code, timeout }) => {
const job = await flashq.push(QUEUES.TOOL_CODE, {
code,
language: 'python',
sandboxed: true
}, {
timeout: timeout * 1000,
max_attempts: 1 // No retry for code execution
});
    const result = await flashq.finished(job.id);
    // The worker resolves { output } or { error }; finished() wraps it in .data
    if (result.data.error) {
      return `Error: ${result.data.error}`;
    }
    return result.data.output;
}
});
// tools/database-tool.ts
import { DynamicStructuredTool } from 'langchain/tools';
import { z } from 'zod';
import { flashq, QUEUES } from '../agent-config';
export const databaseTool = new DynamicStructuredTool({
name: 'query_database',
description: 'Query the database. Use for retrieving user data, analytics, or stored information.',
schema: z.object({
sql: z.string().describe('SQL query (SELECT only)'),
params: z.array(z.any()).default([]).describe('Query parameters')
}),
func: async ({ sql, params }) => {
// Validate: only SELECT allowed
if (!sql.trim().toLowerCase().startsWith('select')) {
return 'Error: Only SELECT queries are allowed';
}
const job = await flashq.push(QUEUES.TOOL_DATABASE, {
sql,
params,
readOnly: true
}, {
timeout: 10000
});
    const result = await flashq.finished(job.id);
    return JSON.stringify(result.data.rows, null, 2);
}
});
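The `startsWith('select')` guard above is a useful first gate, but it still admits stacked statements like `SELECT 1; DROP TABLE users`. A slightly stricter validator is sketched below — `isSafeSelect` is an illustrative helper, and deliberately conservative; the real protection should be a read-only database role for the tool worker, not string matching.

```typescript
// Defense-in-depth sketch: accept only a single SELECT statement.
// Deliberately conservative — it may reject legitimate queries that
// happen to contain write keywords in string literals.
export function isSafeSelect(sql: string): boolean {
  const trimmed = sql.trim();
  if (!/^select\b/i.test(trimmed)) return false; // must start with SELECT
  // Reject stacked statements (anything beyond one trailing semicolon)
  if (trimmed.replace(/;\s*$/, '').includes(';')) return false;
  // Reject write/DDL keywords anywhere in the query
  if (/\b(insert|update|delete|drop|alter|create|grant|truncate)\b/i.test(trimmed)) {
    return false;
  }
  return true;
}
```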
Tool Workers
// workers/search-worker.ts
import { Worker } from 'flashq';
import { flashq, QUEUES } from '../agent-config';
const searchWorker = new Worker(flashq, QUEUES.TOOL_SEARCH, async (job) => {
const { query, maxResults } = job.data;
// Use your preferred search API
const response = await fetch(
`https://api.search.io/search?q=${encodeURIComponent(query)}&limit=${maxResults}`,
{ headers: { 'Authorization': `Bearer ${process.env.SEARCH_API_KEY}` } }
);
const data = await response.json();
return {
query,
results: data.results.map((r: any) => ({
title: r.title,
url: r.url,
snippet: r.snippet
}))
};
}, { concurrency: 10 });
// workers/code-worker.ts
import { spawn } from 'child_process';
import { Worker } from 'flashq';
import { flashq, QUEUES } from '../agent-config';
const codeWorker = new Worker(flashq, QUEUES.TOOL_CODE, async (job) => {
  const { code, timeout = 30 } = job.data;
  return new Promise((resolve) => {
    // Named `child` to avoid shadowing the global `process`
    const child = spawn('python3', ['-c', code], {
      timeout: timeout * 1000
    });
    let output = '';
    let error = '';
    child.stdout.on('data', (data) => output += data);
    child.stderr.on('data', (data) => error += data);
    // `exitCode` avoids shadowing the `code` field from job.data
    child.on('close', (exitCode) => {
      if (exitCode === 0) {
        resolve({ output: output.trim() });
      } else {
        resolve({ error: error.trim() || 'Execution failed' });
      }
    });
  });
}, { concurrency: 5 });
ReAct Agent Pattern
The ReAct (Reasoning + Acting) pattern combines chain-of-thought reasoning with tool execution.
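Before the LangChain version, the core loop is worth seeing in miniature. This toy sketch shows the reason → act → observe cycle with a scripted function standing in for the model; `AgentStep` and `runReactLoop` are illustrative names, not LangChain APIs.

```typescript
type Tool = (input: string) => string;

interface AgentStep {
  thought: string;
  action?: { tool: string; input: string };
  observation?: string;
}

// Minimal ReAct loop: the "llm" reasons and picks an action, a tool acts,
// and the observation is fed back into the next reasoning step.
export function runReactLoop(
  llm: (history: AgentStep[]) => AgentStep,
  tools: Record<string, Tool>,
  maxIterations = 5
): AgentStep[] {
  const history: AgentStep[] = [];
  for (let i = 0; i < maxIterations; i++) {
    const step = llm(history);          // Reason: emit a thought (+ optional action)
    if (!step.action) {                 // No action => final answer, stop
      history.push(step);
      break;
    }
    step.observation = tools[step.action.tool](step.action.input); // Act
    history.push(step);                 // Observe: result joins the history
  }
  return history;
}
```

LangChain's `createReactAgent` implements this same cycle with a real LLM, a prompt that elicits "Thought / Action / Observation" traces, and a parser — which is what the next snippet wires up.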
// agents/react-agent.ts
import { AgentExecutor, createReactAgent } from 'langchain/agents';
import { pull } from 'langchain/hub';
import { Worker } from 'flashq';
import { llm, flashq, QUEUES } from '../agent-config';
import { searchTool, codeExecutorTool, databaseTool } from '../tools';
async function createAgent() {
// Pull ReAct prompt from LangChain hub
const prompt = await pull('hwchase17/react');
const tools = [
searchTool,
codeExecutorTool,
databaseTool
];
const agent = await createReactAgent({
llm,
tools,
prompt
});
return new AgentExecutor({
agent,
tools,
verbose: true,
maxIterations: 10,
returnIntermediateSteps: true
});
}
// Run agent as a flashQ job
const agentWorker = new Worker(flashq, QUEUES.AGENT_TASKS, async (job) => {
const { task, context } = job.data;
const executor = await createAgent();
// Update progress as agent thinks
await flashq.progress(job.id, 10, 'Agent initialized');
const result = await executor.invoke({
input: task,
chat_history: context?.history || []
}, {
callbacks: [{
handleAgentAction: async (action) => {
await flashq.progress(
job.id,
50,
`Using tool: ${action.tool}`
);
}
}]
});
await flashq.progress(job.id, 100, 'Complete');
return {
output: result.output,
steps: result.intermediateSteps?.map(s => ({
tool: s.action.tool,
input: s.action.toolInput,
output: s.observation
}))
};
}, { concurrency: 3 });
Multi-Agent Orchestration
For complex tasks, multiple specialized agents can work together.
// agents/multi-agent-system.ts
import { flashq, llm, QUEUES } from '../agent-config';
interface AgentConfig {
name: string;
role: string;
tools: string[];
systemPrompt: string;
}
const AGENTS: AgentConfig[] = [
{
name: 'researcher',
role: 'Research and gather information',
tools: ['web_search', 'query_database'],
systemPrompt: `You are a research specialist. Your job is to find accurate,
relevant information. Always cite your sources.`
},
{
name: 'analyst',
role: 'Analyze data and draw conclusions',
tools: ['execute_code', 'query_database'],
systemPrompt: `You are a data analyst. Analyze the provided data and extract
meaningful insights. Use code for complex calculations.`
},
{
name: 'writer',
role: 'Create polished content',
tools: [],
systemPrompt: `You are a technical writer. Transform research and analysis
into clear, engaging content.`
}
];
// Orchestrator coordinates multiple agents
async function orchestrateAgents(task: string): Promise<string> {
// Step 1: Research phase
const researchJob = await flashq.push('agent:researcher', {
task: `Research the following topic: ${task}`,
agent: AGENTS[0]
});
const research = await flashq.finished(researchJob.id, 120000);
// Step 2: Analysis phase (depends on research)
const analysisJob = await flashq.push('agent:analyst', {
task: `Analyze this research: ${JSON.stringify(research.data)}`,
agent: AGENTS[1],
context: { research: research.data }
}, {
depends_on: [researchJob.id]
});
const analysis = await flashq.finished(analysisJob.id, 120000);
// Step 3: Writing phase (depends on both)
  const writeJob = await flashq.push('agent:writer', {
    task: `Write a report based on: ${JSON.stringify(analysis.data)}`,
    agent: AGENTS[2],
    context: { research: research.data, analysis: analysis.data }
  }, {
    depends_on: [researchJob.id, analysisJob.id]
  });
  const report = await flashq.finished(writeJob.id, 60000);
return report.data.content;
}
// Using flashQ flows for parallel agents
async function parallelAgents(tasks: string[]) {
const flow = {
name: 'parallel-research',
children: tasks.map((task, i) => ({
name: `research-${i}`,
data: { task, agent: AGENTS[0] }
}))
};
const result = await flashq.pushFlow('agent:researcher', flow);
// Wait for all parallel tasks
const results = await Promise.all(
result.children.map(child => flashq.finished(child.id))
);
return results.map(r => r.data);
}
Agent Memory Management
// memory/agent-memory.ts
import { BufferMemory } from 'langchain/memory';
import { flashq, QUEUES } from '../agent-config';
interface MemoryEntry {
sessionId: string;
messages: any[];
summary: string;
updatedAt: number;
}
// Memory persisted via flashQ
class PersistentAgentMemory {
private sessionId: string;
private memory: BufferMemory;
constructor(sessionId: string) {
this.sessionId = sessionId;
this.memory = new BufferMemory({
returnMessages: true,
memoryKey: 'chat_history'
});
}
async load(): Promise<void> {
// Load from flashQ result storage
const stored = await flashq.getResult(`memory:${this.sessionId}`);
if (stored) {
for (const msg of stored.messages) {
await this.memory.saveContext(
{ input: msg.human },
{ output: msg.ai }
);
}
}
}
async save(human: string, ai: string): Promise<void> {
await this.memory.saveContext({ input: human }, { output: ai });
// Persist to flashQ
const history = await this.memory.loadMemoryVariables({});
await flashq.push('memory:save', {
sessionId: this.sessionId,
messages: history.chat_history,
updatedAt: Date.now()
}, {
jobId: `memory:${this.sessionId}`
});
}
async getHistory() {
return this.memory.loadMemoryVariables({});
}
}
// Agent with memory
async function agentWithMemory(sessionId: string, input: string) {
const memory = new PersistentAgentMemory(sessionId);
await memory.load();
const history = await memory.getHistory();
// Run agent with history context
const job = await flashq.push(QUEUES.AGENT_TASKS, {
task: input,
context: { history: history.chat_history }
});
const result = await flashq.finished(job.id);
// Save interaction to memory
await memory.save(input, result.data.output);
return result.data.output;
}
Streaming Agent Responses
// streaming/agent-stream.ts
import { flashq, llm } from '../agent-config';
// Server-side: Stream agent thoughts via WebSocket
async function streamAgentExecution(ws: WebSocket, taskId: string) {
const eventSource = new EventSource(
`http://localhost:6790/events/${taskId}`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.event) {
case 'progress':
ws.send(JSON.stringify({
type: 'thinking',
message: data.message,
progress: data.progress
}));
break;
case 'completed':
ws.send(JSON.stringify({
type: 'complete',
result: data.result
}));
eventSource.close();
break;
}
};
}
// Client-side: React component for streaming (e.g. AgentChat.tsx)
import { useState } from 'react';

function AgentChat() {
const [messages, setMessages] = useState([]);
const [thinking, setThinking] = useState(null);
async function sendMessage(input: string) {
// Start agent task
const response = await fetch('/api/agent', {
method: 'POST',
body: JSON.stringify({ input })
});
const { taskId } = await response.json();
// Connect to WebSocket for updates
const ws = new WebSocket(`ws://localhost:6790/ws?token=xxx`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'thinking') {
setThinking(data.message);
} else if (data.type === 'complete') {
setThinking(null);
setMessages(prev => [...prev, {
role: 'assistant',
content: data.result
}]);
ws.close();
}
};
}
return (
<div>
{messages.map((msg, i) => <Message key={i} {...msg} />)}
{thinking && <ThinkingIndicator message={thinking} />}
</div>
);
}
Error Handling & Recovery
// error-handling/agent-recovery.ts
import { DynamicStructuredTool } from 'langchain/tools';
import { z } from 'zod';
import { flashq, QUEUES } from '../agent-config';
// Wrap agent execution with recovery
async function resilientAgentExecution(task: string) {
const job = await flashq.push(QUEUES.AGENT_TASKS, {
task,
startedAt: Date.now()
}, {
max_attempts: 3,
backoff: 5000,
timeout: 300000, // 5 min timeout
// Custom error handling
onFailed: async (error) => {
console.error(`Agent task failed: ${error}`);
// Push to DLQ for manual review
await flashq.push('agent:failed', {
originalTask: task,
error: error.message,
timestamp: Date.now()
});
}
});
try {
return await flashq.finished(job.id, 300000);
} catch (error) {
// Timeout or failure - check job state
const state = await flashq.getState(job.id);
if (state === 'failed') {
const dlqJobs = await flashq.getDlq(QUEUES.AGENT_TASKS);
const failedJob = dlqJobs.find(j => j.id === job.id);
return {
success: false,
error: failedJob?.error || 'Unknown error',
attempts: failedJob?.attempts
};
}
throw error;
}
}
// Tool-level error handling
const robustSearchTool = new DynamicStructuredTool({
name: 'robust_search',
description: 'Search with automatic fallback',
schema: z.object({ query: z.string() }),
func: async ({ query }) => {
const providers = ['google', 'bing', 'duckduckgo'];
for (const provider of providers) {
try {
const job = await flashq.push(QUEUES.TOOL_SEARCH, {
query,
provider
}, { timeout: 15000, max_attempts: 1 });
const result = await flashq.finished(job.id, 20000);
return JSON.stringify(result.data);
} catch (error) {
console.warn(`${provider} failed, trying next...`);
}
}
return 'Search temporarily unavailable';
}
});
Monitoring & Observability
// monitoring/agent-metrics.ts
import { flashq } from '../agent-config';
// Dashboard metrics
async function getAgentMetrics() {
const metrics = await flashq.metrics();
const agentQueues = Object.entries(metrics.queues)
.filter(([name]) => name.startsWith('agent:') || name.startsWith('tool:'));
return {
totalAgentTasks: agentQueues.reduce((sum, [, q]) =>
sum + q.completed + q.active + q.waiting, 0),
activeAgents: agentQueues
.filter(([, q]) => q.active > 0)
.map(([name, q]) => ({ name, active: q.active })),
toolUsage: agentQueues
.filter(([name]) => name.startsWith('tool:'))
.map(([name, q]) => ({
tool: name.replace('tool:', ''),
completed: q.completed,
failed: q.failed,
avgLatency: q.avgProcessingTime
})),
    errorRate: (() => {
      const failed = agentQueues.reduce((sum, [, q]) => sum + q.failed, 0);
      const total = agentQueues.reduce((sum, [, q]) => sum + q.completed + q.failed, 0);
      // Guard against division by zero without skewing the denominator
      return total > 0 ? failed / total : 0;
    })()
};
}
// Prometheus metrics endpoint (assumes an existing Express `app`)
app.get('/metrics/agents', async (req, res) => {
const metrics = await getAgentMetrics();
const prometheus = `
# HELP agent_tasks_total Total agent tasks
# TYPE agent_tasks_total counter
agent_tasks_total ${metrics.totalAgentTasks}
# HELP agent_error_rate Agent error rate
# TYPE agent_error_rate gauge
agent_error_rate ${metrics.errorRate}
${metrics.toolUsage.map(t => `
# HELP tool_${t.tool}_completed Tool executions
# TYPE tool_${t.tool}_completed counter
tool_${t.tool}_completed ${t.completed}
tool_${t.tool}_latency_ms ${t.avgLatency}`).join('\n')}
`;
res.set('Content-Type', 'text/plain');
res.send(prometheus);
});
Best Practices
✓ Rate limit LLM API calls
✓ Set appropriate timeouts for each tool
✓ Implement graceful degradation
✓ Monitor token usage and costs
✓ Log all agent decisions
✓ Test tool error scenarios
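The first item on the checklist — rate limiting LLM calls — can be as simple as a token bucket placed in front of the model client. `TokenBucket` below is an illustrative sketch, not a flashQ or LangChain feature; the injectable clock exists only to make it testable.

```typescript
// Token bucket: allow bursts up to `capacity` calls, refilling at
// `refillPerSec` tokens per second.
export class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private capacity: number,
    private refillPerSec: number,
    private now: () => number = Date.now  // injectable clock (ms)
  ) {
    this.tokens = capacity;
    this.lastRefill = this.now();
  }

  // Returns true if a call may proceed, consuming one token.
  tryAcquire(): boolean {
    const elapsedSec = (this.now() - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSec);
    this.lastRefill = this.now();
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

Checking `tryAcquire()` before each `llm.invoke` (and queueing or delaying the job when it returns false) keeps agent bursts within your provider's rate limits.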
Conclusion
Combining flashQ with LangChain gives you a robust foundation for production AI agents. The key benefits are reliable tool execution, automatic retries, progress tracking, and scalable worker pools for parallel agent operations.