AI agents call tools to accomplish tasks. An agent might need to search the web, query a database, and fetch weather data - all to answer a single question. These tools can often run in parallel.
flashQ's job dependencies make this pattern easy: fan-out to execute tools in parallel, then fan-in to aggregate results.
## The Fan-Out/Fan-In Pattern
```
                     ┌─────────────┐
                ┌──▶ │ Web Search  │ ──┐
                │    └─────────────┘   │
┌──────────┐    │    ┌─────────────┐   │    ┌───────────┐
│   Plan   │ ───┼──▶ │  Database   │ ──┼──▶ │ Aggregate │
└──────────┘    │    └─────────────┘   │    └───────────┘
                │    ┌─────────────┐   │
                └──▶ │   Weather   │ ──┘
                     └─────────────┘
```
- Plan: LLM decides which tools to call
- Fan-out: Execute tools in parallel
- Fan-in: Aggregate results and generate response
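Before wiring in a queue, it helps to see the same shape in plain promises: fan-out by starting every call at once, fan-in by awaiting them together. The three tool functions below are stand-ins for real integrations, not part of flashQ:

```typescript
// Stand-in tools; real implementations would call external services.
async function webSearch(query: string) { return `results for ${query}`; }
async function dbQuery(sql: string) { return `rows for ${sql}`; }
async function weather(city: string) { return `forecast for ${city}`; }

async function answer(question: string): Promise<string> {
  // Fan-out: start every tool call at once
  const results = await Promise.all([
    webSearch(question),
    dbQuery('SELECT 1'),
    weather('Paris'),
  ]);
  // Fan-in: aggregate once all calls have resolved
  return results.join(' | ');
}
```

What the queue adds on top of this is that each tool call becomes its own job, with dependencies and timeouts handled by flashQ rather than by in-process promises.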
## Basic Example
Execute multiple tool calls in parallel, then aggregate:
```typescript
import { Queue } from 'flashq';

const queue = new Queue('agent-tools');

async function runAgent(question: string) {
  // Step 1: LLM plans which tools to call
  const planJob = await queue.add('plan', { question });

  // Wait for the plan to complete
  const plan = await queue.finished(planJob.id);

  // Step 2: Fan-out - execute tools in parallel
  const toolJobs = await Promise.all(
    plan.tools.map(tool =>
      queue.add('tool', {
        name: tool.name,
        params: tool.params
      })
    )
  );

  // Step 3: Fan-in - the aggregate job waits for ALL tool jobs
  const aggregateJob = await queue.add('aggregate', {
    question
  }, {
    dependsOn: toolJobs.map(j => j.id)
  });

  return queue.finished(aggregateJob.id);
}
```
## Worker Implementation
```typescript
import { Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();

new Worker('agent-tools', async (job) => {
  switch (job.name) {
    case 'plan':
      return handlePlan(job);
    case 'tool':
      return handleTool(job);
    case 'aggregate':
      return handleAggregate(job);
  }
});
```
### Plan Handler
The LLM decides which tools to call:
```typescript
async function handlePlan(job) {
  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content: `You are a planning agent. Given a question, decide which tools to call.
Available tools: web_search, database_query, weather, calculator
Return JSON: { "tools": [{ "name": "tool_name", "params": {...} }] }`
      },
      { role: 'user', content: job.data.question }
    ],
    response_format: { type: 'json_object' }
  });

  return JSON.parse(response.choices[0].message.content);
}
```
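JSON mode keeps the output syntactically valid, but the *shape* is not guaranteed, so it can be worth validating the plan before fanning out. A minimal sketch; the `parsePlan` helper and its types are illustrative, not part of flashQ:

```typescript
interface ToolCall { name: string; params: Record<string, unknown>; }
interface Plan { tools: ToolCall[]; }

// Narrow unknown LLM output into a Plan, rejecting malformed shapes.
function parsePlan(raw: string): Plan {
  const data = JSON.parse(raw);
  if (!Array.isArray(data?.tools)) {
    throw new Error('Plan missing "tools" array');
  }
  for (const t of data.tools) {
    if (typeof t?.name !== 'string' || typeof t?.params !== 'object' || t.params === null) {
      throw new Error('Malformed tool call in plan');
    }
  }
  return data as Plan;
}
```

Failing fast here turns a bad plan into a retryable `plan` job failure instead of a fan-out of nonsense tool jobs.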
### Tool Handler
Execute the appropriate tool:
```typescript
async function handleTool(job) {
  const { name, params } = job.data;

  switch (name) {
    case 'web_search':
      return await searchWeb(params.query);
    case 'database_query':
      return await queryDatabase(params.sql);
    case 'weather':
      return await getWeather(params.location);
    case 'calculator':
      // WARNING: eval() runs arbitrary code - never use it on
      // LLM-generated input; substitute a safe expression evaluator
      return eval(params.expression);
    default:
      throw new Error(`Unknown tool: ${name}`);
  }
}
```
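The calculator case points at the main hazard: `eval()` executes arbitrary code, which is unacceptable for LLM-generated input. One option is a small recursive-descent evaluator restricted to arithmetic; the sketch below is illustrative (a vetted expression-parser library works just as well):

```typescript
// Safe arithmetic evaluator: numbers, + - * /, parentheses, unary minus.
function calculate(expr: string): number {
  const tokens = expr.match(/\d+(?:\.\d+)?|[()+\-*/]/g) ?? [];
  let pos = 0;
  const peek = () => tokens[pos];
  const next = () => tokens[pos++];

  function parseExpr(): number {   // term (('+' | '-') term)*
    let value = parseTerm();
    while (peek() === '+' || peek() === '-') {
      value = next() === '+' ? value + parseTerm() : value - parseTerm();
    }
    return value;
  }
  function parseTerm(): number {   // factor (('*' | '/') factor)*
    let value = parseFactor();
    while (peek() === '*' || peek() === '/') {
      value = next() === '*' ? value * parseFactor() : value / parseFactor();
    }
    return value;
  }
  function parseFactor(): number { // number | '(' expr ')' | '-' factor
    if (peek() === '(') { next(); const v = parseExpr(); next(); return v; }
    if (peek() === '-') { next(); return -parseFactor(); }
    return Number(next());
  }
  return parseExpr();
}
```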
### Aggregate Handler
Combine all tool results into a final answer:
```typescript
async function handleAggregate(job) {
  // Results of all parent (tool) jobs
  const toolResults = job.parentResults;

  const context = toolResults
    .map((result, i) => `Tool ${i + 1}: ${JSON.stringify(result)}`)
    .join('\n\n');

  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content: 'Synthesize the tool results into a comprehensive answer.'
      },
      {
        role: 'user',
        content: `Question: ${job.data.question}\n\nTool Results:\n${context}`
      }
    ]
  });

  return response.choices[0].message.content;
}
```
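Labeling results as `Tool 1`, `Tool 2` forces the model to guess which tool produced which data. A small hypothetical variation pairs each result with its tool name, assuming results arrive in the same order as the planned tool list:

```typescript
// Pair each result with its tool name so the synthesis prompt can
// attribute data to sources; falls back to a numbered label.
function buildContext(tools: { name: string }[], results: unknown[]): string {
  return results
    .map((result, i) => `${tools[i]?.name ?? `tool_${i + 1}`}: ${JSON.stringify(result)}`)
    .join('\n\n');
}
```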
## Use Case: Research Agent
An agent that researches a topic from multiple sources:
```typescript
async function researchTopic(topic: string) {
  // Search multiple sources in parallel
  const sources = ['arxiv', 'wikipedia', 'news', 'github'];
  const searchJobs = await Promise.all(
    sources.map(source =>
      queue.add('search', { topic, source })
    )
  );

  // Summarize each source; each job depends on its own search and
  // reads the search output via job.parentResults
  const summarizeJobs = await Promise.all(
    searchJobs.map(searchJob =>
      queue.add('summarize', {}, { dependsOn: [searchJob.id] })
    )
  );

  // Final synthesis waits for all summaries
  const synthesizeJob = await queue.add('synthesize', {
    topic
  }, {
    dependsOn: summarizeJobs.map(j => j.id)
  });

  return queue.finished(synthesizeJob.id);
}
```
## Use Case: Data Enrichment
Enrich a contact record with data from multiple APIs:
```typescript
async function enrichContact(email: string) {
  // Fetch data from multiple sources in parallel
  const linkedinJob = await queue.add('fetch-linkedin', { email });
  const twitterJob = await queue.add('fetch-twitter', { email });
  const companyJob = await queue.add('fetch-company', { email });
  const clearbitJob = await queue.add('fetch-clearbit', { email });

  // Merge all data once every fetch has completed
  const mergeJob = await queue.add('merge-contact', { email }, {
    dependsOn: [linkedinJob.id, twitterJob.id, companyJob.id, clearbitJob.id]
  });

  return queue.finished(mergeJob.id);
}
```
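The `merge-contact` handler itself isn't shown here; one plausible shape is a field-level merge where later non-null values win. The helper below is a sketch under that assumption, not flashQ API:

```typescript
type ContactRecord = Record<string, unknown>;

// Merge partial records left-to-right; later non-null fields win.
// A real merge step might add per-source precedence rules instead.
function mergeRecords(...records: ContactRecord[]): ContactRecord {
  const merged: ContactRecord = {};
  for (const record of records) {
    for (const [key, value] of Object.entries(record)) {
      if (value !== null && value !== undefined) {
        merged[key] = value;
      }
    }
  }
  return merged;
}
```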
## Handling Partial Failures
Some tools might fail while others succeed. Handle this gracefully:
```typescript
new Worker('agent-tools', async (job) => {
  if (job.name === 'tool') {
    try {
      return await executeTool(job.data);
    } catch (error) {
      // Return the error instead of throwing so the aggregate
      // step can still run with partial results
      return {
        error: true,
        message: error.message,
        tool: job.data.name
      };
    }
  }

  if (job.name === 'aggregate') {
    const results = job.parentResults;

    // Separate failed tools from successful ones
    const successful = results.filter(r => !r.error);
    const failed = results.filter(r => r.error);

    if (failed.length > 0) {
      console.log(`${failed.length} tools failed:`, failed);
    }

    // Generate an answer from the available results
    return generateAnswer(successful, job.data.question);
  }
});
```
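Filtering twice works; if several handlers need the same split, a one-pass partition helper keeps the aggregate step readable. `partitionResults` is a hypothetical utility, not part of flashQ:

```typescript
interface ToolResult { error?: boolean; message?: string; [key: string]: unknown; }

// Split parent results into successes and failures in a single pass
// so the aggregate step can answer from whatever succeeded.
function partitionResults(results: ToolResult[]) {
  const ok: ToolResult[] = [];
  const failed: ToolResult[] = [];
  for (const r of results) {
    (r.error ? failed : ok).push(r);
  }
  return { ok, failed };
}
```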
## Setting Timeouts
Tool calls can be slow. Set appropriate timeouts:
```typescript
const webSearchJob = await queue.add('tool', {
  name: 'web_search',
  params: { query: 'latest AI news' }
}, {
  timeout: 30000 // 30 seconds
});

const llmJob = await queue.add('aggregate', { question }, {
  dependsOn: [webSearchJob.id],
  timeout: 60000 // 60 seconds for the LLM call
});
```
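If a call runs outside the queue, or a handler wants its own per-call guard, the same deadline can be enforced in application code with `Promise.race`. A minimal sketch:

```typescript
// Guard any promise with a deadline. This is an application-level
// fallback; the queue-level `timeout` option above is usually simpler.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  let timer!: ReturnType<typeof setTimeout>;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms);
  });
  // Whichever settles first wins; always clear the timer afterwards
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}
```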
## Complete Example
```typescript
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('agent');

// Worker
new Worker('agent', async (job) => {
  switch (job.name) {
    case 'plan': {
      const plan = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [
          { role: 'system', content: 'Plan tools. Return JSON: {"tools":[{"name":"","params":{}}]}' },
          { role: 'user', content: job.data.question }
        ],
        response_format: { type: 'json_object' }
      });
      return JSON.parse(plan.choices[0].message.content);
    }
    case 'tool':
      return await executeTool(job.data.name, job.data.params);
    case 'aggregate': {
      const context = job.parentResults.map(r => JSON.stringify(r)).join('\n');
      const answer = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [
          { role: 'system', content: 'Answer using the tool results.' },
          { role: 'user', content: `Q: ${job.data.question}\n\nResults:\n${context}` }
        ]
      });
      return answer.choices[0].message.content;
    }
  }
});

// Run the agent
async function ask(question: string) {
  const planJob = await queue.add('plan', { question });
  const plan = await queue.finished(planJob.id);

  const toolJobs = await Promise.all(
    plan.tools.map(t => queue.add('tool', t))
  );

  const aggregateJob = await queue.add('aggregate', { question }, {
    dependsOn: toolJobs.map(j => j.id)
  });

  return queue.finished(aggregateJob.id);
}

// Usage
const answer = await ask("What's the weather in Paris and the current EUR/USD rate?");
console.log(answer);
```
## Key Takeaways
- Use fan-out/fan-in for parallel tool execution
- Set `dependsOn` with multiple job IDs for aggregation
- Access all parent results via `job.parentResults`
- Handle partial failures gracefully