Overview

The Parallel workflow sends the same message to multiple agents simultaneously (fan-out), then aggregates their responses with a fan-in agent. This pattern is ideal for gathering diverse perspectives or processing multiple independent tasks concurrently.
Parallel execution leverages asyncio for true concurrent processing, significantly reducing total execution time.

Key Features

  • Concurrent Execution: Agents run simultaneously using asyncio
  • Fan-Out: Same input sent to multiple agents
  • Fan-In Aggregation: Combined results processed by aggregator agent
  • Flexible Composition: Works with any agent types
  • Model Diversity: Use different models for different perspectives

Basic Usage

import asyncio
from pathlib import Path
from fast_agent import FastAgent
from fast_agent.core.prompt import Prompt

fast = FastAgent("Parallel Workflow")

@fast.agent(
    name="proofreader",
    instruction="""Review the short story for grammar, spelling, and punctuation errors.
    Identify any awkward phrasing or structural issues that could improve clarity. 
    Provide detailed feedback on corrections.""",
)
@fast.agent(
    name="fact_checker",
    instruction="""Verify the factual consistency within the story. Identify any contradictions,
    logical inconsistencies, or inaccuracies in the plot, character actions, or setting. 
    Highlight potential issues with reasoning or coherence.""",
)
@fast.agent(
    name="style_enforcer",
    instruction="""Analyze the story for adherence to style guidelines.
    Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to 
    enhance storytelling, readability, and engagement.""",
    model="sonnet",
)
@fast.agent(
    name="grader",
    instruction="""Compile the feedback from the Proofreader, Fact Checker, and Style Enforcer
    into a structured report. Summarize key issues and categorize them by type. 
    Provide actionable recommendations for improving the story, 
    and give an overall grade based on the feedback.""",
)
@fast.parallel(
    fan_out=["proofreader", "fact_checker", "style_enforcer"],
    fan_in="grader",
    name="parallel",
)
async def main() -> None:
    async with fast.run() as agent:
        await agent.parallel.send(
            Prompt.user("Student short story submission", Path("short_story.txt"))
        )

if __name__ == "__main__":
    asyncio.run(main())

Configuration Parameters

  • name (string, required): Name of the parallel workflow
  • fan_out (list[str], required): List of agent names to execute in parallel
  • fan_in (string, optional): Agent name that aggregates results
  • instruction (string, optional): Description of the parallel workflow for use by other workflows
  • include_request (bool, default: true): Include the original request in fan-in aggregation

How It Works

  1. Fan-Out: The same message is sent to all agents simultaneously
  2. Concurrent Execution: Agents process independently in parallel
  3. Result Collection: All responses are gathered
  4. Fan-In: Aggregator agent processes combined results
  5. Final Response: Aggregated output is returned
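
The steps above can be sketched with plain asyncio. This is an illustrative sketch of the fan-out/fan-in pattern using toy callables in place of LLM-backed agents, not fast-agent's internal implementation:

```python
import asyncio

async def fan_out_fan_in(message, fan_out_agents, fan_in_agent=None):
    # Fan-out: the same message goes to every agent concurrently.
    responses = await asyncio.gather(*(agent(message) for agent in fan_out_agents))
    # Fan-in: an aggregator combines the results; without one,
    # the combined responses are returned as-is.
    if fan_in_agent is None:
        return list(responses)
    return await fan_in_agent("\n".join(responses))

# Toy "agents" standing in for LLM-backed agents.
async def shout(msg: str) -> str:
    return msg.upper()

async def whisper(msg: str) -> str:
    return msg.lower()

async def grade(combined: str) -> str:
    return f"graded: {combined!r}"

result = asyncio.run(fan_out_fan_in("Hello", [shout, whisper], grade))
print(result)  # graded: 'HELLO\nhello'
```

The total wall-clock time is bounded by the slowest fan-out agent plus the fan-in step, rather than the sum of all agent latencies.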

Response Format

The fan-in agent receives formatted responses with XML tags:
<fastagent:request>
The original user request
</fastagent:request>

<fastagent:response agent="agent1">
First agent's response
</fastagent:response>

<fastagent:response agent="agent2">
Second agent's response
</fastagent:response>

<fastagent:response agent="agent3">
Third agent's response
</fastagent:response>

Without Fan-In Agent

If no fan-in agent is specified, the parallel workflow returns combined responses directly:
@fast.parallel(
    name="multi_translate",
    fan_out=["translate_fr", "translate_de", "translate_es"],
    # No fan_in specified
)
async def main() -> None:
    async with fast.run() as agent:
        # Returns combined translations as-is
        result = await agent.multi_translate.send("Hello world")

Advanced Examples

Multi-Model Ensemble

@fast.agent(
    "gpt_analyst",
    instruction="Analyze the business problem",
    model="gpt-4.1",
)
@fast.agent(
    "claude_analyst",
    instruction="Analyze the business problem",
    model="sonnet",
)
@fast.agent(
    "gemini_analyst",
    instruction="Analyze the business problem",
    model="gemini-2.0-flash-exp",
)
@fast.agent(
    "synthesizer",
    instruction="Synthesize insights from multiple AI perspectives",
)
@fast.parallel(
    name="ensemble_analysis",
    fan_out=["gpt_analyst", "claude_analyst", "gemini_analyst"],
    fan_in="synthesizer",
)
async def main() -> None:
    async with fast.run() as agent:
        await agent.ensemble_analysis.send(
            "Evaluate the market opportunity for AI-powered healthcare diagnostics"
        )

Parallel Research Tasks

@fast.agent(
    "academic_researcher",
    instruction="Search academic papers and journals",
    servers=["fetch"],
)
@fast.agent(
    "news_researcher",
    instruction="Find recent news articles and reports",
    servers=["fetch"],
)
@fast.agent(
    "social_researcher",
    instruction="Analyze social media trends and discussions",
    servers=["fetch"],
)
@fast.agent(
    "research_synthesizer",
    instruction="Combine research from all sources into a comprehensive report",
)
@fast.parallel(
    name="comprehensive_research",
    fan_out=["academic_researcher", "news_researcher", "social_researcher"],
    fan_in="research_synthesizer",
)
async def main() -> None:
    async with fast.run() as agent:
        await agent.comprehensive_research.send("Research quantum computing applications")

Code Review from Multiple Perspectives

@fast.agent(
    "security_reviewer",
    instruction="Review code for security vulnerabilities",
    servers=["filesystem"],
)
@fast.agent(
    "performance_reviewer",
    instruction="Review code for performance issues and optimizations",
    servers=["filesystem"],
)
@fast.agent(
    "style_reviewer",
    instruction="Review code for style, readability, and best practices",
    servers=["filesystem"],
)
@fast.agent(
    "review_aggregator",
    instruction="Compile all review feedback into prioritized action items",
)
@fast.parallel(
    name="code_review",
    fan_out=["security_reviewer", "performance_reviewer", "style_reviewer"],
    fan_in="review_aggregator",
)
async def main() -> None:
    async with fast.run() as agent:
        await agent.code_review.send("Review the authentication module")

Performance Benefits

Execution Time Comparison

Sequential: 3 agents × 10 seconds each = 30 seconds total
Parallel: max(10, 10, 10) seconds = 10 seconds total
Speedup: 3× faster
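
The difference is easy to demonstrate with stand-in agents that simply sleep. This is a toy benchmark illustrating the asyncio concurrency model, not a fast-agent measurement:

```python
import asyncio
import time

async def slow_agent(delay: float) -> float:
    # Stand-in for an LLM call that takes `delay` seconds.
    await asyncio.sleep(delay)
    return delay

async def compare() -> tuple[float, float]:
    start = time.perf_counter()
    for _ in range(3):  # sequential: ~0.3 s
        await slow_agent(0.1)
    sequential = time.perf_counter() - start

    start = time.perf_counter()
    await asyncio.gather(*(slow_agent(0.1) for _ in range(3)))  # parallel: ~0.1 s
    parallel = time.perf_counter() - start
    return sequential, parallel

sequential, parallel = asyncio.run(compare())
print(f"sequential={sequential:.2f}s parallel={parallel:.2f}s")
```

In practice real agent latencies vary, so the speedup is bounded by the slowest agent in the fan-out set.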

Best Practices

Independent Tasks

Ensure fan-out agents can work independently without dependencies

Clear Aggregation

Give the fan-in agent clear instructions on how to combine results

Model Selection

Use different models for diverse perspectives or cost optimization

Error Handling

Plan for partial failures: one agent failing shouldn't break the whole workflow
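
One way to tolerate partial failures, sketched with plain asyncio. This is illustrative of the pattern, not fast-agent's built-in behavior:

```python
import asyncio

async def gather_with_partial_failures(coros):
    # return_exceptions=True keeps one failing task from cancelling
    # the rest; exceptions come back in-line with the results.
    results = await asyncio.gather(*coros, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures

async def ok() -> str:
    return "fine"

async def boom() -> str:
    raise RuntimeError("agent failed")

successes, failures = asyncio.run(
    gather_with_partial_failures([ok(), boom(), ok()])
)
print(successes, failures)
```

A fan-in agent given the surviving responses (plus a note about which agents failed) can still produce a useful, if partial, aggregation.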

Use Cases

  • Multi-Perspective Analysis: Get opinions from different specialized agents
  • Quality Assurance: Multiple reviewers checking different aspects
  • Research Aggregation: Gather information from multiple sources simultaneously
  • Model Ensembling: Combine outputs from different AI models
  • Translation: Translate to multiple languages at once
  • Testing: Run multiple test scenarios concurrently

Combining with Other Workflows

Parallel in a Chain

@fast.agent("data_collector", instruction="Collect and prepare data")
@fast.parallel(
    name="parallel_analysis",
    fan_out=["analyst1", "analyst2", "analyst3"],
    fan_in="synthesizer",
)
@fast.agent("report_writer", instruction="Format findings into a report")
@fast.chain(
    name="full_pipeline",
    sequence=["data_collector", "parallel_analysis", "report_writer"],
)
async def main() -> None:
    async with fast.run() as agent:
        await agent.full_pipeline.send("Analyze Q4 performance")

  • Chain - Sequential execution instead of parallel
  • Agents as Tools - LLM-driven parallel execution with routing
  • Orchestrator - Dynamic parallel task decomposition