Agentic Systems - Complete Code Reference

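This reference collects the code for the agentic system patterns from Chapter 8. The listings focus on each pattern's core logic. Helper methods that are not central to a pattern (for example `_synthesize` or `_embed`) are omitted. Names such as `client`, `execute_tool`, `vector_store`, and `db` are assumed to be provided by the surrounding application.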


Tool and Function Definitions

Agentic Tool Execution Loop

Complete implementation of the tool execution loop that handles multiple turns of tool calls.

def agentic_loop(user_query: str, tools: list, max_turns: int = 10,
                 max_tokens: int = 4096) -> str:
    """Execute tool calls until the model produces a final response.

    Each turn sends the conversation so far to the model. When the model
    stops to request tools (``stop_reason == "tool_use"``), every requested
    tool is run through ``execute_tool``. All results go back to the model
    in a single user message. Any other stop reason ends the loop.

    Args:
        user_query: The initial user message.
        tools: Tool schemas passed straight through to ``client.messages.create``.
        max_turns: Maximum number of model calls before giving up.
        max_tokens: Output-token cap for each model response. The Messages
            API requires this parameter on every request.

    Returns:
        The concatenated text blocks of the final response (``""`` if it has
        none), or a fixed message when ``max_turns`` is exhausted.
    """
    messages = [{"role": "user", "content": user_query}]

    for _ in range(max_turns):
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=max_tokens,
            messages=messages,
            tools=tools,
        )

        # Echo the assistant turn back verbatim. The tool_result blocks sent
        # next must reference the tool_use ids it contains.
        messages.append({"role": "assistant", "content": response.content})

        tool_calls = [block for block in response.content if block.type == "tool_use"]

        # Anything other than a tool request (end_turn, max_tokens,
        # stop_sequence, ...) is treated as final. Continuing would send an
        # empty tool_result message. Also, a tool_use block cut off by
        # max_tokens may carry incomplete input.
        if response.stop_reason != "tool_use" or not tool_calls:
            return "".join(block.text for block in response.content if block.type == "text")

        tool_results = []
        for block in tool_calls:
            try:
                result = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result if isinstance(result, str) else json.dumps(result),
                })
            except Exception as e:
                # Report the failure to the model so it can recover or try
                # another approach, instead of aborting the whole loop.
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": f"Error: {str(e)}",
                    "is_error": True,
                })

        messages.append({"role": "user", "content": tool_results})

    return "Max turns exceeded without final response"

Tool Description Best Practices

Example of a well-documented tool schema with clear descriptions and constraints.

# Bad: vague description and no input schema. The model cannot tell when
# to call this tool or what arguments it takes.
BAD_TOOL_SCHEMA = {
    "name": "search",
    "description": "Search for stuff"
}

# Good: the description states when to use the tool, what it returns, and
# what it does NOT cover. Every input parameter is documented.
GOOD_TOOL_SCHEMA = {
    "name": "search_knowledge_base",
    "description": """Search the company's internal knowledge base for documents.

Use this tool when the user asks questions about:
- Company policies and procedures
- Product documentation
- Internal processes

Returns top 5 matching documents with relevance scores.
Does NOT search: email, Slack, or external websites.""",
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Natural language search query. Be specific—include product names, policy types, or document titles if known."
            },
            "department": {
                "type": "string",
                "enum": ["engineering", "sales", "hr", "finance", "all"],
                "description": "Limit search to a specific department, or 'all' for company-wide search"
            }
        },
        "required": ["query"]
    }
}

MCP Server Patterns

Complete MCP Server Implementation

Full MCP server with document search capabilities, resources, and proper async handling.

"""MCP server exposing document search capabilities."""
import asyncio
import json
from mcp import Server, Tool
from mcp.types import TextContent

# Initialize server
# NOTE(review): the decorator and run APIs used below (`tool`, `resource`,
# `run_stdio`) should be checked against the installed MCP SDK version
# before use.
server = Server("document-search")

# Define a tool
@server.tool("search_documents")
async def search_documents(query: str, limit: int = 5) -> list[dict]:
    """
    Search internal documents.

    Args:
        query: Natural language search query
        limit: Maximum results to return (default 5). Values outside
            1-20 are clamped into that range.

    Returns:
        List of matching documents with snippets and scores
    """
    # Clamp to [1, 20]. The upper bound caps the payload size. The lower
    # bound stops a zero or negative limit supplied by the model from
    # reaching the backend as top_k.
    top_k = max(1, min(limit, 20))

    # Connect to your actual search backend
    results = await vector_store.search(query, top_k=top_k)

    return [
        {
            "id": r.id,
            "title": r.metadata.get("title", "Untitled"),
            "snippet": r.text[:500],  # keep tool output compact
            "score": r.score,
        }
        for r in results
    ]

@server.tool("get_document")
async def get_document(doc_id: str) -> dict:
    """
    Fetch one document in full.

    Args:
        doc_id: Identifier as returned in search results

    Returns:
        The document's id, title, content and ISO-formatted update time,
        or an ``{"error": ...}`` dict when the store has no such document
    """
    document = await document_store.get(doc_id)
    if document:
        return {
            "id": doc_id,
            "title": document.title,
            "content": document.content,
            "updated_at": document.updated_at.isoformat(),
        }
    return {"error": f"Document {doc_id} not found"}

# Define a resource
@server.resource("corpus://stats")
async def corpus_stats():
    """Report corpus size and last indexing time (refreshed hourly)."""
    snapshot = await get_corpus_stats()
    document_count = snapshot.doc_count
    indexed_at = snapshot.last_index_time.isoformat()
    return {"total_documents": document_count, "last_indexed": indexed_at}

# Run server
async def main():
    """Serve this MCP server over stdio, presumably until the transport closes."""
    # NOTE(review): `run_stdio()` / `wait_closed()` must match the installed
    # MCP SDK. FastMCP-based servers expose `run_stdio_async()` instead;
    # confirm before use.
    async with server.run_stdio() as running:
        await running.wait_closed()

if __name__ == "__main__":
    asyncio.run(main())

MCP SQL Security Pattern

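SQL query tool with read-only validation and timeout protection. Keyword validation is a defense-in-depth measure, not a guarantee; pair it with a database role that has read-only permissions.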

@server.tool("execute_sql")
async def execute_sql(query: str) -> dict:
    """
    Execute a read-only SQL query.

    Security:
    - Only a single SELECT statement is allowed
    - Write/DDL keywords are rejected when they appear as whole words
    - Query timeout: 30 seconds
    - Results limited to 1000 rows

    The keyword screen is defense in depth only. It does not parse SQL, so
    it can reject harmless queries (e.g. a string literal containing 'drop'
    or ';') and cannot prove a query is free of side effects. Run this
    against a database role or connection that is itself read-only.
    """
    import re

    normalized = query.strip().rstrip(";").strip()
    upper = normalized.upper()

    # Validate query is read-only
    if not upper.startswith("SELECT"):
        return {"error": "Only SELECT queries are allowed"}

    # Stacked statements such as "SELECT 1; TRUNCATE t" would otherwise pass
    # the prefix check above.
    if ";" in normalized:
        return {"error": "Only a single statement is allowed"}

    # Match whole words only. A plain substring test rejected identifiers
    # like `updated_at` / `is_deleted`, yet missed other write/DDL keywords.
    forbidden = re.search(
        r"\b(DROP|DELETE|INSERT|UPDATE|ALTER|TRUNCATE|CREATE|MERGE|GRANT|REVOKE"
        r"|INTO|EXEC|EXECUTE|CALL|ATTACH|PRAGMA)\b",
        upper,
    )
    if forbidden:
        return {"error": f"Query contains forbidden keyword: {forbidden.group(1)}"}

    # Execute with timeout
    try:
        async with asyncio.timeout(30):
            results = await db.execute(query)
    except asyncio.TimeoutError:
        return {"error": "Query timed out"}

    return {"rows": results[:1000], "truncated": len(results) > 1000}

Agent Architectures

ReAct Agent

Complete ReAct (Reasoning + Acting) agent implementation with thought/action parsing.

class ReActAgent:
    """Agent using ReAct pattern: Reasoning + Acting."""

    SYSTEM_PROMPT = """You are a helpful assistant that solves problems step by step.

For each step, you must use this exact format:
Thought: [your reasoning about what to do next]
Action: [tool_name]([arguments])

After receiving an observation, continue with another Thought/Action or provide the final answer:
Answer: [your final response to the user]

Available tools:
{tool_descriptions}

Always think before acting. Never skip the Thought step."""

    def __init__(self, llm_client, tools: dict[str, callable]):
        self.llm = llm_client
        self.tools = tools

    def run(self, query: str, max_steps: int = 10) -> str:
        tool_desc = self._format_tool_descriptions()
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT.format(tool_descriptions=tool_desc)},
            {"role": "user", "content": query}
        ]

        for step in range(max_steps):
            response = self.llm.generate(messages)
            messages.append({"role": "assistant", "content": response})

            # Check for final answer
            if "Answer:" in response:
                return response.split("Answer:")[-1].strip()

            # Extract and execute action
            action = self._parse_action(response)
            if action:
                tool_name, args = action
                if tool_name in self.tools:
                    try:
                        result = self.tools[tool_name](*args)
                        observation = f"Observation: {result}"
                    except Exception as e:
                        observation = f"Observation: Error - {str(e)}"
                else:
                    observation = f"Observation: Unknown tool '{tool_name}'"

                messages.append({"role": "user", "content": observation})
            else:
                messages.append({"role": "user", "content": "Observation: Could not parse action. Please use the format: Action: tool_name(args)"})

        return "Max steps reached without finding an answer."

    def _parse_action(self, text: str) -> tuple[str, list] | None:
        """Extract action from response."""
        import re
        match = re.search(r'Action:\s*(\w+)\((.*?)\)', text)
        if match:
            tool_name = match.group(1)
            args_str = match.group(2)
            # Simple argument parsing (production code needs proper handling)
            args = [a.strip().strip('"\'') for a in args_str.split(',')] if args_str else []
            return tool_name, args
        return None

Planning Agent

Agent that creates an explicit plan before execution and runs steps marked as parallel concurrently.

class PlanningAgent:
    """Agent that creates a plan before executing.

    ``run`` expects the parsed plan to be a list of step dicts, each with an
    ``id``, a ``description`` and optionally ``parallel_with``. That field
    holds the indices of other plan steps to run concurrently with this one.
    """

    PLANNING_PROMPT = """Create a step-by-step plan to accomplish this task.

Task: {task}

Available tools:
{tools}

Output your plan as a numbered list. Each step should be concrete and actionable.
Mark steps that can be done in parallel with [PARALLEL].

Plan:"""

    EXECUTION_PROMPT = """Execute this step of the plan.

Overall task: {task}
Current step: {step}
Previous results: {context}

Use the appropriate tool and return the result."""

    def __init__(self, llm_client, tools: dict):
        self.llm = llm_client
        self.tools = tools

    async def run(self, task: str) -> str:
        """Plan, execute each step exactly once, then synthesize an answer."""
        # Phase 1: Create plan
        plan = await self._create_plan(task)
        print(f"Plan:\n{plan}")

        # Phase 2: Execute plan
        results = {}
        executed = set()  # plan indices already run, including as part of a parallel group
        for i, step in enumerate(plan):
            if i in executed:
                # Already ran alongside an earlier step that listed it in
                # parallel_with. Running it again would duplicate its work.
                continue

            partners = [j for j in (step.get("parallel_with") or []) if j not in executed]
            # dict.fromkeys dedupes (e.g. a step listing itself) while keeping order.
            group = list(dict.fromkeys([i] + partners))
            executed.update(group)

            if len(group) == 1:
                results[step["id"]] = await self._execute_step(step, task, results)
            else:
                # Execute parallel steps together
                group_steps = [plan[j] for j in group]
                group_results = await asyncio.gather(*[
                    self._execute_step(s, task, results) for s in group_steps
                ])
                for s, r in zip(group_steps, group_results):
                    results[s["id"]] = r

        # Phase 3: Synthesize final answer
        return await self._synthesize(task, results)

    async def _create_plan(self, task: str) -> list[dict]:
        """Ask the LLM for a plan and parse it into step dicts."""
        response = await self.llm.generate(
            self.PLANNING_PROMPT.format(task=task, tools=self._tool_descriptions())
        )
        return self._parse_plan(response)

    async def _execute_step(self, step: dict, task: str, context: dict) -> str:
        """Run one step, giving the LLM the results of previous steps as JSON."""
        prompt = self.EXECUTION_PROMPT.format(
            task=task,
            step=step["description"],
            context=json.dumps(context, indent=2)
        )
        return await self.llm.generate(prompt, tools=self.tools)

Supervisor-Worker Multi-Agent

Supervisor that coordinates specialized worker agents with dependency management.

class SupervisorAgent:
    """Coordinates specialist worker agents.

    The request is decomposed into subtasks with dependencies. Subtasks
    whose dependencies are complete run concurrently, wave by wave, on the
    worker named in their ``worker`` field.
    """

    def __init__(self, llm_client, workers: dict[str, 'WorkerAgent']):
        self.llm = llm_client
        self.workers = workers

    async def execute(self, request: str) -> str:
        # Create subtask breakdown
        subtasks = await self._decompose(request)

        # The decomposition prompt does not ask the model for a status field,
        # so initialise it here instead of relying on the LLM's output.
        for t in subtasks:
            t.setdefault("status", "pending")

        # Execute subtasks respecting dependencies
        completed = {}
        while not all(t["status"] == "done" for t in subtasks):
            # Find ready tasks (dependencies satisfied)
            ready = [
                t for t in subtasks
                if t["status"] == "pending"
                and all(d in completed for d in t.get("depends_on", []))
            ]

            if not ready:
                # Pending tasks with no ready candidate means a cycle or an
                # unknown dependency id.
                pending = [t for t in subtasks if t["status"] == "pending"]
                if pending:
                    raise AgentError("Execution stuck: dependencies cannot be resolved")
                break

            # Execute ready tasks in parallel
            results = await asyncio.gather(*[
                self._execute_subtask(t) for t in ready
            ])

            for task, result in zip(ready, results):
                task["status"] = "done"
                task["result"] = result
                completed[task["id"]] = result

        # Synthesize final response
        return await self._synthesize(request, subtasks)

    async def _execute_subtask(self, task: dict) -> str:
        """Dispatch one subtask to its worker. Unknown workers yield an error string."""
        worker_type = task["worker"]
        if worker_type not in self.workers:
            return f"Error: Unknown worker type {worker_type}"

        worker = self.workers[worker_type]
        return await worker.execute(task["description"])

    async def _decompose(self, request: str) -> list[dict]:
        """Ask the LLM for a JSON list of subtasks."""
        prompt = f"""Break down this request into subtasks for specialist workers.

Available workers:
- RESEARCHER: Searches information, reads documents
- CODER: Writes and analyzes code
- ANALYST: Performs data analysis
- WRITER: Drafts documents and communications

Request: {request}

Output JSON array of subtasks with: id, description, worker, depends_on (list of task ids)"""

        response = await self.llm.generate(prompt)
        return json.loads(self._strip_code_fence(response))

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding ``` / ```json fence, which models often add around JSON."""
        text = text.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else ""
            text = text.rsplit("```", 1)[0]
        return text.strip()


class WorkerAgent:
    """Specialist worker agent.

    Runs its task through the tool-execution loop ``agentic_loop`` using
    this worker's own tool set.
    """

    def __init__(self, llm_client, tools: list, system_prompt: str):
        self.llm = llm_client
        self.tools = tools
        self.system_prompt = system_prompt

    async def execute(self, task: str) -> str:
        """Run ``task`` through the tool loop and return the final text.

        ``agentic_loop(user_query: str, tools, ...)`` is synchronous, takes a
        single query string and has no system-prompt parameter. So the system
        prompt is prepended to the task text, and the call runs in a worker
        thread so the event loop is not blocked while it waits on the API.
        """
        prompt = f"{self.system_prompt}\n\n{task}" if self.system_prompt else task
        return await asyncio.to_thread(agentic_loop, prompt, self.tools)

Hierarchical Task Network Agent

Agent that recursively decomposes tasks into hierarchical subtasks.

class HierarchicalAgent:
    """Agent using hierarchical task networks.

    The task is recursively split into subtasks (via the LLM) until each
    is judged executable or the depth limit is hit. Leaves are executed and
    their results combined bottom-up.
    """

    MAX_DEPTH = 3  # deepest level that may still be decomposed

    def __init__(self, llm_client, tools: dict):
        self.llm = llm_client
        self.tools = tools

    async def run(self, task: str) -> str:
        # Build task hierarchy
        task_tree = await self._build_task_tree(task)

        # Execute from leaves up
        results = await self._execute_tree(task_tree)

        # Synthesize final result
        return await self._synthesize(task, results)

    async def _build_task_tree(self, task: str, depth: int = 0) -> dict:
        """Recursively decompose task into subtasks.

        A node becomes a leaf when the LLM answers LEAF, when ``MAX_DEPTH``
        is exceeded, or when the reply is not a non-empty JSON list.
        """
        leaf = {"task": task, "subtasks": [], "is_leaf": True}
        if depth > self.MAX_DEPTH:
            return leaf

        prompt = f"""Is this task simple enough to execute directly with available tools?

Task: {task}

Available tools: {list(self.tools.keys())}

If YES (can execute directly), respond: LEAF
If NO (needs decomposition), provide 2-4 subtasks as JSON list."""

        response = (await self.llm.generate(prompt)).strip()

        # Only a reply that *starts* with LEAF is a leaf verdict. A substring
        # test misfired on subtask lists mentioning e.g. "leaf nodes".
        if response.upper().startswith("LEAF"):
            return leaf

        # Parse subtasks and recurse. An unparseable decomposition falls back
        # to executing the task directly instead of aborting the whole run.
        try:
            subtask_list = json.loads(response)
        except json.JSONDecodeError:
            return leaf
        if not isinstance(subtask_list, list) or not subtask_list:
            return leaf

        subtasks = []
        for subtask in subtask_list:
            subtasks.append(await self._build_task_tree(subtask, depth + 1))

        return {"task": task, "subtasks": subtasks, "is_leaf": False}

    async def _execute_tree(self, node: dict) -> dict:
        """Execute task tree from leaves up (children run sequentially, in order)."""
        if node["is_leaf"]:
            # Execute leaf task
            result = await self._execute_leaf(node["task"])
            return {"task": node["task"], "result": result}

        # Execute subtasks first
        subtask_results = []
        for subtask in node["subtasks"]:
            result = await self._execute_tree(subtask)
            subtask_results.append(result)

        # Combine subtask results
        combined = await self._combine_results(node["task"], subtask_results)
        return {"task": node["task"], "result": combined, "subtasks": subtask_results}

Self-Reflecting Agent

Agent that evaluates and improves its own responses.

class SelfReflectingAgent:
    """Agent that reflects on and improves its responses.

    After an initial draft, the LLM reviews the current response up to
    ``max_reflections`` times. An ``APPROVED`` verdict returns the current
    response. Any other reply is taken as the improved response.
    """

    REFLECTION_PROMPT = """Review your response to this query:

Query: {query}
Your response: {response}

Evaluate:
1. Does it fully answer the query?
2. Is the information accurate (based on your tools/sources)?
3. Is anything missing or unclear?
4. Could the response be improved?

If improvements are needed, provide an improved response.
If the response is good, respond with "APPROVED: [brief explanation]"."""

    def __init__(self, llm_client, tools: dict):
        self.llm = llm_client
        self.tools = tools

    async def run(self, query: str, max_reflections: int = 3) -> str:
        # Generate initial response
        response = await self._generate(query)

        # Reflect and potentially improve
        for _ in range(max_reflections):
            reflection = await self._reflect(query, response)

            if self._is_approved(reflection):
                return response

            # Use reflection as improved response
            response = reflection

        return response

    @staticmethod
    def _is_approved(reflection: str) -> bool:
        """Return True if the reflection is an approval verdict.

        Leading whitespace and markdown markers (``**APPROVED**``,
        ``# APPROVED``, ``> APPROVED``) are ignored. Without this, the
        approval text itself would replace the answer and be returned to the
        user.
        """
        return reflection.lstrip(" \t\r\n*#_>").startswith("APPROVED")

    async def _reflect(self, query: str, response: str) -> str:
        return await self.llm.generate(
            self.REFLECTION_PROMPT.format(query=query, response=response)
        )

Critic-Actor Agent

Agent with separate generation and evaluation roles for quality control.

class CriticActorAgent:
    """Agent with separate actor and critic roles.

    The actor drafts a response. The critic returns a JSON verdict, and the
    actor revises using the critic's feedback up to three times.
    """

    def __init__(self, actor_llm, critic_llm, tools: dict):
        self.actor = actor_llm
        self.critic = critic_llm
        self.tools = tools

    async def run(self, query: str) -> str:
        # Actor generates response
        response = await self._actor_generate(query)

        # Critic evaluates
        critique = await self._critic_evaluate(query, response)

        if critique["approved"]:
            return response

        # Actor revises based on critique
        for _ in range(3):
            response = await self._actor_revise(query, response, critique.get("feedback", ""))
            critique = await self._critic_evaluate(query, response)

            if critique["approved"]:
                return response

        return response  # last revision; may still be unapproved

    async def _critic_evaluate(self, query: str, response: str) -> dict:
        """Ask the critic for a verdict and normalise it.

        Returns a dict with a bool ``approved``, a ``ratings`` dict and a
        ``feedback`` string, even when the critic's reply is not valid JSON.
        """
        # The prompt previously asked for both a bare "APPROVED" and JSON
        # output, which made json.loads fail on approvals. It now asks for
        # JSON only.
        prompt = f"""Evaluate this response:

Query: {query}
Response: {response}

Rate from 1-5 on:
- Accuracy: Is the information correct?
- Completeness: Does it fully address the query?
- Clarity: Is it easy to understand?

If all ratings are 4+, set "approved" to true.
Otherwise, set "approved" to false and provide specific feedback for improvement.

Respond with only JSON: {{"approved": bool, "ratings": {{}}, "feedback": str}}"""

        raw = self._strip_code_fence(await self.critic.generate(prompt))
        try:
            verdict = json.loads(raw)
        except json.JSONDecodeError:
            verdict = None

        if not isinstance(verdict, dict):
            # A bare "APPROVED" reply is still honoured. Any other non-JSON
            # reply is handed back to the actor as feedback.
            return {"approved": raw.startswith("APPROVED"), "ratings": {}, "feedback": raw}

        # Only a real JSON true approves; strings such as "false" do not.
        verdict["approved"] = verdict.get("approved") is True
        verdict.setdefault("ratings", {})
        verdict.setdefault("feedback", "")
        return verdict

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding ``` / ```json fence, which models often add around JSON."""
        text = text.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else ""
            text = text.rsplit("```", 1)[0]
        return text.strip()

Memory-Augmented Agent

Agent with persistent memory and episodic consolidation.

class MemoryAugmentedAgent:
    """Agent with persistent memory across sessions.

    Before answering, it recalls the user's most similar past interactions
    and includes them as context. Afterwards it stores the new interaction.
    """

    def __init__(self, llm_client, tools: dict, memory_store):
        self.llm = llm_client
        self.tools = tools
        self.memory = memory_store  # Vector store for memories

    async def run(self, query: str, user_id: str) -> str:
        # Retrieve relevant memories
        memories = await self._recall(query, user_id)

        # Include memories in context
        context = self._format_memories(memories)

        # Generate response with memory context
        response = await self._generate_with_memory(query, context)

        # Store new memory
        await self._remember(query, response, user_id)

        return response

    async def _recall(self, query: str, user_id: str, k: int = 5) -> list[dict]:
        """Retrieve up to ``k`` memories for ``user_id`` similar to ``query``."""
        results = await self.memory.search(
            query=query,
            filter={"user_id": user_id},
            k=k
        )
        return results

    async def _remember(self, query: str, response: str, user_id: str):
        """Store interaction as memory."""
        # Imported locally so this method does not depend on a module-level
        # `datetime` import.
        from datetime import datetime, timezone

        memory = {
            "query": query,
            "response": response,
            "user_id": user_id,
            # Timezone-aware UTC, so timestamps from different hosts compare correctly.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "embedding": await self._embed(f"{query}\n{response}"),
        }
        await self.memory.add(memory)

    @staticmethod
    def _clip(text: str, limit: int) -> str:
        """Truncate ``text`` to ``limit`` characters, adding '...' only if something was cut."""
        return text if len(text) <= limit else text[:limit] + "..."

    def _format_memories(self, memories: list[dict]) -> str:
        """Render memories as a compact Q/A list for the prompt ('' when none)."""
        if not memories:
            return ""

        formatted = ["Relevant past interactions:"]
        for m in memories:
            formatted.append(f"- Q: {self._clip(m['query'], 100)}")
            formatted.append(f"  A: {self._clip(m['response'], 200)}")
        return "\n".join(formatted)


class EpisodicMemory:
    """Episodic memory with forgetting and consolidation.

    When the store grows past ``max_memories``, the oldest
    ``consolidation_batch`` memories are clustered and each cluster of more
    than three is replaced by a summary. If that still leaves the store over
    the limit, the oldest surplus memories are forgotten (deleted).
    """

    def __init__(self, vector_store, max_memories: int = 1000, consolidation_batch: int = 100):
        self.store = vector_store
        self.max_memories = max_memories
        self.consolidation_batch = consolidation_batch

    async def add(self, memory: dict):
        """Add memory with automatic consolidation."""
        await self.store.add(memory)

        # Check if consolidation needed
        if await self.store.count() > self.max_memories:
            await self._consolidate()

    async def _consolidate(self):
        """Consolidate old memories, then forget the oldest ones to restore the limit."""
        old_memories = await self.store.get_oldest(n=self.consolidation_batch)

        # Summarize similar old memories
        for cluster in self._cluster_memories(old_memories):
            if len(cluster) > 3:
                summary = await self._summarize_cluster(cluster)
                # Replace cluster with summary
                await self.store.delete(ids=[m["id"] for m in cluster])
                await self.store.add(summary)

        # Clustering alone may free nothing (e.g. no cluster larger than
        # three). The limit would then never be restored, and every later
        # add() would redo this work. Forget the oldest overflow instead.
        overflow = await self.store.count() - self.max_memories
        if overflow > 0:
            oldest = await self.store.get_oldest(n=overflow)
            await self.store.delete(ids=[m["id"] for m in oldest])

Computer Use Agent

Agent that interacts with graphical user interfaces via screenshots and actions.

from dataclasses import dataclass
from typing import Literal
import base64

@dataclass
class ScreenAction:
    type: Literal["click", "type", "scroll", "screenshot", "wait"]
    x: int = None
    y: int = None
    text: str = None
    direction: str = None

class ComputerUseAgent:
    """Agent that can interact with computer GUI.

    Each step captures a screenshot and asks the vision-language model for
    the next action as JSON. The action is executed on ``screen_controller``
    and recorded in the history. This repeats until the model replies
    ``done`` or ``max_steps`` is exhausted.
    """

    def __init__(self, vlm_client, screen_controller):
        self.vlm = vlm_client
        self.screen = screen_controller

    async def execute_task(self, task: str, max_steps: int = 20) -> dict:
        """Execute a task using computer interface.

        Returns:
            ``{'success': True, 'steps', 'history'}`` once the model reports
            ``done``. Otherwise ``success`` is False and ``reason`` is
            ``'max_steps_exceeded'``.
        """
        history = []

        for step in range(max_steps):
            # Take screenshot of current state
            screenshot = await self.screen.capture()
            screenshot_b64 = base64.b64encode(screenshot).decode()

            # Ask VLM what action to take
            action = await self._decide_action(task, screenshot_b64, history)

            # .get(): a reply without "type" is reported as unsupported by
            # _execute_action instead of raising KeyError here.
            if action.get('type') == 'done':
                return {
                    'success': True,
                    'steps': len(history),
                    'history': history
                }

            # Execute the action
            result = await self._execute_action(action)

            history.append({
                'step': step,
                'action': action,
                'result': result
            })

            # Brief pause to let UI update
            await asyncio.sleep(0.5)

        return {
            'success': False,
            'reason': 'max_steps_exceeded',
            'steps': len(history),
            'history': history
        }

    async def _decide_action(
        self,
        task: str,
        screenshot_b64: str,
        history: list
    ) -> dict:
        """Use VLM to decide next action."""
        history_summary = self._summarize_history(history[-5:])

        response = await self.vlm.generate(
            messages=[{
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"""You are controlling a computer to complete this task: {task}

Previous actions: {history_summary}

Look at the current screen and decide the next action.

Available actions:
- click(x, y): Click at coordinates
- type(text): Type text
- scroll(direction): Scroll up/down
- done: Task is complete

Respond with JSON: {{"type": "click|type|scroll|done", "x": num, "y": num, "text": "...", "direction": "up|down"}}"""
                    },
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            # NOTE(review): assumes screen.capture() returns PNG bytes; confirm.
                            "media_type": "image/png",
                            "data": screenshot_b64
                        }
                    }
                ]
            }],
            max_tokens=200
        )

        return self._parse_action(response)

    @staticmethod
    def _parse_action(response: str) -> dict:
        """Parse the VLM's JSON reply, tolerating a surrounding ``` fence.

        A reply that is not a JSON object becomes an ``invalid`` action. Its
        error is recorded in the history (and so shown to the model on the
        next step) instead of aborting the task.
        """
        text = response.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else ""
            text = text.rsplit("```", 1)[0].strip()
        try:
            action = json.loads(text)
        except json.JSONDecodeError as e:
            return {'type': 'invalid', 'error': f"Could not parse action JSON: {e}"}
        if not isinstance(action, dict):
            return {'type': 'invalid', 'error': "Action must be a JSON object"}
        return action

    async def _execute_action(self, action: dict) -> dict:
        """Execute an action on the screen.

        Actions missing a required field, or with an unsupported type, are
        not executed. They return ``{'executed': None, 'error': ...}`` so the
        failure shows up in the history instead of being reported as done.
        """
        kind = action.get('type')

        if kind == 'click':
            if action.get('x') is None or action.get('y') is None:
                return {'executed': None, 'error': "click requires 'x' and 'y'"}
            await self.screen.click(action['x'], action['y'])
            return {'executed': 'click', 'position': (action['x'], action['y'])}

        if kind == 'type':
            if action.get('text') is None:
                return {'executed': None, 'error': "type requires 'text'"}
            await self.screen.type_text(action['text'])
            return {'executed': 'type', 'text': action['text']}

        if kind == 'scroll':
            if action.get('direction') is None:
                return {'executed': None, 'error': "scroll requires 'direction'"}
            await self.screen.scroll(action['direction'])
            return {'executed': 'scroll', 'direction': action['direction']}

        if kind in ('screenshot', 'wait'):
            # Nothing extra to do: every step already captures a fresh
            # screenshot and pauses before the next one.
            return {'executed': kind}

        # Unknown type, or an 'invalid' action produced by _parse_action.
        return {'executed': None, 'error': action.get('error') or f"Unsupported action type: {kind!r}"}

Safety and Sandboxing

Scoped Tool Set

Tool access control based on user permissions.

class ScopedToolSet:
    """Expose only the tools a user's permission set allows.

    ``search`` is always offered. Write and admin tools are appended, in a
    fixed order, when the matching permission string is present.
    """

    def __init__(self, user_permissions: set[str]):
        self.permissions = user_permissions

    def get_allowed_tools(self) -> list[dict]:
        # (required permission, tool spec) pairs, in the order they are offered.
        gated = (
            ("documents:write", {"name": "create_document", "description": "Create a new document"}),
            ("admin:execute", {"name": "execute_sql", "description": "Execute SQL query"}),
        )
        allowed = [{"name": "search", "description": "Search documents"}]
        allowed.extend(spec for permission, spec in gated if permission in self.permissions)
        return allowed

Safe Agent with Confirmation

Agent that requires human approval for sensitive actions.

class ConfirmationRequired(Exception):
    """Raised when an action needs human confirmation."""
    def __init__(self, action: str, details: dict):
        self.action = action
        self.details = details

class SafeAgent:
    """Agent that requests confirmation for sensitive actions."""

    SENSITIVE_ACTIONS = {
        "send_email": lambda args: len(args.get("recipients", [])) > 5,
        "delete_file": lambda args: True,  # Always confirm deletes
        "execute_code": lambda args: "import os" in args.get("code", ""),
        "transfer_money": lambda args: args.get("amount", 0) > 100,
    }

    async def execute_tool(self, name: str, args: dict) -> str:
        # Check if confirmation is needed
        if name in self.SENSITIVE_ACTIONS:
            needs_confirmation = self.SENSITIVE_ACTIONS[name](args)
            if needs_confirmation:
                raise ConfirmationRequired(
                    action=name,
                    details={
                        "tool": name,
                        "arguments": args,
                        "reason": f"Action '{name}' requires human approval"
                    }
                )

        # Execute if no confirmation needed
        return await self.tools[name](**args)

# Usage in application
async def run_agent_with_confirmation(agent, query, max_confirmations: int = 10):
    """Run ``agent`` on ``query``, asking the user to approve sensitive actions.

    When the agent raises ``ConfirmationRequired``, a dialog is shown. On
    approval, the action name is added to ``agent.approved_actions`` and
    ``agent.run(query)`` is called again from the start. On refusal, the run
    is cancelled. Because the run restarts, the agent presumably repeats any
    tool calls it made before the interruption.

    Args:
        agent: Exposes async ``run(query)`` and a mutable ``approved_actions`` set.
        query: The user request.
        max_confirmations: Upper bound on approval round-trips. It guards
            against an agent that keeps requesting confirmation (e.g. one
            that never consults ``approved_actions``), which would otherwise
            loop forever.
    """
    confirmations = 0
    while True:
        try:
            return await agent.run(query)
        except ConfirmationRequired as e:
            if confirmations >= max_confirmations:
                return "Too many confirmation requests; aborting."
            # Show confirmation dialog to user
            approved = await show_confirmation_dialog(e.action, e.details)
            if not approved:
                return "Action cancelled by user."
            # Execute with explicit approval on the next attempt
            agent.approved_actions.add(e.action)
            confirmations += 1

Sandboxed Code Executor

Docker-based code execution sandbox with resource limits.

import docker
import tempfile

class SandboxedCodeExecutor:
    """Execute code in isolated Docker container.

    Each call starts a fresh ``python:3.11-slim`` container with a memory
    cap, a process-count cap, networking optionally disabled and privilege
    escalation forbidden. The container is always removed afterwards.
    """

    def __init__(
        self,
        timeout: int = 30,
        memory_limit: str = "256m",
        network_disabled: bool = True,
        pids_limit: int = 64
    ):
        self.client = docker.from_env()
        self.timeout = timeout
        self.memory_limit = memory_limit
        self.network_disabled = network_disabled
        self.pids_limit = pids_limit  # caps processes/threads (fork-bomb protection)

    def execute(self, code: str, language: str = "python") -> dict:
        """Execute code in sandbox and return result.

        Only Python is supported: the code is always written to a ``.py``
        file and run with ``python``, whatever ``language`` says.

        Returns:
            ``{"success", "output", "exit_code"}``. ``output`` is the
            container's combined log output, or "Execution timed out" with
            exit code -1 when waiting for the container fails.
        """
        import os  # local import: needed for temp-file cleanup below

        # Write code to temp file (UTF-8, which is what Python reads source as)
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
            f.write(code)
            code_path = f.name

        container = None
        try:
            # Run in container
            container = self.client.containers.run(
                image="python:3.11-slim",
                command="python /code/script.py",
                volumes={code_path: {"bind": "/code/script.py", "mode": "ro"}},
                mem_limit=self.memory_limit,
                network_disabled=self.network_disabled,
                pids_limit=self.pids_limit,
                security_opt=["no-new-privileges"],
                # Not remove=True: an auto-removed container can vanish before
                # logs() is read. It is removed explicitly in `finally`.
                remove=False,
                detach=True
            )

            # Wait with timeout
            try:
                result = container.wait(timeout=self.timeout)
            except Exception:
                # docker-py surfaces an exceeded wait timeout as a requests
                # ReadTimeout/ConnectionError rather than a docker error type.
                return {
                    "success": False,
                    "output": "Execution timed out",
                    "exit_code": -1
                }

            # errors="replace": untrusted code may print bytes that are not valid UTF-8.
            logs = container.logs().decode(errors="replace")
            return {
                "success": result["StatusCode"] == 0,
                "output": logs,
                "exit_code": result["StatusCode"]
            }
        finally:
            if container is not None:
                try:
                    # force=True also kills a container that is still running.
                    container.remove(force=True)
                except Exception:
                    pass  # best-effort cleanup; must not mask the result or original error
            os.unlink(code_path)

Rate-Limited Agent

Agent with execution limits to prevent runaway behavior.

class RateLimitedAgent:
    """Agent with rate limiting and cost controls.

    Each ``run`` is limited by ``max_turns`` turns, ``max_tool_calls`` tool
    calls and ``max_cost`` total cost. The limits are checked before every
    turn, so the final turn may overshoot the tool-call or cost budget.
    """

    def __init__(
        self,
        llm_client,
        tools: dict,
        max_turns: int = 20,
        max_tool_calls: int = 50,
        max_cost: float = 1.0
    ):
        self.llm = llm_client
        self.tools = tools
        self.max_turns = max_turns
        self.max_tool_calls = max_tool_calls
        self.max_cost = max_cost

        self.turn_count = 0
        self.tool_call_count = 0
        self.total_cost = 0.0

    async def run(self, query: str) -> str:
        # Limits apply per run. Without resetting, a second run on the same
        # agent would start already at (or past) the previous run's totals.
        self.turn_count = 0
        self.tool_call_count = 0
        self.total_cost = 0.0

        while self.turn_count < self.max_turns:
            # Check limits
            if self.tool_call_count >= self.max_tool_calls:
                return "Tool call limit reached. Please simplify your request."

            if self.total_cost >= self.max_cost:
                return "Cost limit reached. Please try a more focused request."

            # Execute turn
            response = await self._execute_turn(query)

            # Track metrics
            self.turn_count += 1
            self.total_cost += response.usage.cost
            # NOTE(review): assumes the turn response lists the tool calls it
            # made as `tool_calls`; confirm against _execute_turn. Previously
            # nothing incremented this counter, so the limit never triggered.
            self.tool_call_count += len(getattr(response, "tool_calls", None) or ())

            if response.is_final:
                return response.content

        return "Turn limit reached without completing task."

Loop Detector

Detect and recover from repetitive agent patterns.

class LoopDetector:
    """Detect when an agent is stuck in a loop.

    Each call to ``check`` compares a new state string with the most recent
    ``window_size`` states. A loop is reported once at least ``max_similar``
    of them score above ``similarity_threshold``.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.85,
        window_size: int = 5,
        max_similar: int = 3
    ):
        self.similarity_threshold = similarity_threshold
        self.window_size = window_size
        self.max_similar = max_similar
        self.state_history = []

    def check(self, state: str) -> bool:
        """
        Record *state* and report whether the agent appears to be looping.

        Returns True if a loop is detected. The first two states are only
        recorded, never reported.
        """
        if len(self.state_history) < 2:
            self.state_history.append(state)
            return False

        # Compare against the recent window before recording the new state,
        # so a state is never compared with itself.
        recent = self.state_history[-self.window_size:]
        similar_count = sum(
            1 for past_state in recent
            if self._similarity(state, past_state) > self.similarity_threshold
        )

        self.state_history.append(state)

        return similar_count >= self.max_similar

    def _similarity(self, s1: str, s2: str) -> float:
        """Return 1.0 for identical states, else the word-level Jaccard similarity."""
        # Identical states are maximally similar even when they contain no
        # words (e.g. repeated empty outputs), which Jaccard alone scores 0.0.
        if s1 == s2:
            return 1.0

        words1 = set(s1.lower().split())
        words2 = set(s2.lower().split())

        if not words1 or not words2:
            return 0.0

        intersection = len(words1 & words2)
        union = len(words1 | words2)

        return intersection / union

    def get_loop_summary(self) -> str:
        """Summarize the last five recorded states (truncated to 200 chars) for debugging."""
        if len(self.state_history) < 2:
            return "No loop detected"

        recent = self.state_history[-5:]
        return "Recent states (last 5):\n" + "\n---\n".join(
            s[:200] + "..." if len(s) > 200 else s
            for s in recent
        )


class AgentWithLoopDetection:
    """Agent that detects and recovers from loops."""

    def __init__(self, llm_client, tools: dict):
        self.llm = llm_client
        self.tools = tools
        self.loop_detector = LoopDetector()

    async def run(self, query: str, max_turns: int = 15) -> str:
        messages = [{"role": "user", "content": query}]

        for turn in range(max_turns):
            response = await self.llm.generate(messages)

            # Check for loop
            state = self._extract_state(response)
            if self.loop_detector.check(state):
                # Try to break the loop
                recovery = await self._attempt_recovery(messages, response)
                if recovery:
                    return recovery
                else:
                    return f"Unable to complete: detected repetitive pattern. Last response: {response.content[:500]}"

            messages.append({"role": "assistant", "content": response.content})

            if response.is_final:
                return response.content

            # Execute tools and continue
            tool_results = await self._execute_tools(response)
            messages.extend(tool_results)

        return "Max turns exceeded"

    async def _attempt_recovery(self, messages: list, last_response) -> str | None:
        """Try to recover from a loop by prompting differently."""
        recovery_prompt = """You seem to be repeating similar actions without making progress.

Please:
1. Summarize what you've learned so far
2. Identify why you might be stuck
3. Either provide a final answer with what you know, or explain what's missing

If you have enough information to partially answer, please do so."""

        messages.append({"role": "user", "content": recovery_prompt})
        response = await self.llm.generate(messages)

        if "final answer" in response.content.lower() or response.is_final:
            return response.content

        return None

Output Validator

Validate agent outputs for quality and safety.

class OutputValidator:
    """Apply a sequence of validator callables to an agent response.

    Each validator is invoked as ``validator(query, response)`` and is expected
    to return a dict with a ``"valid"`` flag and, when invalid, an ``"issue"``
    message. A validator that raises is recorded as an issue instead of
    aborting validation.
    """

    def __init__(self, validators: list[callable]):
        self.validators = validators

    def validate(self, query: str, response: str) -> tuple[bool, list[str]]:
        """
        Run every validator and collect the problems it reports.
        Returns (is_valid, list of issues); is_valid is True when no issues were found.
        """
        problems: list[str] = []

        for check in self.validators:
            try:
                outcome = check(query, response)
                if outcome["valid"]:
                    continue
                problems.append(outcome["issue"])
            except Exception as exc:
                problems.append(f"Validation error: {str(exc)}")

        return (not problems), problems


# Example validators
def check_not_empty(query: str, response: str) -> dict:
    """Reject missing responses and responses under 10 characters once stripped."""
    too_short = (not response) or len(response.strip()) < 10
    if too_short:
        return {"valid": False, "issue": "Response is empty or too short"}
    return {"valid": True}

def check_no_refusal(query: str, response: str) -> dict:
    """Flag responses containing a common refusal phrase (case-insensitive).

    Typographic apostrophes (U+2018/U+2019), common in model output, are
    normalised to ASCII so that e.g. "I’m unable to" is also detected.
    """
    refusal_patterns = [
        "i cannot", "i'm unable to", "i don't have access",
        "i can't help with"
    ]
    lower_response = response.lower().translate(
        str.maketrans({"\u2018": "'", "\u2019": "'"})
    )
    for pattern in refusal_patterns:
        if pattern in lower_response:
            return {
                "valid": False,
                "issue": f"Response appears to be a refusal: found '{pattern}'"
            }
    return {"valid": True}

def check_coherence(query: str, response: str) -> dict:
    """Basic coherence check: the response should share a content word with the query.

    Only queries with more than two non-stopword words are checked; shorter
    queries are always accepted.
    """
    import re

    # Tokenise on word characters so attached punctuation ("France?" vs
    # "France.") does not hide a genuine overlap.
    query_words = set(re.findall(r"\w+", query.lower()))
    response_words = set(re.findall(r"\w+", response.lower()))

    stopwords = {"the", "a", "an", "is", "are", "was", "were", "be", "been",
                 "being", "have", "has", "had", "do", "does", "did", "will",
                 "would", "could", "should", "may", "might", "must", "shall"}
    query_words -= stopwords
    response_words -= stopwords

    overlap = len(query_words & response_words)
    if overlap == 0 and len(query_words) > 2:
        return {
            "valid": False,
            "issue": "Response doesn't seem related to query"
        }
    return {"valid": True}


class ValidatedAgent:
    """Agent with output validation.

    Wraps a base agent and re-runs it with feedback about validation issues
    until the response passes or the retry budget is used up.
    """

    def __init__(self, base_agent, validators: list[callable]):
        self.agent = base_agent
        self.validator = OutputValidator(validators)

    async def run(self, query: str, max_retries: int = 2) -> str:
        """Run the base agent, retrying with issue feedback up to *max_retries* times.

        Returns the first valid response. If every attempt fails validation,
        returns the last response prefixed with a warning that lists its issues.
        """
        prompt = query
        response = ""
        issues: list[str] = []

        # At least one attempt, so `response` is always bound even when
        # max_retries is negative.
        for attempt in range(max(max_retries, 0) + 1):
            response = await self.agent.run(prompt)

            # Always validate against the user's query, not the fix prompt.
            is_valid, issues = self.validator.validate(query, response)

            if is_valid:
                return response

            if attempt < max_retries:
                # Rebuild from the original query each time; reusing the previous
                # prompt would nest earlier fix prompts inside "Original query:".
                issue_lines = "\n".join(f"- {issue}" for issue in issues)
                prompt = f"""Your previous response had these issues:
{issue_lines}

Original query: {query}

Please try again, addressing these issues."""

        return f"[Warning: Response may have quality issues: {', '.join(issues)}]\n\n{response}"

Production Patterns

Agent Evaluation Framework

Comprehensive evaluation for agent benchmarks.

class AgentEvaluator:
    """Evaluate agent performance on task benchmarks.

    Each task dict needs ``id`` and ``instruction``. It may also set
    ``environment``, ``verifier`` (default ``"exact_match"``), and the
    verifier-specific keys read in ``_verify_completion``.
    """

    def __init__(self, agent, benchmark_tasks: list[dict]):
        self.agent = agent
        self.tasks = benchmark_tasks

    async def run_evaluation(self) -> dict:
        """Run the agent on every benchmark task, sequentially, and aggregate the results."""
        results = [await self._evaluate_task(task) for task in self.tasks]
        return self._aggregate_results(results)

    async def _evaluate_task(self, task: dict) -> dict:
        """Evaluate the agent on a single task.

        Always returns a result dict with the same keys. Any exception raised
        by the agent or the verifier is reported as a failed result, not
        propagated.
        """
        # perf_counter is monotonic, so elapsed times are unaffected by
        # wall-clock adjustments during a run.
        start_time = time.perf_counter()

        try:
            agent_result = await self.agent.execute(
                task['instruction'],
                environment=task.get('environment')
            )

            elapsed = time.perf_counter() - start_time

            success = await self._verify_completion(task, agent_result)

            return {
                'task_id': task['id'],
                'success': success,
                'steps': agent_result.get('steps', 0),
                'elapsed_seconds': elapsed,
                'tokens_used': agent_result.get('tokens', 0),
                'error': None
            }

        except Exception as e:
            return {
                'task_id': task['id'],
                'success': False,
                # Same schema as the success path so consumers can index uniformly.
                'steps': 0,
                'elapsed_seconds': time.perf_counter() - start_time,
                'tokens_used': 0,
                # Exceptions with an empty message would otherwise be falsy and
                # silently drop out of the aggregated 'errors' list.
                'error': str(e) or type(e).__name__
            }

    async def _verify_completion(
        self,
        task: dict,
        agent_result: dict
    ) -> bool:
        """Verify task completion using the task's ``verifier`` criterion.

        Supported verifiers: ``exact_match`` (default), ``contains``,
        ``code_execution`` and ``llm_judge``. Unknown verifiers count as failure.
        """
        verifier_type = task.get('verifier', 'exact_match')

        if verifier_type == 'exact_match':
            return agent_result.get('output') == task['expected_output']

        elif verifier_type == 'contains':
            return task['expected_substring'] in agent_result.get('output', '')

        elif verifier_type == 'code_execution':
            return await self._run_tests(
                agent_result.get('code'),
                task['test_cases']
            )

        elif verifier_type == 'llm_judge':
            return await self._llm_judge(
                task['instruction'],
                agent_result,
                task['rubric']
            )

        return False

    def _aggregate_results(self, results: list[dict]) -> dict:
        """Aggregate individual results into summary metrics.

        ``avg_steps`` and ``avg_time`` are averaged over successful tasks only.
        """
        successful = [r for r in results if r['success']]

        return {
            'total_tasks': len(results),
            'successful': len(successful),
            'success_rate': len(successful) / len(results) if results else 0,
            'avg_steps': sum(r.get('steps', 0) for r in successful) / len(successful) if successful else 0,
            'avg_time': sum(r['elapsed_seconds'] for r in successful) / len(successful) if successful else 0,
            'errors': [r for r in results if r.get('error')],
            'detailed_results': results
        }

Resilient Agent with Checkpoints

Agent with failure recovery and state checkpointing.

class ResilientAgent:
    """Agent with failure recovery."""

    def __init__(self, llm_client, tools: dict, max_retries: int = 3):
        self.llm = llm_client
        self.tools = tools
        self.max_retries = max_retries
        self.checkpoints = []

    async def run(self, query: str) -> str:
        for attempt in range(self.max_retries):
            try:
                return await self._execute_with_checkpoints(query)
            except RecoverableError as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                await self._rollback_to_last_checkpoint()
            except FatalError as e:
                logger.error(f"Fatal error: {e}")
                return f"Unable to complete request: {e}"

        return "Max retries exceeded. Please try again or simplify your request."

    async def _execute_with_checkpoints(self, query: str) -> str:
        messages = [{"role": "user", "content": query}]

        while True:
            # Create checkpoint before each step
            self.checkpoints.append({
                "messages": messages.copy(),
                "timestamp": datetime.now()
            })

            response = await self.llm.generate(messages)

            if response.is_final:
                return response.content

            # Execute tool with error handling
            try:
                result = await self._safe_execute_tool(response.tool_call)
                messages.append({"role": "tool", "content": result})
            except ToolError as e:
                # Let model know about the error
                messages.append({"role": "tool", "content": f"Error: {e}"})

    async def _rollback_to_last_checkpoint(self):
        if self.checkpoints:
            last = self.checkpoints.pop()
            logger.info(f"Rolling back to checkpoint from {last['timestamp']}")

Versioned Agent Configuration

Agent with versioned configuration and rollback support.

class VersionedAgent:
    """Agent with versioned configuration.

    ``initialize`` must be awaited before ``run``. It loads either the latest
    configuration or the version given at construction.
    """

    def __init__(self, config_store, version: str = "latest"):
        self.config_store = config_store
        self.version = version
        self.config = None

    async def initialize(self):
        """Load the versioned configuration and build the client, tools and prompts.

        Raises:
            LookupError: if the store has no configuration for the requested version.
        """
        if self.version == "latest":
            config = await self.config_store.get_latest()
        else:
            config = await self.config_store.get_version(self.version)

        if not config:
            raise LookupError(f"No agent configuration found for version {self.version!r}")
        self.config = config

        self.llm = create_llm_client(self.config["model"])
        self.tools = load_tools(self.config["tools"])
        self.prompts = self.config["prompts"]

    async def run(self, query: str) -> dict:
        """Execute *query* and tag the result with the configuration that produced it.

        Raises:
            RuntimeError: if ``initialize`` has not been awaited yet.
        """
        if self.config is None:
            raise RuntimeError("VersionedAgent.initialize() must be awaited before run()")

        result = await self._execute(query)

        return {
            **result,
            # Report the concrete version that was loaded; the literal "latest"
            # cannot be traced back to a specific configuration.
            "agent_version": self.config.get("version", self.version),
            "config_hash": self.config["hash"]
        }


class AgentConfigStore:
    """Store and retrieve agent configurations."""

    def __init__(self, db):
        self.db = db

    async def save_version(self, config: dict) -> str:
        """Save a new configuration version."""
        version = datetime.now().strftime("%Y%m%d_%H%M%S")
        config["version"] = version
        config["hash"] = hashlib.md5(json.dumps(config, sort_keys=True).encode()).hexdigest()

        await self.db.insert("agent_configs", config)
        return version

    async def get_version(self, version: str) -> dict:
        """Retrieve specific version."""
        return await self.db.find_one("agent_configs", {"version": version})

    async def get_latest(self) -> dict:
        """Get the most recent version."""
        return await self.db.find_one(
            "agent_configs",
            sort=[("version", -1)]
        )

    async def rollback(self, to_version: str):
        """Rollback to a previous version."""
        config = await self.get_version(to_version)
        if config:
            config["rolled_back_at"] = datetime.now().isoformat()
            await self.save_version(config)

A/B Testing Router

Route requests to different agent versions for testing.

class ABTestingAgentRouter:
    """Route requests to different agent versions for A/B testing."""

    def __init__(self, agents: dict[str, 'Agent'], traffic_split: dict[str, float]):
        """
        Args:
            agents: {"control": agent_v1, "treatment": agent_v2}
            traffic_split: {"control": 0.9, "treatment": 0.1}

        Raises:
            ValueError: if traffic_split names a variant missing from agents.
        """
        unknown = set(traffic_split) - set(agents)
        if unknown:
            raise ValueError(f"traffic_split references unknown variants: {sorted(unknown)}")
        self.agents = agents
        self.traffic_split = traffic_split
        self.results = defaultdict(list)

    async def route(self, query: str, user_id: str) -> tuple[str, dict]:
        """Route query to an agent variant and return (variant, result)."""
        # Deterministic assignment: the same user always gets the same variant.
        variant = self._assign_variant(user_id)

        agent = self.agents[variant]
        result = await agent.run(query)

        self.results[variant].append({
            "query": query,
            "result": result,
            "user_id": user_id,
            "timestamp": datetime.now().isoformat()
        })

        return variant, result

    def _assign_variant(self, user_id: str) -> str:
        """Deterministically assign user to variant by hashing the user id into [0, 1).

        Users past the cumulative split (when splits sum to < 1) go to the first agent.
        """
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        normalized = (hash_val % 10000) / 10000

        cumulative = 0
        for variant, split in self.traffic_split.items():
            cumulative += split
            if normalized < cumulative:
                return variant

        return list(self.agents.keys())[0]

    def get_metrics(self) -> dict:
        """Compute count, success rate and average latency for each variant.

        Results that are not dicts (e.g. plain-text answers) carry no success or
        latency fields; they count as successes with zero latency, the same
        defaults used for dicts missing those keys.
        """
        metrics = {}
        for variant, results in self.results.items():
            count = len(results)
            if not count:
                metrics[variant] = {"count": 0, "success_rate": 0, "avg_latency": 0}
                continue
            outcomes = [r["result"] if isinstance(r["result"], dict) else {} for r in results]
            metrics[variant] = {
                "count": count,
                "success_rate": sum(1 for o in outcomes if o.get("success", True)) / count,
                "avg_latency": sum(o.get("latency", 0) for o in outcomes) / count
            }
        return metrics

Research Agent Examples

Core Research Agent

ReAct-style research agent with search and note-taking capabilities.

class ResearchAgent:
    """Agent for research tasks with search and analysis capabilities."""

    SYSTEM_PROMPT = """You are a research assistant that thoroughly investigates topics.

For each step, use this format:
Thought: [your reasoning about what to do next]
Action: [tool_name](arguments)

After receiving observations, continue reasoning or provide final answer:
Answer: [your comprehensive response with citations]

Available tools:
- search(query): Search the web for information
- get_document(url): Retrieve and read a specific document
- take_notes(note): Save important information for later synthesis

Research guidelines:
- Search multiple queries to get diverse perspectives
- Verify important claims with multiple sources
- Take notes on key findings as you go
- Cite sources using [Source: url] format
- Be thorough but respect the depth parameter

Current depth: {depth} (quick: 1-2 searches, standard: 3-5, deep: 5-10)"""

    def __init__(self, llm_client, search_tool, document_tool):
        self.llm = llm_client
        self.search = search_tool
        self.document = document_tool
        self.notes = []
        self.sources = []

    def take_notes(self, note: str) -> str:
        """Save a note for later synthesis."""
        self.notes.append({
            "content": note,
            "timestamp": datetime.now().isoformat()
        })
        return f"Note saved. Total notes: {len(self.notes)}"

    async def research(self, topic: str, depth: str = "standard") -> dict:
        """
        Research a topic and produce a summary.

        Args:
            topic: Research topic
            depth: "quick" (1-2 searches), "standard" (3-5), "deep" (5-10)

        Returns:
            dict with summary, sources, and raw notes
        """
        self.notes = []
        self.sources = []

        max_searches = {"quick": 2, "standard": 5, "deep": 10}.get(depth, 5)

        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT.format(depth=depth)},
            {"role": "user", "content": f"Research this topic thoroughly: {topic}"}
        ]

        search_count = 0
        for turn in range(max_searches * 3):  # Allow 3 turns per search
            response = await self.llm.generate(messages)
            messages.append({"role": "assistant", "content": response})

            # Check for final answer
            if "Answer:" in response:
                answer = response.split("Answer:")[-1].strip()
                return {
                    "summary": answer,
                    "sources": self.sources,
                    "notes": self.notes,
                    "search_count": search_count
                }

            # Execute action
            action = self._parse_action(response)
            if action:
                tool_name, args = action
                result = await self._execute_tool(tool_name, args)

                if tool_name == "search":
                    search_count += 1
                    if search_count >= max_searches:
                        result += "\n\n[Note: Search limit reached. Please synthesize your findings.]"

                messages.append({"role": "user", "content": f"Observation: {result}"})

        # Force synthesis if no answer yet
        messages.append({
            "role": "user",
            "content": "Please provide your final Answer now based on what you've learned."
        })
        final_response = await self.llm.generate(messages)

        return {
            "summary": final_response.split("Answer:")[-1].strip() if "Answer:" in final_response else final_response,
            "sources": self.sources,
            "notes": self.notes,
            "search_count": search_count
        }

    async def _execute_tool(self, name: str, args: list) -> str:
        if name == "search":
            result = await self.search(args[0])
            # Track sources
            for r in result.get("results", []):
                self.sources.append({"url": r["url"], "title": r["title"]})
            return json.dumps(result)

        elif name == "get_document":
            try:
                doc = await self.document(args[0])
                self.sources.append({"url": args[0], "title": doc.get("title", "")})
                return doc.get("content", "")[:5000]  # Truncate long docs
            except Exception as e:
                return f"Error retrieving document: {str(e)}"

        elif name == "take_notes":
            return self.take_notes(args[0])

        else:
            return f"Unknown tool: {name}"

    def _parse_action(self, text: str) -> tuple | None:
        import re
        match = re.search(r'Action:\s*(\w+)\((.*?)\)', text, re.DOTALL)
        if match:
            tool_name = match.group(1)
            args_str = match.group(2).strip()
            # Handle quoted strings
            if args_str.startswith('"') and args_str.endswith('"'):
                args = [args_str[1:-1]]
            elif args_str.startswith("'") and args_str.endswith("'"):
                args = [args_str[1:-1]]
            else:
                args = [args_str]
            return tool_name, args
        return None

Fact-Checking Research Agent

Extended research agent with claim verification.

class FactCheckingResearchAgent(ResearchAgent):
    """Research agent with fact-checking capabilities.

    After normal research it extracts factual claims from the summary,
    searches for evidence on each one, and integrates the verdicts into the
    summary.
    """

    async def research_with_verification(self, topic: str, depth: str = "standard") -> dict:
        """Research *topic*, then verify the claims in the resulting summary.

        Returns the ``research`` result with an updated ``summary`` and a
        ``claim_verification`` list of {claim, verified, evidence, confidence}.
        """
        initial = await self.research(topic, depth)

        claims = await self._extract_claims(initial["summary"])

        verified_claims = []
        for claim in claims:
            verification = await self._verify_claim(claim)
            # Defaults guard against verdicts that omit a field.
            verified_claims.append({
                "claim": claim,
                "verified": verification.get("status", "unverifiable"),
                "evidence": verification.get("evidence", ""),
                "confidence": verification.get("confidence", 0.0)
            })

        verified_summary = await self._integrate_verification(
            initial["summary"],
            verified_claims
        )

        return {
            **initial,
            "summary": verified_summary,
            "claim_verification": verified_claims
        }

    @staticmethod
    def _parse_json_response(response: str):
        """Parse JSON from an LLM reply, tolerating a surrounding Markdown code fence.

        Raises:
            json.JSONDecodeError: if the (unfenced) text is not valid JSON.
        """
        text = response.strip()
        if text.startswith("```") and text.endswith("```") and len(text) >= 6:
            inner = text[3:-3]
            first_line, sep, rest = inner.partition("\n")
            # A language tag such as "json" on the opening fence line is not payload.
            if sep and first_line.strip().isalpha():
                inner = rest
            text = inner
        return json.loads(text)

    async def _extract_claims(self, summary: str) -> list[str]:
        """Extract factual claims from summary.

        Raises:
            json.JSONDecodeError: if the model's reply is not a JSON value.
        """
        prompt = f"""Extract specific factual claims from this text.
Only include claims that can be objectively verified.
Output as JSON list of strings.

Text:
{summary}

Claims:"""

        response = await self.llm.generate(prompt)
        return self._parse_json_response(response)

    async def _verify_claim(self, claim: str) -> dict:
        """Verify a single claim against fresh search evidence.

        Returns a dict with ``status``, ``confidence`` and ``evidence``. An
        unparseable model verdict yields an explicit "unverifiable" result
        instead of aborting the whole verification pass.
        """
        search_result = await self.search(f"fact check: {claim}")

        prompt = f"""Evaluate whether this claim is supported by the evidence.

Claim: {claim}

Evidence from search:
{json.dumps(search_result, indent=2)}

Output JSON with:
- status: "verified", "disputed", "unverifiable"
- confidence: 0.0 to 1.0
- evidence: brief explanation of supporting/refuting evidence"""

        response = await self.llm.generate(prompt)
        try:
            return self._parse_json_response(response)
        except json.JSONDecodeError:
            return {
                "status": "unverifiable",
                "confidence": 0.0,
                "evidence": "Verification response could not be parsed as JSON."
            }

Comparative Research Agent

Agent for researching and comparing multiple topics.

class ComparativeResearchAgent(ResearchAgent):
    """Research agent for comparing multiple topics."""

    async def compare(self, topics: list[str], aspects: list[str] | None = None) -> dict:
        """
        Research multiple topics and compare them.

        Args:
            topics: List of topics to compare
            aspects: Optional list of aspects to compare on; identified
                automatically when omitted or empty.

        Returns:
            Comparison report with per-topic research results, the aspects,
            the aspect-by-topic comparison matrix, and an analysis.
        """
        topic_results = {}
        for topic in topics:
            result = await self.research(topic, depth="standard")
            topic_results[topic] = result

        if not aspects:
            aspects = await self._identify_aspects(topics, topic_results)

        comparison = await self._build_comparison(topics, topic_results, aspects)

        analysis = await self._analyze_comparison(topics, comparison)

        return {
            "topics": topic_results,
            "aspects": aspects,
            "comparison": comparison,
            "analysis": analysis
        }

    @staticmethod
    def _parse_json_response(response: str):
        """Parse JSON from an LLM reply, tolerating a surrounding Markdown code fence.

        Raises:
            json.JSONDecodeError: if the (unfenced) text is not valid JSON.
        """
        text = response.strip()
        if text.startswith("```") and text.endswith("```") and len(text) >= 6:
            inner = text[3:-3]
            first_line, sep, rest = inner.partition("\n")
            # A language tag such as "json" on the opening fence line is not payload.
            if sep and first_line.strip().isalpha():
                inner = rest
            text = inner
        return json.loads(text)

    async def _build_comparison(
        self,
        topics: list[str],
        results: dict,
        aspects: list[str]
    ) -> dict:
        """Build an {aspect: {topic: finding}} matrix from each topic's summary.

        Every requested aspect gets an entry for every researched topic.
        Findings the model did not return, or returned unparseably, are
        recorded as "not mentioned", the placeholder the prompt itself uses.
        """
        matrix = {aspect: {} for aspect in aspects}

        for topic, result in results.items():
            prompt = f"""Extract information about these aspects from the research summary.

Summary:
{result['summary']}

Aspects to extract: {aspects}

Output JSON dict mapping aspect to finding (or "not mentioned")."""

            response = await self.llm.generate(prompt)
            try:
                findings = self._parse_json_response(response)
            except json.JSONDecodeError:
                findings = {}
            if not isinstance(findings, dict):
                findings = {}

            for aspect in matrix:
                matrix[aspect][topic] = findings.get(aspect, "not mentioned")

        return matrix

    async def _analyze_comparison(self, topics: list[str], comparison: dict) -> str:
        """Ask the LLM for a comparative analysis of the comparison matrix."""
        prompt = f"""Analyze this comparison of {', '.join(topics)}.

Comparison data:
{json.dumps(comparison, indent=2)}

Provide:
1. Key similarities between the topics
2. Key differences
3. Notable findings or surprising results
4. Recommendations based on the comparison"""

        return await self.llm.generate(prompt)

Memory-Enabled Research Agent

Research agent that caches past research results and reuses them while they are still fresh.

class MemoryEnabledResearchAgent(ResearchAgent):
    """Research agent with persistent memory.

    Before researching, it looks in ``memory_store`` for a sufficiently
    similar past result. It reuses that result unless the result is stale,
    the topic looks time-sensitive, or the cached run was shallower than the
    requested depth.
    """

    # Cached research older than this many days is refreshed.
    MAX_MEMORY_AGE_DAYS = 7
    # Minimum memory-search score for a past result to count as the same topic.
    RECALL_SCORE_THRESHOLD = 0.85
    # Whole words that mark a topic as time-sensitive (always refreshed).
    # The current year is added at check time so the list does not go stale.
    TIME_SENSITIVE_WORDS = frozenset({
        "latest", "recent", "recently", "current", "currently",
        "new", "newest", "2024", "2025", "2026",
    })
    # Relative thoroughness of the depth levels accepted by ``research``.
    _DEPTH_RANK = {"quick": 0, "standard": 1, "deep": 2}

    def __init__(self, llm_client, search_tool, document_tool, memory_store):
        super().__init__(llm_client, search_tool, document_tool)
        self.memory = memory_store

    async def research(self, topic: str, depth: str = "standard") -> dict:
        """Return cached research when it is still usable, otherwise research afresh.

        The result carries ``from_memory`` and, for cached results, ``memory_date``.
        """
        past_research = await self._recall_past_research(topic)

        if past_research:
            should_refresh = await self._should_refresh(topic, past_research, depth)

            if not should_refresh:
                return {
                    **past_research["result"],
                    "from_memory": True,
                    "memory_date": past_research["timestamp"]
                }

        result = await super().research(topic, depth)

        await self._store_research(topic, result, depth)

        return {**result, "from_memory": False}

    async def _recall_past_research(self, topic: str) -> dict | None:
        """Return the first of the top-3 memory hits scoring above the recall threshold."""
        results = await self.memory.search(
            query=topic,
            k=3,
            filter={"type": "research"}
        )

        if not results:
            return None

        for r in results:
            if r["score"] > self.RECALL_SCORE_THRESHOLD:
                return r

        return None

    async def _should_refresh(self, topic: str, past_research: dict, depth: str | None = None) -> bool:
        """Decide whether cached research must be redone.

        Refreshes when the cache is too old, the topic contains a
        time-sensitive word, or *depth* asks for deeper research than the
        cached run used. Records stored without a depth never trigger the
        depth rule.
        """
        import re

        past_date = datetime.fromisoformat(past_research["timestamp"])
        age_days = (datetime.now() - past_date).days
        if age_days > self.MAX_MEMORY_AGE_DAYS:
            return True

        # Whole-word match: a substring test also flagged unrelated topics such
        # as "renewable energy" or "Newton" because they contain "new".
        topic_words = set(re.findall(r"\w+", topic.lower()))
        time_words = self.TIME_SENSITIVE_WORDS | {str(datetime.now().year)}
        if topic_words & time_words:
            return True

        cached_depth = past_research.get("depth")
        if depth is not None and cached_depth is not None:
            # Unknown depth strings are ranked like "standard", mirroring research().
            if self._DEPTH_RANK.get(depth, 1) > self._DEPTH_RANK.get(cached_depth, 1):
                return True

        return False

    async def _store_research(self, topic: str, result: dict, depth: str | None = None):
        """Store a research result, its depth and a topic/summary embedding in memory."""
        await self.memory.add({
            "type": "research",
            "topic": topic,
            "result": result,
            "depth": depth,
            "timestamp": datetime.now().isoformat(),
            "embedding": await self._embed(f"{topic}\n{result['summary'][:1000]}")
        })

Monitoring and Operations

Agent Metrics Collector

Track agent execution metrics for monitoring.

class AgentMetrics:
    """Collect and report agent metrics.

    Emits histograms and counters for every execution, plus alert events when
    an execution uses many turns, runs slowly, or fails.
    """

    def __init__(
        self,
        metrics_client,
        high_turns_threshold: int = 15,
        slow_execution_ms: float = 60000
    ):
        """
        Args:
            metrics_client: Client exposing histogram/counter/event methods.
            high_turns_threshold: Emit ``agent.high_turns`` when turns exceed this.
            slow_execution_ms: Emit ``agent.slow_execution`` when duration exceeds this.
        """
        self.metrics = metrics_client
        self.high_turns_threshold = high_turns_threshold
        self.slow_execution_ms = slow_execution_ms

    def record_execution(
        self,
        agent_id: str,
        query: str,
        result: str,
        duration_ms: float,
        turns: int,
        tool_calls: int,
        success: bool
    ):
        """Record metrics for one agent execution and emit anomaly events.

        ``result`` is accepted for interface compatibility but not recorded.
        """
        self.metrics.histogram("agent.duration_ms", duration_ms, tags={"agent": agent_id})
        self.metrics.counter("agent.executions", 1, tags={"agent": agent_id, "success": str(success)})
        self.metrics.histogram("agent.turns", turns, tags={"agent": agent_id})
        self.metrics.histogram("agent.tool_calls", tool_calls, tags={"agent": agent_id})

        if turns > self.high_turns_threshold:
            self.metrics.event("agent.high_turns", f"Agent {agent_id} used {turns} turns")

        if duration_ms > self.slow_execution_ms:
            self.metrics.event("agent.slow_execution", f"Agent {agent_id} took {duration_ms}ms")

        if not success:
            # Only the first 100 characters of the query go into the event text.
            self.metrics.event("agent.failure", f"Agent {agent_id} failed on: {query[:100]}")

Agent Test Harness

Testing framework for agent evaluation.

class AgentTestHarness:
    """Run an agent over query/expected test cases and tally the outcomes.

    Each case is a dict with ``"query"`` and ``"expected"`` keys. Whether a
    response passes is decided by ``self._evaluate(response, expected)``.
    Cases that raise are counted as failures and recorded under ``"errors"``.
    """

    def __init__(self, agent, test_cases: list[dict]):
        self.agent = agent
        self.test_cases = test_cases

    async def run_tests(self) -> dict:
        """Return pass/fail counts, per-case details and any raised errors."""
        summary = {"passed": 0, "failed": 0, "errors": [], "details": []}

        for case in self.test_cases:
            try:
                reply = await self.agent.run(case["query"])
                verdict = self._evaluate(reply, case["expected"])

                summary["details"].append({
                    "query": case["query"],
                    "response": reply,
                    "expected": case["expected"],
                    "passed": verdict,
                })
                summary["passed" if verdict else "failed"] += 1
            except Exception as err:
                summary["failed"] += 1
                summary["errors"].append({"query": case["query"], "error": str(err)})

        return summary

Stateful Agent

Agent with persistent state across sessions.

class StatefulAgent:
    """Agent with persistent state across sessions."""

    def __init__(self, agent_id: str, state_store):
        self.agent_id = agent_id
        self.state_store = state_store

    async def load_state(self) -> dict:
        """Load agent state from persistent store."""
        state = await self.state_store.get(f"agent:{self.agent_id}")
        return state or {
            "conversation_history": [],
            "tool_results_cache": {},
            "learned_preferences": {},
            "session_count": 0
        }

    async def save_state(self, state: dict):
        """Save agent state to persistent store."""
        await self.state_store.set(
            f"agent:{self.agent_id}",
            state,
            expire=86400 * 30  # 30 day expiry
        )

    async def run(self, query: str) -> str:
        state = await self.load_state()
        state["session_count"] += 1

        try:
            result = await self._execute(query, state)
            state["conversation_history"].append({
                "query": query,
                "response": result,
                "timestamp": datetime.now().isoformat()
            })
            return result
        finally:
            await self.save_state(state)

Audit Logging

Log all agent actions for compliance and debugging.

def log_agent_action(
    agent_id: str,
    action: str,
    args: dict,
    result: str,
    user_id: str
):
    """Log an agent action to the audit logger and the durable audit store.

    The entry records the first 500 characters of the result and a heuristic
    ``success`` flag: False if the word "error" appears anywhere in the result
    (case-insensitive).
    """
    # Tool results are not always strings (e.g. dicts). Coerce so slicing and
    # lower() cannot raise and lose the audit entry.
    result_text = result if isinstance(result, str) else str(result)
    entry = {
        "timestamp": datetime.now().isoformat(),
        "agent_id": agent_id,
        "user_id": user_id,
        "action": action,
        "arguments": args,
        "result_summary": result_text[:500],  # Truncate large results
        "success": "error" not in result_text.lower()
    }
    # default=str is deliberate: an audit record must not be dropped because an
    # argument (datetime, Path, custom object) is not JSON-serialisable.
    audit_logger.info(json.dumps(entry, default=str))
    # Also write to durable audit store
    audit_store.append(entry)