Overview
These advanced patterns demonstrate production-ready implementations for real-world applications.

Tool Runner Hooks
Intercept and modify agent behavior at different stages (examples/tool-runner-hooks/tool_runner_hooks.py):
import asyncio
from fast_agent import FastAgent
from fast_agent.agents.agent_types import AgentConfig
from fast_agent.agents.tool_agent import ToolAgent
from fast_agent.agents.tool_runner import ToolRunnerHooks
from fast_agent.context import Context
from fast_agent.interfaces import ToolRunnerHookCapable
from fast_agent.types import PromptMessageExtended
def get_video_call_transcript(video_id: str) -> str:
    """Return the transcript for the given video call id (stub with fixed sample data)."""
    transcript = (
        "Assistant: Hi, how can I assist you today?\n\n"
        "Customer: Hi, I wanted to ask you about last invoice I received..."
    )
    return transcript
class HookedToolAgent(ToolAgent, ToolRunnerHookCapable):
    """Tool agent that registers runner hooks to observe and steer the tool loop."""

    def __init__(
        self,
        config: AgentConfig,
        context: Context | None = None,
    ):
        # Expose the transcript lookup as this agent's only tool.
        tools = [get_video_call_transcript]
        super().__init__(config, tools, context)
        # Wire hooks: inject a style hint before the first LLM call,
        # and log every tool result after it arrives.
        self._hooks = ToolRunnerHooks(
            before_llm_call=self._add_style_hint,
            after_tool_call=self._log_tool_result,
        )

    @property
    def tool_runner_hooks(self) -> ToolRunnerHooks | None:
        """Hooks consumed by the tool runner (ToolRunnerHookCapable protocol)."""
        return self._hooks

    async def _add_style_hint(self, runner, messages: list[PromptMessageExtended]) -> None:
        # Only on the first iteration: nudge the model toward brevity.
        if runner.iteration == 0:
            runner.append_messages("Keep the answer to one short sentence.")

    async def _log_tool_result(self, runner, message: PromptMessageExtended) -> None:
        # message.tool_results maps tool name -> result; report which tools ran.
        if message.tool_results:
            tool_names = ", ".join(message.tool_results.keys())
            print(f"[hook] tool results received: {tool_names}")
# Application entry point: register the hooked agent class with FastAgent.
fast = FastAgent("Example Tool Use Application (Hooks)")


@fast.custom(HookedToolAgent)
async def main() -> None:
    async with fast.run() as agent:
        # One scripted turn, then hand control to the interactive prompt.
        await agent.default.generate(
            "What is the topic of the video call no.1234?",
        )
        await agent.interactive()


if __name__ == "__main__":
    asyncio.run(main())
Available hooks:
before_llm_call, after_llm_call, before_tool_call, after_tool_call, on_error

Hook Use Cases
Logging and Monitoring
Logging and Monitoring
Track tool usage and performance:
async def log_tool_usage(runner, message: PromptMessageExtended) -> None:
    """after_tool_call hook: log per-tool duration and count calls."""
    if message.tool_results:
        for tool_name, result in message.tool_results.items():
            # NOTE(review): assumes each result exposes a `duration` in milliseconds — confirm.
            logger.info(f"Tool: {tool_name}, Duration: {result.duration}ms")
            metrics.increment(f"tool.{tool_name}.calls")
Response Filtering
Response Filtering
Sanitize or format responses:
async def sanitize_response(runner, messages: list[PromptMessageExtended]) -> None:
    """Hook: scrub sensitive data from assistant messages in place."""
    for msg in messages:
        # Only assistant messages with content are rewritten.
        if msg.role == "assistant" and msg.content:
            # Remove sensitive information
            msg.content = sanitize(msg.content)
Cost Tracking
Cost Tracking
Monitor token usage:
async def track_tokens(runner, message: PromptMessageExtended) -> None:
    """Hook: accumulate estimated request cost from token usage."""
    # Usage data is optional on the message, so guard with hasattr.
    if hasattr(message, 'usage'):
        cost = calculate_cost(message.usage)
        tracker.add_cost(cost)
        logger.info(f"Request cost: ${cost:.4f}")
Retry Logic
Retry Logic
Implement custom retry behavior:
async def retry_on_error(runner, error: Exception) -> None:
    """on_error hook: back off on rate limits, re-raise everything else."""
    if isinstance(error, RateLimitError):
        # Brief pause before the runner retries the call.
        await asyncio.sleep(1)
        # Retry will happen automatically
    else:
        raise error
RAG Integration
Integrate Retrieval-Augmented Generation with Google Vertex AI (examples/rag/vertex-rag.py):
import asyncio
import google.auth
import vertexai
from vertexai import rag
from fast_agent import FastAgent
from fast_agent.config import get_settings
# Load Vertex AI settings from the secrets file; fall back to an empty
# mapping when no google section is configured.
CONFIG_PATH = "fastagent.secrets.yaml"
_settings = get_settings(CONFIG_PATH)
_vertex_ai = getattr(_settings.google, "vertex_ai", {}) if _settings.google else {}
PROJECT_ID = _vertex_ai.get("project_id")
LOCATION = _vertex_ai.get("location")
EMBEDDING_MODEL = "text-embedding-005"  # Vertex embedding model used when creating a corpus
def mini_rag(query: str, drive_id: str, top_k: int) -> object:
    """Query RAG corpus for relevant documents.

    Args:
        query: Natural-language question to retrieve chunks for.
        drive_id: Google Drive folder id; also used to find/name the corpus.
        top_k: Maximum number of chunks to return.

    Returns:
        The Vertex RAG retrieval response for the query.
    """
    vertexai.init(project=PROJECT_ID, location=LOCATION)

    # Create or get existing corpus
    paths = [f"https://drive.google.com/drive/folders/{drive_id}"]
    existing_corpus = None
    # Reuse a corpus whose display name embeds this drive id.
    for corpus in rag.list_corpora():
        if corpus.display_name and drive_id in corpus.display_name:
            existing_corpus = corpus
            break
    if not existing_corpus:
        # Create new corpus and import files
        embedding_model_config = rag.RagEmbeddingModelConfig(
            vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
                publisher_model=f"publishers/google/models/{EMBEDDING_MODEL}"
            )
        )
        rag_corpus = rag.create_corpus(
            display_name=f"Corpus {drive_id}",
            backend_config=rag.RagVectorDbConfig(
                rag_embedding_model_config=embedding_model_config
            ),
        )
        # Import the Drive folder, chunking documents for embedding.
        rag.import_files(
            rag_corpus.name,
            paths,
            transformation_config=rag.TransformationConfig(
                chunking_config=rag.ChunkingConfig(
                    chunk_size=512,
                    chunk_overlap=100,
                ),
            ),
        )
    else:
        rag_corpus = existing_corpus

    # Query the corpus
    rag_retrieval_config = rag.RagRetrievalConfig(
        top_k=top_k,
        # Drop chunks whose vector distance exceeds the threshold.
        filter=rag.Filter(vector_distance_threshold=0.5),
    )
    return rag.retrieval_query(
        rag_resources=[rag.RagResource(rag_corpus=rag_corpus.name)],
        text=query,
        rag_retrieval_config=rag_retrieval_config,
    )
fast = FastAgent("Google Vertex RAG")


# Expose mini_rag directly as a callable tool on the agent.
@fast.agent(
    name="vertex_rag",
    function_tools=[mini_rag],
)
async def main():
    async with fast.run() as agent:
        result = await agent(
            "Produce a short top 5 prioritized list about customer pain points. From RAG, select 50 relevant chunks."
        )
        print(result)


if __name__ == "__main__":
    asyncio.run(main())
Configure Vertex AI credentials in fastagent.secrets.yaml:

google:
vertex_ai:
enabled: true
project_id: your-project-id
location: us-central1
FastAPI Integration
Simple Integration
Basic FastAPI integration with agent lifecycle management (examples/fastapi/fastapi-simple.py):
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fast_agent import FastAgent
# Create FastAgent without parsing CLI args
fast = FastAgent("fast-agent demo", parse_cli_args=False, quiet=True)


@fast.agent(name="helper", instruction="You are a helpful AI Agent.", default=True)
async def decorator():
    # Body is unused; the decorator registers the agent.
    pass


# Keep FastAgent running for the app lifetime
@asynccontextmanager
async def lifespan(app: FastAPI):
    async with fast.run() as agents:
        # Make the running agents available to request handlers.
        app.state.agents = agents
        yield
app = FastAPI(lifespan=lifespan)


class AskRequest(BaseModel):
    # User message to forward to the default agent.
    message: str


class AskResponse(BaseModel):
    # Agent's text reply.
    response: str


@app.post("/ask", response_model=AskResponse)
async def ask(req: AskRequest) -> AskResponse:
    """Forward the request message to the default agent and return its reply."""
    try:
        result = await app.state.agents.send(req.message)
        return AskResponse(response=result)
    except Exception as e:
        # Surface agent failures as a 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e))
Run the app with: uvicorn fastapi-simple:app --reload
Advanced Integration
Manual lifecycle control with the Core API (examples/fastapi/fastapi-advanced.py):
from contextlib import asynccontextmanager
from fastapi import Body, FastAPI, HTTPException
from fast_agent import PromptMessageExtended
from fast_agent.agents import McpAgent
from fast_agent.agents.agent_types import AgentConfig
from fast_agent.core import Core
from fast_agent.core.direct_factory import get_model_factory
core = Core(name="fast-agent core")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the core and agent on startup; tear both down on shutdown."""
    # Manual lifecycle control
    await core.initialize()
    cfg = AgentConfig(
        name="core_agent",
        instruction="You are a helpful AI Agent.",
    )
    agent = McpAgent(config=cfg, context=core.context)
    await agent.initialize()
    # Attach an LLM built from the configured model.
    llm_factory = get_model_factory(core.context, model=cfg.model)
    await agent.attach_llm(llm_factory)
    app.state.agent = agent
    try:
        yield
    finally:
        # Shut down the agent first, then always clean up the core.
        try:
            await agent.shutdown()
        finally:
            await core.cleanup()
app = FastAPI(lifespan=lifespan)


@app.post("/ask", response_model=PromptMessageExtended)
async def ask(body: str = Body(..., media_type="text/plain")) -> PromptMessageExtended:
    """Send a plain-text body to the agent and return the full multipart message."""
    try:
        # Call generate() to return the full multipart message
        result = await app.state.agent.generate(body)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
The advanced pattern gives you full control over agent initialization and cleanup, useful for custom configurations.
OpenTelemetry Integration
Monitor your agents with OpenTelemetry (fastagent.config.yaml):
observability:
  otel:
    enabled: true
    # OTLP/HTTP collector endpoint.
    endpoint: "http://localhost:4318"
    service_name: "fast-agent"
    traces:
      enabled: true
      # 1.0 samples every trace; lower this in high-traffic deployments.
      sample_rate: 1.0
    metrics:
      enabled: true
from fast_agent.observability import get_tracer, get_meter

# Module-level tracer and meter, named after this module.
tracer = get_tracer(__name__)
meter = get_meter(__name__)

# Create metrics
request_counter = meter.create_counter(
    "agent.requests",
    description="Number of agent requests"
)


@fast.agent(name="monitored_agent")
async def main():
    async with fast.run() as agent:
        # Trace the request and count it.
        with tracer.start_as_current_span("agent.process"):
            request_counter.add(1)
            result = await agent.send("Hello")
            return result
Agent-to-Agent Communication
Connect agents across different processes or machines (examples/a2a/server.py):
import asyncio

from fast_agent import FastAgent

fast = FastAgent("A2A Server")


@fast.agent(
    name="worker",
    instruction="You are a worker agent that processes tasks.",
    servers=["filesystem"],
)
async def main():
    # Start A2A server
    # Bind to all interfaces so remote coordinators can connect.
    await fast.start_a2a_server(host="0.0.0.0", port=8765)


if __name__ == "__main__":
    asyncio.run(main())
examples/a2a/agent_executor.py
import asyncio

from fast_agent import FastAgent

fast = FastAgent("A2A Client")


@fast.agent(
    name="coordinator",
    instruction="You coordinate tasks with remote workers.",
    # Remote agents this coordinator can reach over websockets.
    a2a_connections=[
        {"name": "worker", "url": "ws://localhost:8765"}
    ],
)
async def main():
    async with fast.run() as agent:
        # Call remote agent as a tool
        result = await agent.coordinator.send(
            "Process this file using the worker agent"
        )
        print(result)


if __name__ == "__main__":
    asyncio.run(main())
A2A communication is experimental and subject to change. Use in production with caution.
Custom Model Providers
Integrate custom LLM providers:

from fast_agent.llm.base import BaseLLM, LLMConfig
from fast_agent.types import PromptMessageExtended
class CustomLLM(BaseLLM):
    """Example adapter wiring a custom provider client into the BaseLLM interface."""

    def __init__(self, config: LLMConfig):
        super().__init__(config)
        # Initialize your custom provider
        # NOTE(review): CustomClient is a placeholder for your provider's SDK client.
        self.client = CustomClient(api_key=config.api_key)

    async def generate(
        self,
        messages: list[PromptMessageExtended],
        **kwargs
    ) -> PromptMessageExtended:
        # Implement your generation logic
        response = await self.client.complete(messages)
        return self._format_response(response)


# Register the provider
from fast_agent.llm.model_factory import ModelFactory

# After registration, models can be selected as "custom:<model-name>".
ModelFactory.register_provider("custom", CustomLLM)


# Use in your agent
@fast.agent(model="custom:my-model")
async def main():
    async with fast.run() as agent:
        await agent.interactive()
Production Best Practices
Error Handling
Error Handling
Implement comprehensive error handling:
from fast_agent.exceptions import AgentError, ToolError, LLMError

# Catch the most specific exception types first; AgentError is the broadest.
try:
    result = await agent.send(message)
except ToolError as e:
    logger.error(f"Tool failed: {e.tool_name} - {e.message}")
    # Handle tool failure
except LLMError as e:
    logger.error(f"LLM error: {e}")
    # Handle LLM failure
except AgentError as e:
    logger.error(f"Agent error: {e}")
    # Handle general agent error
Rate Limiting
Rate Limiting
Implement rate limiting for API calls:
from aiolimiter import AsyncLimiter

rate_limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 requests per second


async def rate_limited_send(agent, message):
    """Send a message, waiting for rate-limiter capacity first."""
    async with rate_limiter:
        return await agent.send(message)
Secrets Management
Secrets Management
Use environment variables and secret managers, or load from AWS Secrets Manager (shown below).

fastagent.secrets.yaml:
llm:
  anthropic:
    # ${VAR} values are presumably expanded from environment variables at load time — confirm.
    api_key: "${ANTHROPIC_API_KEY}"
  openai:
    api_key: "${OPENAI_API_KEY}"
mcp:
  servers:
    brave-search:
      env:
        BRAVE_API_KEY: "${BRAVE_API_KEY}"
import boto3
import json


def load_secrets():
    """Fetch the production secrets blob from AWS Secrets Manager as a dict."""
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId='fast-agent/prod')
    # SecretString holds the JSON-encoded secret payload.
    return json.loads(response['SecretString'])
Health Checks
Health Checks
Implement health check endpoints:
@app.get("/health")
async def health_check():
    """Liveness probe: round-trip a message through the agent."""
    try:
        # Check agent is responsive
        await app.state.agent.send("ping")
        return {"status": "healthy", "agent": "ready"}
    except Exception as e:
        # 503 signals the process is up but the agent is unhealthy.
        raise HTTPException(status_code=503, detail=str(e))


@app.get("/ready")
async def readiness_check():
    """Readiness probe: report 503 until the agent has been initialized."""
    if not app.state.agent:
        raise HTTPException(status_code=503, detail="Agent not initialized")
    return {"status": "ready"}
Graceful Shutdown
Graceful Shutdown
Handle shutdown gracefully:
import signal

# Event set by the signal handler; main() waits on it.
shutdown_event = asyncio.Event()


def signal_handler(sig, frame):
    """Set the shutdown event so main() can exit its wait."""
    logger.info("Shutting down gracefully...")
    shutdown_event.set()


# NOTE(review): with a running event loop, loop.add_signal_handler is the
# asyncio-native alternative — confirm which fits your deployment.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


async def main():
    async with fast.run() as agent:
        try:
            # Block until a termination signal arrives.
            await shutdown_event.wait()
        finally:
            logger.info("Cleanup complete")
Performance Optimization
Streaming Responses
@app.post("/stream")
async def stream_response(req: AskRequest):
    """Stream the agent's reply back as server-sent events."""
    async def generate():
        # Each chunk becomes one SSE `data:` frame.
        async for chunk in app.state.agent.stream(req.message):
            yield f"data: {chunk}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
Caching
from functools import lru_cache
import hashlib
class CachedAgent:
def __init__(self, agent):
self.agent = agent
self.cache = {}
async def send(self, message: str) -> str:
# Simple hash-based caching
cache_key = hashlib.md5(message.encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key]
result = await self.agent.send(message)
self.cache[cache_key] = result
return result
Connection Pooling
@fast.agent(
    name="pooled_agent",
    servers=["fetch"],
    connection_persistence=True,  # Reuse MCP connections
)
async def main():
    async with fast.run() as agent:
        # Connections are pooled and reused
        await agent.interactive()
Next Steps
Configuration
Learn about all configuration options
Basic Examples
Back to basic usage examples
API Reference
Explore the complete API
GitHub
Contribute to Fast Agent on GitHub
