The Parallel workflow sends the same message to multiple agents simultaneously (fan-out), then passes their combined responses to a fan-in agent for aggregation. This pattern is ideal for gathering diverse perspectives or processing multiple independent tasks concurrently.
Parallel execution leverages asyncio for true concurrent processing, significantly reducing total execution time.
import asyncio
from pathlib import Path

from fast_agent import FastAgent
from fast_agent.core.prompt import Prompt

fast = FastAgent("Parallel Workflow")


# Fan-out agents: each of the three reviewers below is listed in `fan_out`
# of the parallel workflow and therefore receives the same submission.
@fast.agent(
    name="proofreader",
    instruction="""Review the short story for grammar, spelling, and punctuation errors. Identify any awkward phrasing or structural issues that could improve clarity. Provide detailed feedback on corrections.""",
)
@fast.agent(
    name="fact_checker",
    instruction="""Verify the factual consistency within the story. Identify any contradictions, logical inconsistencies, or inaccuracies in the plot, character actions, or setting. Highlight potential issues with reasoning or coherence.""",
)
@fast.agent(
    name="style_enforcer",
    instruction="""Analyze the story for adherence to style guidelines. Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to enhance storytelling, readability, and engagement.""",
    model="sonnet",
)
# Fan-in agent: named as `fan_in` below, it turns the reviewers' feedback
# into a single graded report.
@fast.agent(
    name="grader",
    instruction="""Compile the feedback from the Proofreader, Fact Checker, and Style Enforcer into a structured report. Summarize key issues and categorize them by type. Provide actionable recommendations for improving the story, and give an overall grade based on the feedback.""",
)
@fast.parallel(
    fan_out=["proofreader", "fact_checker", "style_enforcer"],
    fan_in="grader",
    name="parallel",
)
async def main() -> None:
    """Send the short-story submission through the ``parallel`` workflow.

    The prompt pairs a short text label with the file ``short_story.txt``,
    which is given as a relative path.
    """
    async with fast.run() as agent:
        await agent.parallel.send(
            Prompt.user("Student short story submission", Path("short_story.txt"))
        )


if __name__ == "__main__":
    asyncio.run(main())
# Multi-model ensemble: the same analysis instruction is given to three
# agents that differ only in the `model` they are configured with.
@fast.agent(
    name="gpt_analyst",
    instruction="Analyze the business problem",
    model="gpt-4.1",
)
@fast.agent(
    name="claude_analyst",
    instruction="Analyze the business problem",
    model="sonnet",
)
@fast.agent(
    name="gemini_analyst",
    instruction="Analyze the business problem",
    model="gemini-2.0-flash-exp",
)
# Fan-in agent that receives the three analyses.
@fast.agent(
    name="synthesizer",
    instruction="Synthesize insights from multiple AI perspectives",
)
@fast.parallel(
    name="ensemble_analysis",
    fan_out=["gpt_analyst", "claude_analyst", "gemini_analyst"],
    fan_in="synthesizer",
)
async def main() -> None:
    """Send one market-analysis question to the ``ensemble_analysis`` workflow."""
    async with fast.run() as agent:
        await agent.ensemble_analysis.send(
            "Evaluate the market opportunity for AI-powered healthcare diagnostics"
        )
# Research fan-out: three researchers, each configured with the "fetch"
# server, cover different kinds of sources.
@fast.agent(
    name="academic_researcher",
    instruction="Search academic papers and journals",
    servers=["fetch"],
)
@fast.agent(
    name="news_researcher",
    instruction="Find recent news articles and reports",
    servers=["fetch"],
)
@fast.agent(
    name="social_researcher",
    instruction="Analyze social media trends and discussions",
    servers=["fetch"],
)
# Fan-in agent; it is declared without any servers.
@fast.agent(
    name="research_synthesizer",
    instruction="Combine research from all sources into a comprehensive report",
)
@fast.parallel(
    name="comprehensive_research",
    fan_out=["academic_researcher", "news_researcher", "social_researcher"],
    fan_in="research_synthesizer",
)
async def main() -> None:
    """Send one research topic to the ``comprehensive_research`` workflow."""
    async with fast.run() as agent:
        await agent.comprehensive_research.send(
            "Research quantum computing applications"
        )
# Code-review fan-out: three reviewers, each configured with the
# "filesystem" server, look at the same request from different angles.
@fast.agent(
    name="security_reviewer",
    instruction="Review code for security vulnerabilities",
    servers=["filesystem"],
)
@fast.agent(
    name="performance_reviewer",
    instruction="Review code for performance issues and optimizations",
    servers=["filesystem"],
)
@fast.agent(
    name="style_reviewer",
    instruction="Review code for style, readability, and best practices",
    servers=["filesystem"],
)
# Fan-in agent; it is declared without any servers.
@fast.agent(
    name="review_aggregator",
    instruction="Compile all review feedback into prioritized action items",
)
@fast.parallel(
    name="code_review",
    fan_out=["security_reviewer", "performance_reviewer", "style_reviewer"],
    fan_in="review_aggregator",
)
async def main() -> None:
    """Send one review request to the ``code_review`` workflow."""
    async with fast.run() as agent:
        await agent.code_review.send("Review the authentication module")