Agentic Systems - Complete Code Reference
This reference contains full implementations of agentic system patterns from Chapter 8.
Tool and Function Definitions
Agentic Tool Execution Loop
Complete implementation of the tool execution loop that handles multiple turns of tool calls.
def agentic_loop(
    user_query: str,
    tools: list,
    max_turns: int = 10,
    max_tokens: int = 4096,
) -> str:
    """Run the model/tool loop until the model stops asking for tools.

    Args:
        user_query: Text of the initial user message.
        tools: Tool definitions passed through to ``client.messages.create``.
        max_turns: Upper bound on the number of model calls.
        max_tokens: Per-call generation cap forwarded to the API (the
            Messages API requires this argument).

    Returns:
        The concatenated text blocks of the first response whose
        ``stop_reason`` is not ``"tool_use"``, or a fixed notice when
        ``max_turns`` is exhausted first.
    """
    messages = [{"role": "user", "content": user_query}]

    for _ in range(max_turns):
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=max_tokens,
            messages=messages,
            tools=tools,
        )

        # Add assistant response to history
        messages.append({"role": "assistant", "content": response.content})

        # Any stop reason other than "tool_use" (end_turn, max_tokens, ...)
        # leaves nothing to execute; continuing would send back a user turn
        # with an empty tool_result list.
        if response.stop_reason != "tool_use":
            return "".join(
                block.text for block in response.content if hasattr(block, "text")
            )

        # Execute every requested tool call and collect one result per call
        tool_results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            try:
                result = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result if isinstance(result, str) else json.dumps(result),
                })
            except Exception as e:
                # Report the failure to the model instead of aborting the loop
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": f"Error: {str(e)}",
                    "is_error": True,
                })

        messages.append({"role": "user", "content": tool_results})

    return "Max turns exceeded without final response"


# Tool Description Best Practices
Example of a well-documented tool schema with clear descriptions and constraints.
# Bad: vague description -- gives the model no basis for choosing the tool
{
    "name": "search",
    "description": "Search for stuff",
}

# Good: clear, specific description -- says what is searched, when to use
# it, what comes back, and what is out of scope
{
    "name": "search_knowledge_base",
    "description": (
        "Search the company's internal knowledge base for documents.\n"
        "Use this tool when the user asks questions about:\n"
        "- Company policies and procedures\n"
        "- Product documentation\n"
        "- Internal processes\n"
        "Returns top 5 matching documents with relevance scores.\n"
        "Does NOT search: email, Slack, or external websites."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Natural language search query. Be specific—include product names, policy types, or document titles if known.",
            },
            "department": {
                "type": "string",
                "enum": ["engineering", "sales", "hr", "finance", "all"],
                "description": "Limit search to a specific department, or 'all' for company-wide search",
            },
        },
        "required": ["query"],
    },
}


# MCP Server Patterns
Complete MCP Server Implementation
Full MCP server with document search capabilities, resources, and proper async handling.
"""MCP server exposing document search capabilities."""
import asyncio
import json
from mcp import Server, Tool
from mcp.types import TextContent
# Create the MCP server instance, registered under the name "document-search".
# The @server.tool / @server.resource decorators below attach handlers to it.
server = Server("document-search")
# Define a tool
@server.tool("search_documents")
async def search_documents(query: str, limit: int = 5) -> list[dict]:
    """
    Search internal documents.

    Args:
        query: Natural language search query
        limit: Maximum results to return (default 5); clamped to 1..20

    Returns:
        List of matching documents, each with ``id``, ``title``,
        ``snippet`` (first 500 characters of the text) and ``score``
    """
    # Clamp on both sides: the upper bound caps the result size, the lower
    # bound keeps zero/negative values from reaching the search backend.
    top_k = max(1, min(limit, 20))

    # Connect to your actual search backend
    results = await vector_store.search(query, top_k=top_k)
    return [
        {
            "id": r.id,
            "title": r.metadata.get("title", "Untitled"),
            "snippet": r.text[:500],
            "score": r.score,
        }
        for r in results
    ]
@server.tool("get_document")
async def get_document(doc_id: str) -> dict:
    """
    Retrieve full document by ID.

    Args:
        doc_id: Document ID from search results

    Returns:
        A dict with ``id``, ``title``, ``content`` and ``updated_at``
        (ISO 8601 string), or ``{"error": ...}`` when the store returns
        a falsy value for ``doc_id``
    """
    found = await document_store.get(doc_id)
    if found:
        return {
            "id": doc_id,
            "title": found.title,
            "content": found.content,
            "updated_at": found.updated_at.isoformat(),
        }
    return {"error": f"Document {doc_id} not found"}
# Define a resource
@server.resource("corpus://stats")
async def corpus_stats():
    """Corpus statistics, refreshed hourly.

    Returns a dict with the document count and the ISO 8601 timestamp of
    the last indexing run, both read from ``get_corpus_stats()``.
    """
    snapshot = await get_corpus_stats()
    total = snapshot.doc_count
    indexed_at = snapshot.last_index_time.isoformat()
    return {"total_documents": total, "last_indexed": indexed_at}
# Run server
async def main():
    """Serve over stdio and block until the running server is closed."""
    async with server.run_stdio() as session:
        await session.wait_closed()


if __name__ == "__main__":
    asyncio.run(main())


# MCP SQL Security Pattern
Secure SQL query tool with read-only validation and timeout protection.
@server.tool("execute_sql")
async def execute_sql(query: str) -> dict:
    """
    Execute a read-only SQL query.

    Security:
    - Only a single SELECT statement is allowed
    - Forbidden keywords are matched as whole words, so identifiers such
      as ``updated_at`` are not rejected
    - Query timeout: 30 seconds
    - Results limited to 1000 rows

    Returns:
        ``{"rows": [...], "truncated": bool}`` on success, or
        ``{"error": "..."}`` when validation fails or the query times out.

    NOTE(review): keyword screening is a best-effort filter, not a
    guarantee. Run these queries under a database role that has only
    read privileges.
    """
    import re  # function-scope import: `re` is not imported at file level

    # Validate query is read-only. One trailing ";" is tolerated; any other
    # ";" is treated as a statement separator (this also rejects a ";"
    # inside a string literal, erring on the side of safety).
    statement = query.strip().rstrip(";").strip()
    upper_statement = statement.upper()
    if not upper_statement.startswith("SELECT"):
        return {"error": "Only SELECT queries are allowed"}
    if ";" in statement:
        return {"error": "Multiple statements are not allowed"}

    # Check for dangerous patterns (whole-word match only)
    dangerous_patterns = ["DROP", "DELETE", "INSERT", "UPDATE", "ALTER"]
    for pattern in dangerous_patterns:
        if re.search(rf"\b{pattern}\b", upper_statement):
            return {"error": f"Query contains forbidden keyword: {pattern}"}

    # Execute with timeout
    try:
        async with asyncio.timeout(30):
            results = await db.execute(query)
            return {"rows": results[:1000], "truncated": len(results) > 1000}
    except asyncio.TimeoutError:
        return {"error": "Query timed out"}


# Agent Architectures
ReAct Agent
Complete ReAct (Reasoning + Acting) agent implementation with thought/action parsing.
class ReActAgent:
"""Agent using ReAct pattern: Reasoning + Acting."""
SYSTEM_PROMPT = """You are a helpful assistant that solves problems step by step.
For each step, you must use this exact format:
Thought: [your reasoning about what to do next]
Action: [tool_name]([arguments])
After receiving an observation, continue with another Thought/Action or provide the final answer:
Answer: [your final response to the user]
Available tools:
{tool_descriptions}
Always think before acting. Never skip the Thought step."""
def __init__(self, llm_client, tools: dict[str, callable]):
self.llm = llm_client
self.tools = tools
def run(self, query: str, max_steps: int = 10) -> str:
tool_desc = self._format_tool_descriptions()
messages = [
{"role": "system", "content": self.SYSTEM_PROMPT.format(tool_descriptions=tool_desc)},
{"role": "user", "content": query}
]
for step in range(max_steps):
response = self.llm.generate(messages)
messages.append({"role": "assistant", "content": response})
# Check for final answer
if "Answer:" in response:
return response.split("Answer:")[-1].strip()
# Extract and execute action
action = self._parse_action(response)
if action:
tool_name, args = action
if tool_name in self.tools:
try:
result = self.tools[tool_name](*args)
observation = f"Observation: {result}"
except Exception as e:
observation = f"Observation: Error - {str(e)}"
else:
observation = f"Observation: Unknown tool '{tool_name}'"
messages.append({"role": "user", "content": observation})
else:
messages.append({"role": "user", "content": "Observation: Could not parse action. Please use the format: Action: tool_name(args)"})
return "Max steps reached without finding an answer."
def _parse_action(self, text: str) -> tuple[str, list] | None:
"""Extract action from response."""
import re
match = re.search(r'Action:\s*(\w+)\((.*?)\)', text)
if match:
tool_name = match.group(1)
args_str = match.group(2)
# Simple argument parsing (production code needs proper handling)
args = [a.strip().strip('"\'') for a in args_str.split(',')] if args_str else []
return tool_name, args
return NonePlanning Agent
Agent that creates explicit plans before execution with parallel step support.
class PlanningAgent:
    """Agent that creates a plan before executing.

    NOTE(review): ``_tool_descriptions``, ``_parse_plan`` and
    ``_synthesize`` are called but not defined in this listing; they are
    presumably supplied elsewhere -- confirm before use.
    """

    PLANNING_PROMPT = """Create a step-by-step plan to accomplish this task.
Task: {task}
Available tools:
{tools}
Output your plan as a numbered list. Each step should be concrete and actionable.
Mark steps that can be done in parallel with [PARALLEL].
Plan:"""

    EXECUTION_PROMPT = """Execute this step of the plan.
Overall task: {task}
Current step: {step}
Previous results: {context}
Use the appropriate tool and return the result."""

    def __init__(self, llm_client, tools: dict):
        """Store the LLM client and the tool registry."""
        self.llm = llm_client
        self.tools = tools

    async def run(self, task: str) -> str:
        """Plan, execute every step exactly once, then synthesize an answer.

        Each plan step is a dict with an ``id`` and, optionally,
        ``parallel_with``: a list of indices into the plan naming steps
        to run concurrently with it.
        """
        # Phase 1: Create plan
        plan = await self._create_plan(task)
        print(f"Plan:\n{plan}")

        # Phase 2: Execute plan
        results = {}
        for step in plan:
            if step["id"] in results:
                # Already executed as a member of an earlier parallel group
                continue

            if step.get("parallel_with"):
                # Execute parallel steps together, leaving out peers that
                # have already produced a result (and the step itself).
                group = [step]
                for j in step["parallel_with"]:
                    peer = plan[j]
                    if peer["id"] not in results and all(peer is not s for s in group):
                        group.append(peer)
                group_results = await asyncio.gather(*[
                    self._execute_step(s, task, results) for s in group
                ])
                for s, r in zip(group, group_results):
                    results[s["id"]] = r
            else:
                results[step["id"]] = await self._execute_step(step, task, results)

        # Phase 3: Synthesize final answer
        return await self._synthesize(task, results)

    async def _create_plan(self, task: str) -> list[dict]:
        """Ask the LLM for a plan and parse it into a list of step dicts."""
        response = await self.llm.generate(
            self.PLANNING_PROMPT.format(task=task, tools=self._tool_descriptions())
        )
        return self._parse_plan(response)

    async def _execute_step(self, step: dict, task: str, context: dict) -> str:
        """Run one step, giving the LLM the results gathered so far as JSON."""
        prompt = self.EXECUTION_PROMPT.format(
            task=task,
            step=step["description"],
            context=json.dumps(context, indent=2),
        )
        return await self.llm.generate(prompt, tools=self.tools)


# Supervisor-Worker Multi-Agent
Supervisor that coordinates specialized worker agents with dependency management.
class SupervisorAgent:
    """Coordinates specialist worker agents.

    NOTE(review): ``_synthesize`` and ``AgentError`` are referenced but
    not defined in this listing -- confirm where they come from.
    """

    def __init__(self, llm_client, workers: dict[str, 'WorkerAgent']):
        """Store the LLM client and the worker-type -> agent mapping."""
        self.llm = llm_client
        self.workers = workers

    async def execute(self, request: str) -> str:
        """Decompose ``request``, run subtasks in dependency order, synthesize.

        Subtasks whose dependencies are all complete run concurrently.

        Raises:
            AgentError: when pending subtasks remain but none is runnable
                (for example a dependency cycle or an unknown dependency id).
        """
        # Create subtask breakdown
        subtasks = await self._decompose(request)

        # The decomposition prompt does not ask for a status field, so
        # every subtask starts out pending unless it already carries one.
        for subtask in subtasks:
            subtask.setdefault("status", "pending")

        # Execute subtasks respecting dependencies
        completed = {}
        while not all(t["status"] == "done" for t in subtasks):
            # Find ready tasks (dependencies satisfied)
            ready = [
                t for t in subtasks
                if t["status"] == "pending"
                and all(d in completed for d in t.get("depends_on", []))
            ]

            if not ready:
                # Check for stuck state
                pending = [t for t in subtasks if t["status"] == "pending"]
                if pending:
                    raise AgentError("Execution stuck: dependencies cannot be resolved")
                break

            # Execute ready tasks in parallel
            results = await asyncio.gather(*[
                self._execute_subtask(t) for t in ready
            ])

            for task, result in zip(ready, results):
                task["status"] = "done"
                task["result"] = result
                completed[task["id"]] = result

        # Synthesize final response
        return await self._synthesize(request, subtasks)

    async def _execute_subtask(self, task: dict) -> str:
        """Dispatch one subtask to its worker; unknown workers yield an error string."""
        worker_type = task["worker"]
        if worker_type not in self.workers:
            return f"Error: Unknown worker type {worker_type}"

        worker = self.workers[worker_type]
        return await worker.execute(task["description"])

    async def _decompose(self, request: str) -> list[dict]:
        """Ask the LLM for a JSON array of subtasks and parse it."""
        prompt = f"""Break down this request into subtasks for specialist workers.
Available workers:
- RESEARCHER: Searches information, reads documents
- CODER: Writes and analyzes code
- ANALYST: Performs data analysis
- WRITER: Drafts documents and communications
Request: {request}
Output JSON array of subtasks with: id, description, worker, depends_on (list of task ids)"""
        response = await self.llm.generate(prompt)
        return json.loads(response)
class WorkerAgent:
    """Specialist worker agent."""

    def __init__(self, llm_client, tools: list, system_prompt: str):
        """Store the LLM client, the tool definitions and the role prompt."""
        self.llm = llm_client
        self.tools = tools
        self.system_prompt = system_prompt

    async def execute(self, task: str) -> str:
        """Run ``task`` through the tool loop and return its final text.

        ``agentic_loop`` is a synchronous function that takes the query as
        a string, so it is run in a worker thread rather than awaited
        directly. It has no separate system-prompt parameter, so the role
        prompt is prepended to the task text.
        """
        if self.system_prompt:
            query = f"{self.system_prompt}\n\n{task}"
        else:
            query = task

        # Run agentic loop with tools
        return await asyncio.to_thread(agentic_loop, query, self.tools)


# Hierarchical Task Network Agent
Agent that recursively decomposes tasks into hierarchical subtasks.
class HierarchicalAgent:
    """Agent using hierarchical task networks.

    NOTE(review): ``_execute_leaf``, ``_combine_results`` and
    ``_synthesize`` are called but not defined in this listing.
    """

    def __init__(self, llm_client, tools: dict):
        """Store the LLM client and the tool registry."""
        self.llm = llm_client
        self.tools = tools

    async def run(self, task: str) -> str:
        """Decompose ``task`` into a tree, execute it bottom-up, synthesize."""
        # Build task hierarchy
        task_tree = await self._build_task_tree(task)

        # Execute from leaves up
        results = await self._execute_tree(task_tree)

        # Synthesize final result
        return await self._synthesize(task, results)

    async def _build_task_tree(self, task: str, depth: int = 0) -> dict:
        """Recursively decompose task into subtasks.

        Returns a node ``{"task", "subtasks", "is_leaf"}``. A node becomes
        a leaf when recursion is past depth 3, when the model's reply
        starts with LEAF, or when the reply is not a non-empty JSON list.
        """
        leaf = {"task": task, "subtasks": [], "is_leaf": True}
        if depth > 3:  # Max depth
            return leaf

        prompt = (
            "Is this task simple enough to execute directly with available tools?\n"
            f"Task: {task}\n"
            f"Available tools: {list(self.tools.keys())}\n"
            "If YES (can execute directly), respond: LEAF\n"
            "If NO (needs decomposition), provide 2-4 subtasks as JSON list."
        )
        response = await self.llm.generate(prompt)

        # Match the verdict at the start only: a substring test would also
        # fire on a subtask list that merely contains the letters "leaf".
        if response.strip().upper().startswith("LEAF"):
            return leaf

        # Parse subtasks and recurse
        try:
            subtask_list = json.loads(response)
        except json.JSONDecodeError:
            subtask_list = None
        if not isinstance(subtask_list, list) or not subtask_list:
            # No usable decomposition: attempt the task directly
            return leaf

        subtasks = []
        for subtask in subtask_list:
            subtasks.append(await self._build_task_tree(subtask, depth + 1))
        return {"task": task, "subtasks": subtasks, "is_leaf": False}

    async def _execute_tree(self, node: dict) -> dict:
        """Execute task tree from leaves up.

        Leaves return ``{"task", "result"}``; inner nodes also carry their
        children's results under ``"subtasks"``.
        """
        if node["is_leaf"]:
            # Execute leaf task
            result = await self._execute_leaf(node["task"])
            return {"task": node["task"], "result": result}

        # Execute subtasks first, one after another in listed order
        subtask_results = []
        for subtask in node["subtasks"]:
            subtask_results.append(await self._execute_tree(subtask))

        # Combine subtask results
        combined = await self._combine_results(node["task"], subtask_results)
        return {"task": node["task"], "result": combined, "subtasks": subtask_results}


# Self-Reflecting Agent
Agent that evaluates and improves its own responses.
class SelfReflectingAgent:
    """Agent that reflects on and improves its responses.

    NOTE(review): ``_generate`` is called but not defined in this listing.
    """

    REFLECTION_PROMPT = """Review your response to this query:
Query: {query}
Your response: {response}
Evaluate:
1. Does it fully answer the query?
2. Is the information accurate (based on your tools/sources)?
3. Is anything missing or unclear?
4. Could the response be improved?
If improvements are needed, provide an improved response.
If the response is good, respond with "APPROVED: [brief explanation]"."""

    def __init__(self, llm_client, tools: dict):
        """Store the LLM client and the tool registry."""
        self.llm = llm_client
        self.tools = tools

    async def run(self, query: str) -> str:
        """Generate an answer, then let the model critique it up to three times.

        A critique starting with ``APPROVED`` ends the loop and the current
        answer is returned; any other critique text replaces the answer.
        """
        current = await self._generate(query)

        rounds_left = 3  # Max 3 reflection iterations
        while rounds_left:
            rounds_left -= 1
            verdict = await self._reflect(query, current)
            if verdict.startswith("APPROVED"):
                break
            # Use reflection as improved response
            current = verdict

        return current

    async def _reflect(self, query: str, response: str) -> str:
        """Send the filled-in reflection prompt to the LLM and return its reply."""
        filled = self.REFLECTION_PROMPT.format(query=query, response=response)
        return await self.llm.generate(filled)


# Critic-Actor Agent
Agent with separate generation and evaluation roles for quality control.
class CriticActorAgent:
    """Agent with separate actor and critic roles.

    NOTE(review): ``_actor_generate`` and ``_actor_revise`` are called but
    not defined in this listing.
    """

    def __init__(self, actor_llm, critic_llm, tools: dict):
        """Store the actor and critic LLM clients and the tool registry."""
        self.actor = actor_llm
        self.critic = critic_llm
        self.tools = tools

    async def run(self, query: str) -> str:
        """Generate a response and revise it until the critic approves.

        At most three revisions are attempted; the last revision is
        returned even when it was not approved.
        """
        # Actor generates response
        response = await self._actor_generate(query)

        # Critic evaluates
        critique = await self._critic_evaluate(query, response)
        if critique["approved"]:
            return response

        # Actor revises based on critique
        for _ in range(3):
            response = await self._actor_revise(query, response, critique.get("feedback", ""))
            critique = await self._critic_evaluate(query, response)
            if critique["approved"]:
                return response

        return response  # Return best effort

    async def _critic_evaluate(self, query: str, response: str) -> dict:
        """Ask the critic for a verdict.

        Returns a dict that always has ``approved``, ``ratings`` and
        ``feedback``. The prompt lets the critic answer with a bare
        ``APPROVED``, so that form is accepted as well as JSON; a reply
        that is neither is treated as a rejection whose feedback is the
        raw reply text.
        """
        prompt = (
            "Evaluate this response:\n"
            f"Query: {query}\n"
            f"Response: {response}\n"
            "Rate from 1-5 on:\n"
            "- Accuracy: Is the information correct?\n"
            "- Completeness: Does it fully address the query?\n"
            "- Clarity: Is it easy to understand?\n"
            "If all ratings are 4+, respond: APPROVED\n"
            "Otherwise, provide specific feedback for improvement.\n"
            'Output JSON: {"approved": bool, "ratings": {}, "feedback": str}'
        )
        result = await self.critic.generate(prompt)
        text = result.strip()

        if text.upper().startswith("APPROVED"):
            return {"approved": True, "ratings": {}, "feedback": ""}

        try:
            verdict = json.loads(text)
        except json.JSONDecodeError:
            verdict = None
        if not isinstance(verdict, dict):
            return {"approved": False, "ratings": {}, "feedback": text}

        # Fill in any keys the critic left out so callers can index safely
        verdict.setdefault("approved", False)
        verdict.setdefault("ratings", {})
        verdict.setdefault("feedback", "")
        return verdict


# Memory-Augmented Agent
Agent with persistent memory and episodic consolidation.
class MemoryAugmentedAgent:
    """Agent with persistent memory across sessions.

    NOTE(review): ``_generate_with_memory`` and ``_embed`` are called but
    not defined in this listing.
    """

    def __init__(self, llm_client, tools: dict, memory_store):
        """Store the LLM client, the tool registry and the memory backend."""
        self.llm = llm_client
        self.tools = tools
        self.memory = memory_store  # Vector store for memories

    async def run(self, query: str, user_id: str) -> str:
        """Answer ``query`` using recalled memories, then record the exchange."""
        # Recall, format, generate
        recalled = await self._recall(query, user_id)
        memory_context = self._format_memories(recalled)
        answer = await self._generate_with_memory(query, memory_context)

        # Persist this interaction before handing the answer back
        await self._remember(query, answer, user_id)
        return answer

    async def _recall(self, query: str, user_id: str, k: int = 5) -> list[dict]:
        """Retrieve up to ``k`` memories matching ``query`` for this user."""
        # Search for similar past interactions
        return await self.memory.search(
            query=query,
            filter={"user_id": user_id},
            k=k,
        )

    async def _remember(self, query: str, response: str, user_id: str):
        """Store interaction as memory, timestamped and embedded."""
        record = {
            "query": query,
            "response": response,
            "user_id": user_id,
            "timestamp": datetime.now().isoformat(),
            "embedding": await self._embed(f"{query}\n{response}"),
        }
        await self.memory.add(record)

    def _format_memories(self, memories: list[dict]) -> str:
        """Render memories as a bulleted Q/A list; empty input gives ``""``.

        Queries are cut to 100 characters and responses to 200; an
        ellipsis is appended to each regardless of length.
        """
        if not memories:
            return ""

        lines = ["Relevant past interactions:"]
        for item in memories:
            lines.extend((
                f"- Q: {item['query'][:100]}...",
                f" A: {item['response'][:200]}...",
            ))
        return "\n".join(lines)
class EpisodicMemory:
    """Episodic memory with forgetting and consolidation.

    NOTE(review): ``_cluster_memories`` and ``_summarize_cluster`` are
    called but not defined in this listing.
    """

    def __init__(self, vector_store, max_memories: int = 1000):
        """Wrap ``vector_store`` with a soft cap of ``max_memories`` entries."""
        self.store = vector_store
        self.max_memories = max_memories

    async def add(self, memory: dict):
        """Add memory with automatic consolidation once the cap is exceeded."""
        await self.store.add(memory)

        # Check if consolidation needed
        if await self.store.count() > self.max_memories:
            await self._consolidate()

    async def _consolidate(self):
        """Consolidate old memories to maintain limit.

        Takes the 100 oldest memories, clusters them, and replaces every
        cluster of more than three entries with a single summary. Smaller
        clusters are left untouched.
        """
        # Get oldest memories
        oldest = await self.store.get_oldest(n=100)

        # Summarize similar old memories
        for group in self._cluster_memories(oldest):
            if len(group) <= 3:
                continue
            digest = await self._summarize_cluster(group)
            # Replace cluster with summary
            stale_ids = [entry["id"] for entry in group]
            await self.store.delete(ids=stale_ids)
            await self.store.add(digest)


# Computer Use Agent
Agent that interacts with graphical user interfaces via screenshots and actions.
from dataclasses import dataclass
from typing import Literal
import base64
@dataclass
class ScreenAction:
    """One GUI action; only the fields relevant to ``type`` are set.

    NOTE(review): ComputerUseAgent below passes actions around as plain
    dicts and also uses a "done" type that is not listed here -- confirm
    whether this dataclass is meant to mirror that schema.
    """

    # Kind of action to perform
    type: Literal["click", "type", "scroll", "screenshot", "wait"]
    # Screen coordinates (presumably for "click" -- verify against caller)
    x: int | None = None
    y: int | None = None
    # Text payload (presumably for "type")
    text: str | None = None
    # Scroll direction (presumably for "scroll")
    direction: str | None = None
class ComputerUseAgent:
    """Agent that can interact with computer GUI.

    NOTE(review): ``_summarize_history`` is called but not defined in this
    listing.
    """

    def __init__(self, vlm_client, screen_controller):
        """Store the vision-language model client and the screen controller."""
        self.vlm = vlm_client
        self.screen = screen_controller

    async def execute_task(self, task: str, max_steps: int = 20) -> dict:
        """Execute a task using computer interface.

        Each step captures the screen, asks the model for the next action
        and performs it. Returns a dict with ``success``, ``steps`` and
        ``history``; on exhaustion of ``max_steps`` it also carries
        ``reason``.
        """
        history = []

        step = 0
        while step < max_steps:
            # Take screenshot of current state
            raw_image = await self.screen.capture()
            encoded = base64.b64encode(raw_image).decode()

            # Ask VLM what action to take
            action = await self._decide_action(task, encoded, history)
            if action['type'] == 'done':
                return {
                    'success': True,
                    'steps': len(history),
                    'history': history,
                }

            # Execute the action
            outcome = await self._execute_action(action)
            history.append({'step': step, 'action': action, 'result': outcome})

            # Brief pause to let UI update
            await asyncio.sleep(0.5)
            step += 1

        return {
            'success': False,
            'reason': 'max_steps_exceeded',
            'steps': len(history),
            'history': history,
        }

    async def _decide_action(
        self,
        task: str,
        screenshot_b64: str,
        history: list
    ) -> dict:
        """Use VLM to decide next action.

        Sends the instructions, a summary of the last five history entries
        and the screenshot, then parses the reply as JSON.
        """
        history_summary = self._summarize_history(history[-5:])

        instructions = f"""You are controlling a computer to complete this task: {task}
Previous actions: {history_summary}
Look at the current screen and decide the next action.
Available actions:
- click(x, y): Click at coordinates
- type(text): Type text
- scroll(direction): Scroll up/down
- done: Task is complete
Respond with JSON: {{"type": "click|type|scroll|done", "x": num, "y": num, "text": "...", "direction": "up|down"}}"""

        text_part = {"type": "text", "text": instructions}
        image_part = {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/png",
                "data": screenshot_b64,
            },
        }
        reply = await self.vlm.generate(
            messages=[{"role": "user", "content": [text_part, image_part]}],
            max_tokens=200,
        )
        return json.loads(reply)

    async def _execute_action(self, action: dict) -> dict:
        """Execute an action on the screen and report what was done.

        Types other than click/type/scroll perform no screen call and are
        echoed back as ``{'executed': <type>}``.
        """
        kind = action['type']

        if kind == 'click':
            await self.screen.click(action['x'], action['y'])
            return {'executed': 'click', 'position': (action['x'], action['y'])}

        if kind == 'type':
            await self.screen.type_text(action['text'])
            return {'executed': 'type', 'text': action['text']}

        if kind == 'scroll':
            await self.screen.scroll(action['direction'])
            return {'executed': 'scroll', 'direction': action['direction']}

        return {'executed': kind}


# Safety and Sandboxing
Scoped Tool Set
Tool access control based on user permissions.
class ScopedToolSet:
    """Tools with explicit capability scoping."""

    # (required permission, tool definition); None means no permission is
    # needed. Order here is the order tools are returned in.
    _TOOL_CATALOG = (
        # Read-only tools available to all
        (None, {"name": "search", "description": "Search documents"}),
        # Write tools require explicit permission
        ("documents:write", {"name": "create_document", "description": "Create a new document"}),
        # Dangerous tools require elevated permission
        ("admin:execute", {"name": "execute_sql", "description": "Execute SQL query"}),
    )

    def __init__(self, user_permissions: set[str]):
        """Remember the permission strings granted to the user."""
        self.permissions = user_permissions

    def get_allowed_tools(self) -> list[dict]:
        """Return fresh tool definitions for every tool the user may call."""
        granted = self.permissions
        return [
            dict(definition)
            for required, definition in self._TOOL_CATALOG
            if required is None or required in granted
        ]


# Safe Agent with Confirmation
Agent that requires human approval for sensitive actions.
class ConfirmationRequired(Exception):
    """Raised when an action needs human confirmation.

    Attributes:
        action: Name of the action awaiting approval.
        details: Context to show the approver.
    """

    def __init__(self, action: str, details: dict):
        # Pass a message up so str(exc) and tracebacks are informative even
        # when the exception is constructed with keyword arguments.
        super().__init__(f"Action '{action}' requires human confirmation")
        self.action = action
        self.details = details
class SafeAgent:
"""Agent that requests confirmation for sensitive actions."""
SENSITIVE_ACTIONS = {
"send_email": lambda args: len(args.get("recipients", [])) > 5,
"delete_file": lambda args: True, # Always confirm deletes
"execute_code": lambda args: "import os" in args.get("code", ""),
"transfer_money": lambda args: args.get("amount", 0) > 100,
}
async def execute_tool(self, name: str, args: dict) -> str:
# Check if confirmation is needed
if name in self.SENSITIVE_ACTIONS:
needs_confirmation = self.SENSITIVE_ACTIONS[name](args)
if needs_confirmation:
raise ConfirmationRequired(
action=name,
details={
"tool": name,
"arguments": args,
"reason": f"Action '{name}' requires human approval"
}
)
# Execute if no confirmation needed
return await self.tools[name](**args)
# Usage in application
async def run_agent_with_confirmation(agent, query):
    """Run ``agent`` on ``query``, pausing for approval when required.

    Each ``ConfirmationRequired`` is shown to the user. On approval the
    action name is added to ``agent.approved_actions`` and the run is
    restarted; on refusal a cancellation message is returned.
    """
    while True:
        try:
            answer = await agent.run(query)
        except ConfirmationRequired as pending:
            # Show confirmation dialog to user
            if not await show_confirmation_dialog(pending.action, pending.details):
                return "Action cancelled by user."
            # Execute with explicit approval
            agent.approved_actions.add(pending.action)
        else:
            return answer


# Sandboxed Code Executor
Docker-based code execution sandbox with resource limits.
import docker
import tempfile
class SandboxedCodeExecutor:
    """Execute code in isolated Docker container."""

    def __init__(
        self,
        timeout: int = 30,
        memory_limit: str = "256m",
        network_disabled: bool = True
    ):
        """Connect to the Docker daemon and record the resource limits.

        Args:
            timeout: Seconds to wait for the container to finish.
            memory_limit: Value passed to Docker as ``mem_limit``.
            network_disabled: Whether the container runs without network.
        """
        self.client = docker.from_env()
        self.timeout = timeout
        self.memory_limit = memory_limit
        self.network_disabled = network_disabled

    def execute(self, code: str, language: str = "python") -> dict:
        """Execute code in sandbox and return result.

        Args:
            code: Source text to run.
            language: Only ``"python"`` is supported; anything else is
                rejected without starting a container.

        Returns:
            ``{"success": bool, "output": str, "exit_code": int}``.
            ``exit_code`` is -1 when nothing ran to completion.
        """
        import os  # function-scope import: `os` is not imported at file level

        if language != "python":
            return {
                "success": False,
                "output": f"Unsupported language: {language}",
                "exit_code": -1,
            }

        # Write code to temp file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            code_path = f.name

        container = None
        try:
            # Run in container. Auto-removal is deliberately NOT requested:
            # the logs are read after the container exits, so it must still
            # exist then. It is removed explicitly in the finally clause.
            container = self.client.containers.run(
                image="python:3.11-slim",
                command="python /code/script.py",
                volumes={code_path: {"bind": "/code/script.py", "mode": "ro"}},
                mem_limit=self.memory_limit,
                network_disabled=self.network_disabled,
                detach=True,
            )

            # Wait with timeout
            try:
                result = container.wait(timeout=self.timeout)
                logs = container.logs().decode()
            except Exception:
                # NOTE(review): any failure while waiting or reading logs is
                # reported as a timeout, as before; the forced removal below
                # stops a container that is still running.
                return {
                    "success": False,
                    "output": "Execution timed out",
                    "exit_code": -1,
                }

            return {
                "success": result["StatusCode"] == 0,
                "output": logs,
                "exit_code": result["StatusCode"],
            }
        finally:
            try:
                if container is not None:
                    container.remove(force=True)
            finally:
                os.unlink(code_path)


# Rate-Limited Agent
Agent with execution limits to prevent runaway behavior.
class RateLimitedAgent:
    """Agent with rate limiting and cost controls.

    NOTE(review): the counters live on the instance and ``run`` never
    resets them, so they carry over between calls. ``tool_call_count`` is
    not updated anywhere in this listing; presumably ``_execute_turn``
    (not shown) maintains it -- confirm.
    """

    def __init__(
        self,
        llm_client,
        tools: dict,
        max_turns: int = 20,
        max_tool_calls: int = 50,
        max_cost: float = 1.0
    ):
        """Store the client, tools and limits; start all counters at zero."""
        self.llm = llm_client
        self.tools = tools

        # Limits
        self.max_turns = max_turns
        self.max_tool_calls = max_tool_calls
        self.max_cost = max_cost

        # Running totals
        self.turn_count = 0
        self.tool_call_count = 0
        self.total_cost = 0.0

    async def run(self, query: str) -> str:
        """Execute turns until one is final or a limit is reached.

        Returns the final turn's content, or a fixed message naming the
        limit that stopped the run.
        """
        while True:
            # Check limits
            if self.turn_count >= self.max_turns:
                return "Turn limit reached without completing task."
            if self.tool_call_count >= self.max_tool_calls:
                return "Tool call limit reached. Please simplify your request."
            if self.total_cost >= self.max_cost:
                return "Cost limit reached. Please try a more focused request."

            # Execute turn
            outcome = await self._execute_turn(query)

            # Track metrics
            self.turn_count += 1
            self.total_cost += outcome.usage.cost

            if outcome.is_final:
                return outcome.content


# Loop Detector
Detect and recover from repetitive agent patterns.
class LoopDetector:
    """Detect when an agent is stuck in a loop."""

    def __init__(
        self,
        similarity_threshold: float = 0.85,
        window_size: int = 5,
        max_similar: int = 3
    ):
        """Configure the detector.

        Args:
            similarity_threshold: Similarity above which two states count
                as alike.
            window_size: How many of the most recent states are compared.
            max_similar: Number of alike states in the window that
                signals a loop.
        """
        self.similarity_threshold = similarity_threshold
        self.window_size = window_size
        self.max_similar = max_similar
        self.state_history = []

    def check(self, state: str) -> bool:
        """
        Record ``state`` and check if we're in a loop.

        The first two states are only recorded, never compared.
        Returns True if loop detected.
        """
        history = self.state_history
        if len(history) < 2:
            history.append(state)
            return False

        # Count recent states that resemble the new one
        hits = 0
        for earlier in history[-self.window_size:]:
            if self._similarity(state, earlier) > self.similarity_threshold:
                hits += 1

        history.append(state)
        return hits >= self.max_similar

    def _similarity(self, s1: str, s2: str) -> float:
        """Compute Jaccard similarity of the two states' lowercased word sets.

        Returns 0.0 when either state has no words.
        """
        left = set(s1.lower().split())
        right = set(s2.lower().split())
        if not (left and right):
            return 0.0

        return len(left & right) / len(left | right)

    def get_loop_summary(self) -> str:
        """Summarize the last five states for debugging.

        States longer than 200 characters are cut and suffixed with an
        ellipsis.
        """
        if len(self.state_history) < 2:
            return "No loop detected"

        clipped = [
            (entry[:200] + "...") if len(entry) > 200 else entry
            for entry in self.state_history[-5:]
        ]
        return "Recent states (last 5):\n" + "\n---\n".join(clipped)
class AgentWithLoopDetection:
"""Agent that detects and recovers from loops."""
def __init__(self, llm_client, tools: dict):
self.llm = llm_client
self.tools = tools
self.loop_detector = LoopDetector()
async def run(self, query: str, max_turns: int = 15) -> str:
messages = [{"role": "user", "content": query}]
for turn in range(max_turns):
response = await self.llm.generate(messages)
# Check for loop
state = self._extract_state(response)
if self.loop_detector.check(state):
# Try to break the loop
recovery = await self._attempt_recovery(messages, response)
if recovery:
return recovery
else:
return f"Unable to complete: detected repetitive pattern. Last response: {response.content[:500]}"
messages.append({"role": "assistant", "content": response.content})
if response.is_final:
return response.content
# Execute tools and continue
tool_results = await self._execute_tools(response)
messages.extend(tool_results)
return "Max turns exceeded"
async def _attempt_recovery(self, messages: list, last_response) -> str | None:
"""Try to recover from a loop by prompting differently."""
recovery_prompt = """You seem to be repeating similar actions without making progress.
Please:
1. Summarize what you've learned so far
2. Identify why you might be stuck
3. Either provide a final answer with what you know, or explain what's missing
If you have enough information to partially answer, please do so."""
messages.append({"role": "user", "content": recovery_prompt})
response = await self.llm.generate(messages)
if "final answer" in response.content.lower() or response.is_final:
return response.content
return NoneOutput Validator
Validate agent outputs for quality and safety.
class OutputValidator:
    """Validate agent outputs meet quality standards."""

    def __init__(self, validators: list[callable]):
        """Store the validator callables, each taking ``(query, response)``."""
        self.validators = validators

    def validate(self, query: str, response: str) -> tuple[bool, list[str]]:
        """
        Run every validator against the response.

        A validator reports a problem by returning ``{"valid": False,
        "issue": ...}``; one that raises is recorded as a validation error.
        Returns (is_valid, list of issues).
        """
        problems = []
        for check in self.validators:
            try:
                verdict = check(query, response)
                if verdict["valid"]:
                    continue
                problems.append(verdict["issue"])
            except Exception as exc:
                problems.append(f"Validation error: {str(exc)}")

        return not problems, problems
# Example validators
def check_not_empty(query: str, response: str) -> dict:
    """Reject a response that is missing or shorter than 10 characters once trimmed."""
    trimmed_length = len(response.strip()) if response else 0
    if trimmed_length < 10:
        return {"valid": False, "issue": "Response is empty or too short"}
    return {"valid": True}
def check_no_refusal(query: str, response: str) -> dict:
    """Flag a response that contains a known refusal phrase.

    Matching is case-insensitive; the first phrase found, in list order,
    is the one named in the issue text.
    """
    refusal_patterns = (
        "i cannot",
        "i'm unable to",
        "i don't have access",
        "i can't help with",
    )
    haystack = response.lower()
    hit = next((phrase for phrase in refusal_patterns if phrase in haystack), None)
    if hit is None:
        return {"valid": True}
    return {
        "valid": False,
        "issue": f"Response appears to be a refusal: found '{hit}'",
    }
def check_coherence(query: str, response: str) -> dict:
    """Basic coherence check - response should relate to query.

    Compares lowercased whitespace-separated words after dropping common
    auxiliary words. Fails only when the query keeps more than two words
    and none of them appears in the response.
    """
    # Common words that carry no topical signal
    stopwords = {
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "being", "have", "has", "had", "do", "does", "did", "will",
        "would", "could", "should", "may", "might", "must", "shall",
    }

    query_terms = {w for w in query.lower().split() if w not in stopwords}
    response_terms = {w for w in response.lower().split() if w not in stopwords}

    shares_a_term = not query_terms.isdisjoint(response_terms)
    if shares_a_term or len(query_terms) <= 2:
        return {"valid": True}

    return {
        "valid": False,
        "issue": "Response doesn't seem related to query",
    }
class ValidatedAgent:
    """Agent with output validation."""

    def __init__(self, base_agent, validators: list[callable]):
        """Wrap ``base_agent`` and build an OutputValidator from ``validators``."""
        self.agent = base_agent
        self.validator = OutputValidator(validators)

    async def run(self, query: str, max_retries: int = 2) -> str:
        """Run the wrapped agent, retrying when validation fails.

        Every response is validated against the ORIGINAL query. Each
        retry prompt lists the latest issues and quotes the original query
        once, so prompts do not nest across retries.

        Args:
            query: The user's query.
            max_retries: Extra attempts after the first; negative values
                are treated as 0.

        Returns:
            The first valid response, or the last response prefixed with
            a warning listing its issues.
        """
        attempts = max(max_retries, 0) + 1
        prompt = query

        for attempt in range(attempts):
            response = await self.agent.run(prompt)
            is_valid, issues = self.validator.validate(query, response)
            if is_valid:
                return response

            if attempt < attempts - 1:
                # Try to fix issues
                issue_lines = chr(10).join('- ' + issue for issue in issues)
                prompt = (
                    "Your previous response had these issues:\n"
                    f"{issue_lines}\n"
                    f"Original query: {query}\n"
                    "Please try again, addressing these issues."
                )

        # Return last response even if invalid, with warning
        return f"[Warning: Response may have quality issues: {', '.join(issues)}]\n\n{response}"


# Production Patterns
Agent Evaluation Framework
Comprehensive evaluation for agent benchmarks.
class AgentEvaluator:
    """Evaluate agent performance on task benchmarks.

    NOTE(review): ``_run_tests`` and ``_llm_judge`` are called but not
    defined in this listing.
    """

    def __init__(self, agent, benchmark_tasks: list[dict]):
        """Store the agent under test and the list of task dicts."""
        self.agent = agent
        self.tasks = benchmark_tasks

    async def run_evaluation(self) -> dict:
        """Run agent on all benchmark tasks, one at a time, and aggregate."""
        results = []
        for task in self.tasks:
            results.append(await self._evaluate_task(task))
        return self._aggregate_results(results)

    async def _evaluate_task(self, task: dict) -> dict:
        """Evaluate agent on a single task.

        Returns a result dict with the same keys whether the task
        succeeded, failed verification, or raised: ``task_id``,
        ``success``, ``steps``, ``elapsed_seconds``, ``tokens_used`` and
        ``error`` (``None`` unless an exception was caught).
        """
        start_time = time.time()

        try:
            # Run agent
            agent_result = await self.agent.execute(
                task['instruction'],
                environment=task.get('environment')
            )
            elapsed = time.time() - start_time

            # Check success using task-specific verifier
            success = await self._verify_completion(task, agent_result)

            return {
                'task_id': task['id'],
                'success': success,
                'steps': agent_result.get('steps', 0),
                'elapsed_seconds': elapsed,
                'tokens_used': agent_result.get('tokens', 0),
                'error': None,
            }
        except Exception as e:
            # A failing task must not abort the whole benchmark run; the
            # counters are zero-filled so every result has the same keys.
            return {
                'task_id': task['id'],
                'success': False,
                'steps': 0,
                'elapsed_seconds': time.time() - start_time,
                'tokens_used': 0,
                'error': str(e),
            }

    async def _verify_completion(
        self,
        task: dict,
        agent_result: dict
    ) -> bool:
        """Verify task completion using task-specific criteria.

        The task's ``verifier`` key selects the check (default
        ``exact_match``); an unrecognised verifier counts as failure.
        """
        verifier_type = task.get('verifier', 'exact_match')

        if verifier_type == 'exact_match':
            return agent_result.get('output') == task['expected_output']

        if verifier_type == 'contains':
            return task['expected_substring'] in agent_result.get('output', '')

        if verifier_type == 'code_execution':
            # Run test cases
            return await self._run_tests(
                agent_result.get('code'),
                task['test_cases']
            )

        if verifier_type == 'llm_judge':
            # Use LLM to judge completion
            return await self._llm_judge(
                task['instruction'],
                agent_result,
                task['rubric']
            )

        return False

    def _aggregate_results(self, results: list[dict]) -> dict:
        """Aggregate individual results into summary metrics.

        Averages are taken over successful tasks only and are 0 when
        there are none.
        """
        successful = [r for r in results if r['success']]
        n_ok = len(successful)

        return {
            'total_tasks': len(results),
            'successful': n_ok,
            'success_rate': n_ok / len(results) if results else 0,
            'avg_steps': sum(r.get('steps', 0) for r in successful) / n_ok if successful else 0,
            'avg_time': sum(r['elapsed_seconds'] for r in successful) / n_ok if successful else 0,
            'errors': [r for r in results if r.get('error')],
            'detailed_results': results,
        }


# Resilient Agent with Checkpoints
Agent with failure recovery and state checkpointing.
class ResilientAgent:
"""Agent with failure recovery."""
def __init__(self, llm_client, tools: dict, max_retries: int = 3):
self.llm = llm_client
self.tools = tools
self.max_retries = max_retries
self.checkpoints = []
async def run(self, query: str) -> str:
for attempt in range(self.max_retries):
try:
return await self._execute_with_checkpoints(query)
except RecoverableError as e:
logger.warning(f"Attempt {attempt + 1} failed: {e}")
await self._rollback_to_last_checkpoint()
except FatalError as e:
logger.error(f"Fatal error: {e}")
return f"Unable to complete request: {e}"
return "Max retries exceeded. Please try again or simplify your request."
async def _execute_with_checkpoints(self, query: str) -> str:
messages = [{"role": "user", "content": query}]
while True:
# Create checkpoint before each step
self.checkpoints.append({
"messages": messages.copy(),
"timestamp": datetime.now()
})
response = await self.llm.generate(messages)
if response.is_final:
return response.content
# Execute tool with error handling
try:
result = await self._safe_execute_tool(response.tool_call)
messages.append({"role": "tool", "content": result})
except ToolError as e:
# Let model know about the error
messages.append({"role": "tool", "content": f"Error: {e}"})
async def _rollback_to_last_checkpoint(self):
if self.checkpoints:
last = self.checkpoints.pop()
logger.info(f"Rolling back to checkpoint from {last['timestamp']}")Versioned Agent Configuration
Agent with versioned configuration and rollback support.
class VersionedAgent:
    """Agent whose model, tools and prompts come from a versioned configuration.

    ``config_store`` must expose ``async get_latest()`` and
    ``async get_version(version)``. ``create_llm_client``, ``load_tools`` and
    ``_execute`` are not defined in this listing and are expected to be
    provided elsewhere.
    """

    def __init__(self, config_store, version: str = "latest"):
        self.config_store = config_store
        self.version = version
        self.config = None
        # Populated by initialize().
        self.llm = None
        self.tools = None
        self.prompts = None

    async def initialize(self):
        """Load the requested configuration and build the LLM client and tools.

        Raises:
            LookupError: If the store has no configuration for ``self.version``.
        """
        if self.version == "latest":
            config = await self.config_store.get_latest()
        else:
            config = await self.config_store.get_version(self.version)
        if not config:
            # Fail with a clear message instead of a TypeError on None below.
            raise LookupError(
                f"No agent configuration found for version {self.version!r}"
            )
        self.config = config
        self.llm = create_llm_client(config["model"])
        self.tools = load_tools(config["tools"])
        self.prompts = config["prompts"]

    async def run(self, query: str) -> dict:
        """Execute ``query`` and tag the result with version information.

        The configuration is loaded on first use if ``initialize()`` has not
        been called yet.

        Returns:
            The dict from ``_execute`` plus ``agent_version`` (the version this
            agent was created with) and ``config_hash``.
        """
        if self.config is None:
            await self.initialize()
        result = await self._execute(query)
        return {
            **result,
            "agent_version": self.version,
            "config_hash": self.config["hash"]
        }
class AgentConfigStore:
"""Store and retrieve agent configurations."""
def __init__(self, db):
self.db = db
async def save_version(self, config: dict) -> str:
"""Save a new configuration version."""
version = datetime.now().strftime("%Y%m%d_%H%M%S")
config["version"] = version
config["hash"] = hashlib.md5(json.dumps(config, sort_keys=True).encode()).hexdigest()
await self.db.insert("agent_configs", config)
return version
async def get_version(self, version: str) -> dict:
"""Retrieve specific version."""
return await self.db.find_one("agent_configs", {"version": version})
async def get_latest(self) -> dict:
"""Get the most recent version."""
return await self.db.find_one(
"agent_configs",
sort=[("version", -1)]
)
async def rollback(self, to_version: str):
"""Rollback to a previous version."""
config = await self.get_version(to_version)
if config:
config["rolled_back_at"] = datetime.now().isoformat()
await self.save_version(config)A/B Testing Router
Route requests to different agent versions for testing.
class ABTestingAgentRouter:
    """Route requests to different agent versions for A/B testing."""

    def __init__(self, agents: dict[str, 'Agent'], traffic_split: dict[str, float]):
        """
        Args:
            agents: {"control": agent_v1, "treatment": agent_v2}
            traffic_split: {"control": 0.9, "treatment": 0.1}

        Raises:
            ValueError: If ``traffic_split`` names a variant that has no agent.
        """
        # Fail fast: otherwise a share of users would hit a KeyError in route().
        unknown = [variant for variant in traffic_split if variant not in agents]
        if unknown:
            raise ValueError(
                f"traffic_split references variants with no agent: {unknown}"
            )
        self.agents = agents
        self.traffic_split = traffic_split
        # NOTE: every routed request is kept in memory until the router is dropped.
        self.results = defaultdict(list)

    async def route(self, query: str, user_id: str) -> tuple[str, dict]:
        """Route query to an agent variant and return ``(variant, result)``."""
        # Deterministic assignment based on user_id
        variant = self._assign_variant(user_id)
        agent = self.agents[variant]
        result = await agent.run(query)
        # Log for analysis
        self.results[variant].append({
            "query": query,
            "result": result,
            "user_id": user_id,
            "timestamp": datetime.now().isoformat()
        })
        return variant, result

    def _assign_variant(self, user_id: str) -> str:
        """Deterministically assign user to variant.

        The MD5 of ``user_id`` is mapped to a bucket in [0, 1) and compared with
        the cumulative traffic split, so a user always gets the same variant.
        """
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        normalized = (hash_val % 10000) / 10000
        cumulative = 0
        for variant, split in self.traffic_split.items():
            cumulative += split
            if normalized < cumulative:
                return variant
        # Splits sum to less than the bucket value: fall back to the first agent.
        return list(self.agents.keys())[0]

    def get_metrics(self) -> dict:
        """Compute count, success rate and average latency for each variant.

        Results that are dicts contribute their ``success`` (default ``True``)
        and ``latency`` (default ``0``). Any other result type, such as a plain
        string, counts as a success with zero latency.
        """
        metrics = {}
        for variant, entries in self.results.items():
            count = len(entries)
            outcomes = [
                entry["result"] if isinstance(entry["result"], dict) else {}
                for entry in entries
            ]
            successes = sum(1 for outcome in outcomes if outcome.get("success", True))
            total_latency = sum(outcome.get("latency", 0) for outcome in outcomes)
            metrics[variant] = {
                "count": count,
                "success_rate": successes / count if count else 0,
                "avg_latency": total_latency / count if count else 0
            }
        return metrics


# Research Agent Examples
Core Research Agent
ReAct-style research agent with search and note-taking capabilities.
class ResearchAgent:
"""Agent for research tasks with search and analysis capabilities."""
SYSTEM_PROMPT = """You are a research assistant that thoroughly investigates topics.
For each step, use this format:
Thought: [your reasoning about what to do next]
Action: [tool_name](arguments)
After receiving observations, continue reasoning or provide final answer:
Answer: [your comprehensive response with citations]
Available tools:
- search(query): Search the web for information
- get_document(url): Retrieve and read a specific document
- take_notes(note): Save important information for later synthesis
Research guidelines:
- Search multiple queries to get diverse perspectives
- Verify important claims with multiple sources
- Take notes on key findings as you go
- Cite sources using [Source: url] format
- Be thorough but respect the depth parameter
Current depth: {depth} (quick: 1-2 searches, standard: 3-5, deep: 5-10)"""
def __init__(self, llm_client, search_tool, document_tool):
self.llm = llm_client
self.search = search_tool
self.document = document_tool
self.notes = []
self.sources = []
def take_notes(self, note: str) -> str:
"""Save a note for later synthesis."""
self.notes.append({
"content": note,
"timestamp": datetime.now().isoformat()
})
return f"Note saved. Total notes: {len(self.notes)}"
async def research(self, topic: str, depth: str = "standard") -> dict:
"""
Research a topic and produce a summary.
Args:
topic: Research topic
depth: "quick" (1-2 searches), "standard" (3-5), "deep" (5-10)
Returns:
dict with summary, sources, and raw notes
"""
self.notes = []
self.sources = []
max_searches = {"quick": 2, "standard": 5, "deep": 10}.get(depth, 5)
messages = [
{"role": "system", "content": self.SYSTEM_PROMPT.format(depth=depth)},
{"role": "user", "content": f"Research this topic thoroughly: {topic}"}
]
search_count = 0
for turn in range(max_searches * 3): # Allow 3 turns per search
response = await self.llm.generate(messages)
messages.append({"role": "assistant", "content": response})
# Check for final answer
if "Answer:" in response:
answer = response.split("Answer:")[-1].strip()
return {
"summary": answer,
"sources": self.sources,
"notes": self.notes,
"search_count": search_count
}
# Execute action
action = self._parse_action(response)
if action:
tool_name, args = action
result = await self._execute_tool(tool_name, args)
if tool_name == "search":
search_count += 1
if search_count >= max_searches:
result += "\n\n[Note: Search limit reached. Please synthesize your findings.]"
messages.append({"role": "user", "content": f"Observation: {result}"})
# Force synthesis if no answer yet
messages.append({
"role": "user",
"content": "Please provide your final Answer now based on what you've learned."
})
final_response = await self.llm.generate(messages)
return {
"summary": final_response.split("Answer:")[-1].strip() if "Answer:" in final_response else final_response,
"sources": self.sources,
"notes": self.notes,
"search_count": search_count
}
async def _execute_tool(self, name: str, args: list) -> str:
if name == "search":
result = await self.search(args[0])
# Track sources
for r in result.get("results", []):
self.sources.append({"url": r["url"], "title": r["title"]})
return json.dumps(result)
elif name == "get_document":
try:
doc = await self.document(args[0])
self.sources.append({"url": args[0], "title": doc.get("title", "")})
return doc.get("content", "")[:5000] # Truncate long docs
except Exception as e:
return f"Error retrieving document: {str(e)}"
elif name == "take_notes":
return self.take_notes(args[0])
else:
return f"Unknown tool: {name}"
def _parse_action(self, text: str) -> tuple | None:
import re
match = re.search(r'Action:\s*(\w+)\((.*?)\)', text, re.DOTALL)
if match:
tool_name = match.group(1)
args_str = match.group(2).strip()
# Handle quoted strings
if args_str.startswith('"') and args_str.endswith('"'):
args = [args_str[1:-1]]
elif args_str.startswith("'") and args_str.endswith("'"):
args = [args_str[1:-1]]
else:
args = [args_str]
return tool_name, args
return NoneFact-Checking Research Agent
Extended research agent with claim verification.
class FactCheckingResearchAgent(ResearchAgent):
    """Research agent with fact-checking capabilities.

    ``_integrate_verification`` is called here but is not defined in this
    listing; it is expected to be provided elsewhere.
    """

    async def research_with_verification(self, topic: str, depth: str = "standard") -> dict:
        """Research ``topic``, then verify each factual claim in the summary.

        Returns:
            The research result with ``summary`` replaced by the
            verification-aware summary and ``claim_verification`` added.
        """
        # First, do initial research
        initial = await self.research(topic, depth)
        # Extract claims from summary
        claims = await self._extract_claims(initial["summary"])
        # Verify each claim
        verified_claims = []
        for claim in claims:
            verification = await self._verify_claim(claim)
            verified_claims.append({
                "claim": claim,
                "verified": verification["status"],
                "evidence": verification["evidence"],
                "confidence": verification["confidence"]
            })
        # Update summary with verification info
        verified_summary = await self._integrate_verification(
            initial["summary"],
            verified_claims
        )
        return {
            **initial,
            "summary": verified_summary,
            "claim_verification": verified_claims
        }

    @staticmethod
    def _parse_json(text, default):
        """Parse JSON from model output, tolerating surrounding prose or code fences.

        Returns ``default`` when no valid JSON can be recovered.
        """
        try:
            return json.loads(text)
        except (TypeError, ValueError):
            pass
        if isinstance(text, str):
            # Retry on the outermost bracketed span, e.g. inside ```json fences.
            starts = [pos for pos in (text.find("["), text.find("{")) if pos != -1]
            end = max(text.rfind("]"), text.rfind("}"))
            if starts and end > min(starts):
                try:
                    return json.loads(text[min(starts):end + 1])
                except ValueError:
                    pass
        return default

    async def _extract_claims(self, summary: str) -> list[str]:
        """Extract factual claims from summary.

        Returns an empty list when the model output is not a JSON list;
        non-string items are dropped.
        """
        prompt = f"""Extract specific factual claims from this text.
Only include claims that can be objectively verified.
Output as JSON list of strings.
Text:
{summary}
Claims:"""
        response = await self.llm.generate(prompt)
        claims = self._parse_json(response, [])
        if not isinstance(claims, list):
            return []
        return [claim for claim in claims if isinstance(claim, str)]

    async def _verify_claim(self, claim: str) -> dict:
        """Verify a single claim.

        Returns:
            A dict that always has ``status``, ``confidence`` and ``evidence``.
            When the model output cannot be parsed the claim is reported as
            ``"unverifiable"`` with confidence 0.0.
        """
        # Search for supporting/refuting evidence
        search_result = await self.search(f"fact check: {claim}")
        # Evaluate evidence
        prompt = f"""Evaluate whether this claim is supported by the evidence.
Claim: {claim}
Evidence from search:
{json.dumps(search_result, indent=2)}
Output JSON with:
- status: "verified", "disputed", "unverifiable"
- confidence: 0.0 to 1.0
- evidence: brief explanation of supporting/refuting evidence"""
        response = await self.llm.generate(prompt)
        verdict = self._parse_json(response, None)
        if not isinstance(verdict, dict):
            return {
                "status": "unverifiable",
                "confidence": 0.0,
                "evidence": "Verification response could not be parsed as JSON."
            }
        # Fill any field the model left out so callers can index safely.
        return {
            "status": "unverifiable",
            "confidence": 0.0,
            "evidence": "",
            **verdict
        }


# Comparative Research Agent
Agent for researching and comparing multiple topics.
class ComparativeResearchAgent(ResearchAgent):
    """Research agent for comparing multiple topics.

    ``_identify_aspects`` is called when no aspects are supplied but is not
    defined in this listing; it is expected to be provided elsewhere.
    """

    async def compare(self, topics: list[str], aspects: list[str] | None = None) -> dict:
        """
        Research multiple topics and compare them.

        Args:
            topics: List of topics to compare
            aspects: Optional list of aspects to compare on

        Returns:
            Comparison report with per-topic summaries and analysis
        """
        # Research each topic
        topic_results = {}
        for topic in topics:
            topic_results[topic] = await self.research(topic, depth="standard")
        # Determine aspects to compare if not provided
        if not aspects:
            aspects = await self._identify_aspects(topics, topic_results)
        # Build comparison matrix
        comparison = await self._build_comparison(topics, topic_results, aspects)
        # Generate analysis
        analysis = await self._analyze_comparison(topics, comparison)
        return {
            "topics": topic_results,
            "aspects": aspects,
            "comparison": comparison,
            "analysis": analysis
        }

    @staticmethod
    def _parse_json_object(text) -> dict:
        """Parse a JSON object from model output, tolerating prose or code fences.

        Returns an empty dict when no JSON object can be recovered.
        """
        parsed = None
        try:
            parsed = json.loads(text)
        except (TypeError, ValueError):
            if isinstance(text, str):
                start, end = text.find("{"), text.rfind("}")
                if start != -1 and end > start:
                    try:
                        parsed = json.loads(text[start:end + 1])
                    except ValueError:
                        parsed = None
        return parsed if isinstance(parsed, dict) else {}

    async def _build_comparison(
        self,
        topics: list[str],
        results: dict,
        aspects: list[str]
    ) -> dict:
        """Build comparison matrix.

        Returns:
            ``{aspect: {topic: finding}}`` with an entry for every aspect/topic
            pair; pairs the model did not report are set to ``"not mentioned"``.
        """
        matrix = {aspect: {} for aspect in aspects}
        for topic, result in results.items():
            prompt = f"""Extract information about these aspects from the research summary.
Summary:
{result['summary']}
Aspects to extract: {aspects}
Output JSON dict mapping aspect to finding (or "not mentioned")."""
            response = await self.llm.generate(prompt)
            findings = self._parse_json_object(response)
            for aspect in aspects:
                matrix[aspect][topic] = findings.get(aspect, "not mentioned")
        return matrix

    async def _analyze_comparison(self, topics: list[str], comparison: dict) -> str:
        """Generate comparative analysis."""
        prompt = f"""Analyze this comparison of {', '.join(topics)}.
Comparison data:
{json.dumps(comparison, indent=2)}
Provide:
1. Key similarities between the topics
2. Key differences
3. Notable findings or surprising results
4. Recommendations based on the comparison"""
        return await self.llm.generate(prompt)


# Memory-Enabled Research Agent
Research agent with caching and learning from past research.
class MemoryEnabledResearchAgent(ResearchAgent):
    """Research agent with persistent memory.

    ``memory_store`` must expose ``async search(query=..., k=..., filter=...)``
    and ``async add(record)``. ``_embed`` is called when storing but is not
    defined in this listing; it is expected to be provided elsewhere.
    """

    # Minimum similarity score for a stored result to count as the same topic.
    SIMILARITY_THRESHOLD = 0.85
    # Cached research older than this many days is always refreshed.
    MAX_AGE_DAYS = 7
    # Whole-word match, so "new" no longer fires on "renewable" or "newton".
    _TIME_SENSITIVE_PATTERN = (
        r"\b(?:latest|recent(?:ly)?|current(?:ly)?|new(?:s|er|est|ly)?"
        r"|2024|2025|2026)\b"
    )

    def __init__(self, llm_client, search_tool, document_tool, memory_store):
        super().__init__(llm_client, search_tool, document_tool)
        self.memory = memory_store

    async def research(self, topic: str, depth: str = "standard") -> dict:
        """Return cached research when it is fresh enough, otherwise research anew.

        Returns:
            The research result plus ``from_memory``; cached results also carry
            ``memory_date``.
        """
        # Check for relevant past research
        past_research = await self._recall_past_research(topic)
        if past_research:
            # Decide whether to use cached research or do fresh
            should_refresh = await self._should_refresh(topic, past_research)
            if not should_refresh:
                return {
                    **past_research["result"],
                    "from_memory": True,
                    "memory_date": past_research["timestamp"]
                }
        # Do fresh research
        result = await super().research(topic, depth)
        # Store in memory
        await self._store_research(topic, result)
        return {**result, "from_memory": False}

    async def _recall_past_research(self, topic: str) -> dict | None:
        """Return the best-scoring stored research above the threshold, if any."""
        results = await self.memory.search(
            query=topic,
            k=3,
            filter={"type": "research"}
        )
        relevant = [r for r in results or [] if r["score"] > self.SIMILARITY_THRESHOLD]
        if not relevant:
            return None
        return max(relevant, key=lambda r: r["score"])

    async def _should_refresh(self, topic: str, past_research: dict) -> bool:
        """Decide if we should do fresh research.

        Refreshes when the cached entry is older than ``MAX_AGE_DAYS`` or the
        topic contains a time-sensitive word or a recent year.
        """
        import re

        # Refresh if old
        now = datetime.now()
        past_date = datetime.fromisoformat(past_research["timestamp"])
        if (now - past_date).days > self.MAX_AGE_DAYS:
            return True
        # Refresh if topic seems time-sensitive
        lowered = topic.lower()
        if re.search(self._TIME_SENSITIVE_PATTERN, lowered):
            return True
        # The literal years above go stale; also match this year and the next.
        return any(
            re.search(rf"\b{year}\b", lowered)
            for year in (now.year, now.year + 1)
        )

    async def _store_research(self, topic: str, result: dict):
        """Store research result in memory."""
        await self.memory.add({
            "type": "research",
            "topic": topic,
            "result": result,
            "timestamp": datetime.now().isoformat(),
            "embedding": await self._embed(f"{topic}\n{result['summary'][:1000]}")
        })


# Monitoring and Operations
Agent Metrics Collector
Track agent execution metrics for monitoring.
class AgentMetrics:
    """Forward agent execution statistics to a metrics client.

    The client must provide ``histogram``, ``counter`` and ``event``.
    """

    def __init__(self, metrics_client):
        self.metrics = metrics_client

    def record_execution(
        self,
        agent_id: str,
        query: str,
        result: str,
        duration_ms: float,
        turns: int,
        tool_calls: int,
        success: bool
    ):
        """Record one agent execution and emit events for anomalies.

        Emits the duration, turn and tool-call histograms and an execution
        counter tagged with the agent id. An event is raised when the run used
        more than 15 turns, took longer than 60000 ms, or failed. ``result`` is
        accepted but not used.
        """
        client = self.metrics

        # Core measurements
        client.histogram("agent.duration_ms", duration_ms, tags={"agent": agent_id})
        client.counter(
            "agent.executions",
            1,
            tags={"agent": agent_id, "success": str(success)},
        )
        for metric_name, value in (("agent.turns", turns), ("agent.tool_calls", tool_calls)):
            client.histogram(metric_name, value, tags={"agent": agent_id})

        # Anomaly events
        too_many_turns = turns > 15
        if too_many_turns:
            client.event("agent.high_turns", f"Agent {agent_id} used {turns} turns")
        too_slow = duration_ms > 60000
        if too_slow:
            client.event("agent.slow_execution", f"Agent {agent_id} took {duration_ms}ms")
        if success:
            return
        client.event("agent.failure", f"Agent {agent_id} failed on: {query[:100]}")


# Agent Test Harness
Testing framework for agent evaluation.
class AgentTestHarness:
    """Run an agent against a list of test cases and tally the outcome.

    Each case is a dict with ``query`` and ``expected``. Responses are judged
    by ``self._evaluate(response, expected)``, which is not defined in this
    listing.
    """

    def __init__(self, agent, test_cases: list[dict]):
        self.agent = agent
        self.test_cases = test_cases

    async def run_tests(self) -> dict:
        """Execute every test case in order.

        Returns:
            A dict with ``passed`` and ``failed`` counts, ``errors`` (one entry
            per case that raised) and ``details`` (one entry per case that ran
            to completion). A case that raises counts as failed.
        """
        pass_count = 0
        fail_count = 0
        errors = []
        details = []

        for case in self.test_cases:
            try:
                response = await self.agent.run(case["query"])
                verdict = self._evaluate(response, case["expected"])
                details.append({
                    "query": case["query"],
                    "response": response,
                    "expected": case["expected"],
                    "passed": verdict
                })
            except Exception as exc:
                fail_count += 1
                errors.append({"query": case["query"], "error": str(exc)})
            else:
                if verdict:
                    pass_count += 1
                else:
                    fail_count += 1

        return {
            "passed": pass_count,
            "failed": fail_count,
            "errors": errors,
            "details": details
        }


# Stateful Agent
Agent with persistent state across sessions.
class StatefulAgent:
    """Agent with persistent state across sessions.

    ``state_store`` must expose ``async get(key)`` and
    ``async set(key, value, expire=...)``. ``_execute(query, state)`` is not
    defined in this listing and is expected to be provided elsewhere.
    """

    def __init__(self, agent_id: str, state_store):
        self.agent_id = agent_id
        self.state_store = state_store

    @staticmethod
    def _default_state() -> dict:
        """Return a fresh state with every field the agent relies on."""
        return {
            "conversation_history": [],
            "tool_results_cache": {},
            "learned_preferences": {},
            "session_count": 0
        }

    async def load_state(self) -> dict:
        """Load agent state from persistent store.

        Stored values are layered over the defaults, so a state saved without
        some of the fields still loads with every key present.
        """
        stored = await self.state_store.get(f"agent:{self.agent_id}")
        state = self._default_state()
        if stored:
            state.update(stored)
        return state

    async def save_state(self, state: dict):
        """Save agent state to persistent store."""
        await self.state_store.set(
            f"agent:{self.agent_id}",
            state,
            expire=86400 * 30  # 30 day expiry
        )

    async def run(self, query: str) -> str:
        """Execute ``query`` with the persisted state and record the exchange.

        The state, including the incremented session count, is saved even when
        execution raises.
        """
        state = await self.load_state()
        state["session_count"] += 1
        try:
            result = await self._execute(query, state)
            state["conversation_history"].append({
                "query": query,
                "response": result,
                "timestamp": datetime.now().isoformat()
            })
            return result
        finally:
            await self.save_state(state)


# Audit Logging
Log all agent actions for compliance and debugging.
def log_agent_action(
    agent_id: str,
    action: str,
    args: dict,
    result: str,
    user_id: str,
    success: bool | None = None
):
    """Log an agent action to the audit logger and the durable audit store.

    Args:
        agent_id: Identifier of the agent that acted.
        action: Name of the action or tool invoked.
        args: Arguments passed to the action.
        result: Outcome of the action; only the first 500 characters are kept.
        user_id: User on whose behalf the action ran.
        success: Explicit outcome. When omitted, the action is considered
            successful unless the result text contains "error"
            (case-insensitive).
    """
    # Audit logging must not fail because a tool returned a non-string result.
    result_text = result if isinstance(result, str) else str(result)
    if success is None:
        success = "error" not in result_text.lower()
    entry = {
        "timestamp": datetime.now().isoformat(),
        "agent_id": agent_id,
        "user_id": user_id,
        "action": action,
        "arguments": args,
        "result_summary": result_text[:500],  # Truncate large results
        "success": success
    }
    # default=str is deliberate: arguments that are not JSON-serializable
    # (datetimes, paths, ...) are logged as text rather than losing the record.
    audit_logger.info(json.dumps(entry, default=str))
    # Also write to durable audit store
    audit_store.append(entry)