
AI Agent Workflows: Parallel Tool Execution

AI agents call tools to accomplish tasks. An agent might need to search the web, query a database, and fetch weather data, all to answer a single question. When these calls do not depend on one another's output, they can run in parallel instead of one after another.

flashQ's job dependencies make this pattern easy: fan-out to execute tools in parallel, then fan-in to aggregate results.

The Fan-Out/Fan-In Pattern

                    ┌─────────────┐
                    │  Web Search │
                    └─────────────┘
                          │
┌─────────┐         ┌─────────────┐         ┌───────────┐
│  Plan   │ ──▶     │  Database   │ ──▶     │ Aggregate │
└─────────┘         └─────────────┘         └───────────┘
                          │
                    ┌─────────────┐
                    │  Weather    │
                    └─────────────┘

  1. Plan: LLM decides which tools to call
  2. Fan-out: Execute tools in parallel
  3. Fan-in: Aggregate results and generate response
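
Stripped of the queue, the three steps reduce to plain promises. The sketch below is only an illustration of the pattern, not flashQ code: `plan`, `runTool`, and `fanOutFanIn` are hypothetical names, the plan is hard-coded rather than produced by an LLM, and the tools are stubs that resolve after a short delay.

```typescript
// Hypothetical stand-ins for real tools; none of this is flashQ API.
type ToolCall = { name: string; params: Record<string, unknown> };

const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Stub tool: pretends to do some I/O, then reports which tool ran.
async function runTool(call: ToolCall): Promise<string> {
  await delay(10);
  return `${call.name} done`;
}

// Step 1: plan (hard-coded here; the article uses an LLM for this step).
function plan(_question: string): ToolCall[] {
  return [
    { name: 'web_search', params: { query: 'example' } },
    { name: 'database_query', params: { sql: 'SELECT 1' } },
    { name: 'weather', params: { location: 'Paris' } },
  ];
}

async function fanOutFanIn(question: string): Promise<string> {
  const calls = plan(question);
  // Step 2: fan-out. Every call is started before any of them is awaited.
  const results = await Promise.all(calls.map(runTool));
  // Step 3: fan-in. This line runs only once every result is available.
  return results.join('; ');
}
```

In the queue-based version that follows, `dependsOn` plays the role that `Promise.all` plays here: the aggregate step does not start until every tool job it lists has finished.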

Basic Example

Execute multiple tool calls in parallel, then aggregate:

import { Queue } from 'flashq';

const queue = new Queue('agent-tools');

async function runAgent(question: string) {
  // Step 1: LLM plans which tools to call
  const planJob = await queue.add('plan', { question });

  // Wait for plan to complete
  const plan = await queue.finished(planJob.id);

  // Step 2: Fan-out - execute tools in parallel
  const toolJobs = await Promise.all(
    plan.tools.map(tool =>
      queue.add('tool', {
        name: tool.name,
        params: tool.params
      })
    )
  );

  // Step 3: Fan-in - aggregate waits for ALL tool jobs
  const aggregateJob = await queue.add('aggregate', {
    question
  }, {
    dependsOn: toolJobs.map(j => j.id)
  });

  return queue.finished(aggregateJob.id);
}

Worker Implementation

import { Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();

new Worker('agent-tools', async (job) => {
  switch (job.name) {
    case 'plan':
      return handlePlan(job);
    case 'tool':
      return handleTool(job);
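    case 'aggregate':
      return handleAggregate(job);
    default:
      // Fail loudly instead of silently returning undefined
      throw new Error(`Unknown job type: ${job.name}`);
  }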
});

Plan Handler

The LLM decides which tools to call:

async function handlePlan(job) {
  const response = await openai.chat.completions.create({
    model: 'gpt-4o', // JSON mode (response_format below) requires a model that supports it
    messages: [
      {
        role: 'system',
        content: `You are a planning agent. Given a question, decide which tools to call.
Available tools: web_search, database_query, weather, calculator
Return JSON: { "tools": [{ "name": "tool_name", "params": {...} }] }`
      },
      { role: 'user', content: job.data.question }
    ],
    response_format: { type: 'json_object' }
  });

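  // content can be null; fall back to an empty plan rather than crashing in JSON.parse
  return JSON.parse(response.choices[0].message.content ?? '{"tools":[]}');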
}
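
Because the plan comes from a model, it is worth checking its shape before fanning out. The following is a minimal sketch, assuming the JSON layout requested in the system prompt; `parsePlan` and `KNOWN_TOOLS` are hypothetical names, not part of flashQ or the OpenAI SDK.

```typescript
type ToolCall = { name: string; params: Record<string, unknown> };

// Tool names taken from the system prompt in the plan handler.
const KNOWN_TOOLS = new Set(['web_search', 'database_query', 'weather', 'calculator']);

// Hypothetical helper: parse the model's JSON and keep only well-formed calls to known tools.
function parsePlan(raw: string): { tools: ToolCall[] } {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    throw new Error('Plan is not valid JSON');
  }

  const tools = (parsed as { tools?: unknown } | null)?.tools;
  if (!Array.isArray(tools)) {
    throw new Error('Plan has no "tools" array');
  }

  // Drop entries with an unknown tool name or a missing params object.
  const valid = tools.filter((t): t is ToolCall =>
    typeof t === 'object' && t !== null &&
    typeof (t as ToolCall).name === 'string' &&
    KNOWN_TOOLS.has((t as ToolCall).name) &&
    typeof (t as ToolCall).params === 'object' &&
    (t as ToolCall).params !== null
  );
  return { tools: valid };
}
```

Filtering here means an invented tool name never becomes a job. Whether to drop such entries silently or reject the whole plan is a design choice; this sketch drops them.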

Tool Handler

Execute the appropriate tool:

async function handleTool(job) {
  const { name, params } = job.data;

  switch (name) {
    case 'web_search':
      return await searchWeb(params.query);

    case 'database_query':
      return await queryDatabase(params.sql);

    case 'weather':
      return await getWeather(params.location);

    case 'calculator':
      // evaluateExpression is a hypothetical safe evaluator; never pass LLM output to eval()
      return evaluateExpression(params.expression);

    default:
      throw new Error(`Unknown tool: ${name}`);
  }
}
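
The calculator case is the risky one: the expression is written by the model, so it should not be handed to `eval`. One option is a small parser that accepts only numbers, `+ - * /`, and parentheses. The sketch below is one way to do that under those assumptions; `evaluateExpression` is a hypothetical helper, and a maintained math library would be a reasonable alternative.

```typescript
// Hypothetical replacement for eval(): supports numbers, + - * /, unary minus, and parentheses.
function evaluateExpression(input: string): number {
  const tokens: string[] = input.match(/\d+(?:\.\d+)?|[-+*/()]/g) ?? [];
  // Reject anything the tokenizer skipped (letters, quotes, stray dots, and so on).
  if (tokens.join('') !== input.replace(/\s+/g, '')) {
    throw new Error(`Unsupported expression: ${input}`);
  }
  let pos = 0;

  // expr := term (('+' | '-') term)*
  function expr(): number {
    let value = term();
    while (tokens[pos] === '+' || tokens[pos] === '-') {
      const op = tokens[pos++];
      const rhs = term();
      value = op === '+' ? value + rhs : value - rhs;
    }
    return value;
  }

  // term := factor (('*' | '/') factor)*
  function term(): number {
    let value = factor();
    while (tokens[pos] === '*' || tokens[pos] === '/') {
      const op = tokens[pos++];
      const rhs = factor();
      value = op === '*' ? value * rhs : value / rhs;
    }
    return value;
  }

  // factor := number | '-' factor | '(' expr ')'
  function factor(): number {
    const token = tokens[pos++];
    if (token === undefined) throw new Error('Unexpected end of expression');
    if (token === '-') return -factor();
    if (token === '(') {
      const value = expr();
      if (tokens[pos++] !== ')') throw new Error('Missing closing parenthesis');
      return value;
    }
    const num = Number(token);
    if (Number.isNaN(num)) throw new Error(`Unexpected token: ${token}`);
    return num;
  }

  const result = expr();
  // Leftover tokens mean the input was not a single complete expression.
  if (pos !== tokens.length) throw new Error(`Unexpected token: ${tokens[pos]}`);
  return result;
}
```

Anything outside the grammar, such as an identifier or a function call, is rejected before evaluation starts, so the handler throws instead of executing it.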

Aggregate Handler

Combine all tool results into a final answer:

async function handleAggregate(job) {
  // Get all tool results from parent jobs
  const toolResults = job.parentResults;

  const context = toolResults
    .map((result, i) => `Tool ${i + 1}: ${JSON.stringify(result)}`)
    .join('\n\n');

  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content: 'Synthesize the tool results into a comprehensive answer.'
      },
      {
        role: 'user',
        content: `Question: ${job.data.question}\n\nTool Results:\n${context}`
      }
    ]
  });

  return response.choices[0].message.content;
}

Use Case: Research Agent

An agent that researches a topic from multiple sources:

async function researchTopic(topic: string) {
  // Search multiple sources in parallel
  const sources = ['arxiv', 'wikipedia', 'news', 'github'];

  const searchJobs = await Promise.all(
    sources.map(source =>
      queue.add('search', { topic, source })
    )
  );

  // Summarize each source (depends on its search)
  const summarizeJobs = await Promise.all(
    searchJobs.map(searchJob =>
      queue.add('summarize', {}, { dependsOn: [searchJob.id] })
    )
  );

  // Final synthesis waits for all summaries
  const synthesizeJob = await queue.add('synthesize', {
    topic
  }, {
    dependsOn: summarizeJobs.map(j => j.id)
  });

  return queue.finished(synthesizeJob.id);
}

Use Case: Data Enrichment

Enrich a contact record with data from multiple APIs:

async function enrichContact(email: string) {
  // Enqueue all fetch jobs at once; workers then process them in parallel
  const fetchJobs = await Promise.all(
    ['fetch-linkedin', 'fetch-twitter', 'fetch-company', 'fetch-clearbit'].map(name =>
      queue.add(name, { email })
    )
  );

  // Merge all data once every fetch job has finished
  const mergeJob = await queue.add('merge-contact', { email }, {
    dependsOn: fetchJobs.map(j => j.id)
  });

  return queue.finished(mergeJob.id);
}
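
The merge step itself is not shown above. As a rough sketch, assuming each fetch job returns a flat object of string fields and that sources are passed in priority order, it could look like the following. `mergeContact` and `ContactFields` are hypothetical names, and the "first non-empty value wins" rule is one possible policy, not something flashQ prescribes.

```typescript
type ContactFields = Record<string, string | null | undefined>;

// Hypothetical merge: sources are listed in priority order; the first non-empty value wins.
function mergeContact(email: string, sources: ContactFields[]): Record<string, string> {
  // The email we looked up is authoritative and is never overwritten.
  const merged: Record<string, string> = { email };
  for (const source of sources) {
    for (const field of Object.keys(source)) {
      const value = source[field];
      // Skip empty values and fields already filled by a higher-priority source.
      if (value && !(field in merged)) {
        merged[field] = value;
      }
    }
  }
  return merged;
}
```

Inside a `merge-contact` handler, the `sources` argument would come from `job.parentResults`.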

Handling Partial Failures

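Some tools might fail while others succeed. Instead of throwing, have each tool job return an error object so the aggregate step can still answer with whatever results are available: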

new Worker('agent-tools', async (job) => {
  if (job.name === 'tool') {
    try {
      return await executeTool(job.data.name, job.data.params);
    } catch (error) {
      // Return error instead of throwing
      // This lets the aggregate step handle partial results
      return {
        error: true,
        message: error instanceof Error ? error.message : String(error),
        tool: job.data.name
      };
    }
  }

  if (job.name === 'aggregate') {
    const results = job.parentResults;

    // Separate failed tools from successful ones (results may be null or primitives)
    const successful = results.filter(r => !r?.error);
    const failed = results.filter(r => r?.error);

    if (failed.length > 0) {
      console.warn(`${failed.length} tools failed:`, failed);
    }

    // Generate answer with available results
    return generateAnswer(successful, job.data.question);
  }
});
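
Checking for a truthy `error` property is loose: a legitimate result that happens to carry an `error` field would be treated as a failure. A stricter variant is sketched below, assuming failures always have the `{ error: true, message, tool }` shape returned by the tool branch above. `isToolFailure` and `partitionResults` are hypothetical helper names.

```typescript
type ToolFailure = { error: true; message: string; tool: string };

// Type guard for the error object shape returned by the tool branch.
function isToolFailure(result: unknown): result is ToolFailure {
  return typeof result === 'object' && result !== null &&
    (result as { error?: unknown }).error === true;
}

// Split parent results into usable values and failures in a single pass.
function partitionResults(results: unknown[]): { successful: unknown[]; failed: ToolFailure[] } {
  const successful: unknown[] = [];
  const failed: ToolFailure[] = [];
  for (const result of results) {
    if (isToolFailure(result)) {
      failed.push(result);
    } else {
      successful.push(result);
    }
  }
  return { successful, failed };
}
```

Only a value whose `error` property is exactly `true` counts as a failure, so numbers, strings, `null`, and objects with an unrelated `error` field all pass through as successful results.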

Setting Timeouts

Tool calls can be slow. Set a timeout on each job so a stalled call does not hold up the workflow indefinitely:

const webSearchJob = await queue.add('tool', {
  name: 'web_search',
  params: { query: 'latest AI news' }
}, {
  timeout: 30000 // 30 seconds
});

const llmJob = await queue.add('aggregate', { question }, {
  dependsOn: [webSearchJob.id],
  timeout: 60000 // 60 seconds for LLM
});
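
The same idea can be applied to a single promise inside a handler, for example to bound one HTTP call within a longer tool. The sketch below uses `Promise.race`; `withTimeout` is a hypothetical helper that is independent of the queue's `timeout` option, and it only stops waiting rather than cancelling the underlying work.

```typescript
// Hypothetical helper: rejects if `promise` does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label = 'operation'): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms} ms`)), ms);
  });

  // Whichever settles first wins. Clear the timer either way so it cannot keep the process alive.
  return Promise.race([promise, timeout]).then(
    value => {
      clearTimeout(timer);
      return value;
    },
    err => {
      clearTimeout(timer);
      throw err;
    }
  );
}
```

A call such as `withTimeout(getWeather(params.location), 5000, 'weather')` would reject after five seconds. To actually abort the request, the tool would also need something like an `AbortController`.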

Complete Example

import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('agent');

// Worker
new Worker('agent', async (job) => {
  switch (job.name) {
    case 'plan': {
      // JSON mode (response_format) requires a model that supports it
      const plan = await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [
          { role: 'system', content: 'Plan tools. Return JSON: {"tools":[{"name":"","params":{}}]}' },
          { role: 'user', content: job.data.question }
        ],
        response_format: { type: 'json_object' }
      });
      // content can be null; fall back to an empty plan
      return JSON.parse(plan.choices[0].message.content ?? '{"tools":[]}');
    }

    case 'tool':
      // executeTool is your own dispatcher (see Tool Handler)
      return await executeTool(job.data.name, job.data.params);

    case 'aggregate': {
      const context = job.parentResults.map(r => JSON.stringify(r)).join('\n');
      const answer = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [
          { role: 'system', content: 'Answer using the tool results.' },
          { role: 'user', content: `Q: ${job.data.question}\n\nResults:\n${context}` }
        ]
      });
      return answer.choices[0].message.content;
    }

    default:
      throw new Error(`Unknown job type: ${job.name}`);
  }
});

// Run agent
async function ask(question: string) {
  const planJob = await queue.add('plan', { question });
  const plan = await queue.finished(planJob.id);

  const toolJobs = await Promise.all(
    plan.tools.map(t => queue.add('tool', t))
  );

  const aggregateJob = await queue.add('aggregate', { question }, {
    dependsOn: toolJobs.map(j => j.id)
  });

  return queue.finished(aggregateJob.id);
}

// Usage
const answer = await ask("What's the weather in Paris and the current EUR/USD rate?");
console.log(answer);

Key Takeaways

  • Use fan-out/fan-in for parallel tool execution
  • Set dependsOn with multiple job IDs for aggregation
  • Access all parent results via job.parentResults
  • Return error objects from tool jobs so aggregation can proceed with partial results
