Agent Orchestration Patterns

Architecture Overview

flowchart TB
    subgraph Supervisor["SUPERVISOR AGENT"]
        direction TB
        Plan["Planning<br/>Decompose task"]
        Delegate["Delegation<br/>Assign to workers"]
        Monitor["Monitoring<br/>Track progress"]
        Synthesize["Synthesis<br/>Combine results"]
        Plan --> Delegate --> Monitor --> Synthesize
    end

    subgraph Workers["SPECIALIST WORKERS"]
        direction LR
        Research["Researcher<br/>Search & synthesize"]
        Coder["Coder<br/>Write & analyze code"]
        Analyst["Analyst<br/>Data analysis"]
        Writer["Writer<br/>Draft documents"]
    end

    subgraph Tools["TOOL LAYER"]
        direction LR
        Search[("Search<br/>API")]
        Code[("Code<br/>Executor")]
        Data[("Data<br/>Store")]
        Docs[("Document<br/>Store")]
    end

    User["User Request"] --> Plan
    Delegate --> Research & Coder & Analyst & Writer
    Research --> Search
    Coder --> Code
    Analyst --> Data
    Writer --> Docs

    Research & Coder & Analyst & Writer --> Monitor
    Synthesize --> Response["Final Response"]

    style Supervisor fill:#e3f2fd
    style Workers fill:#fff3e0
    style Tools fill:#f3e5f5

Supervisor-Worker Agent Architecture

"""
Supervisor-worker agent pattern for complex multi-step tasks.
Illustrative fragment showing orchestration structure.
"""
from enum import Enum
from typing import Optional
from pydantic import BaseModel
import asyncio

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    NEEDS_HUMAN = "needs_human"

class SubTask(BaseModel):
    id: str
    description: str
    assigned_to: str  # worker agent type
    status: TaskStatus
    dependencies: list[str] = []
    result: Optional[str] = None
    error: Optional[str] = None

class SupervisorAgent:
    """
    Coordinates worker agents for complex tasks.
    Handles planning, delegation, and result synthesis.
    """
    
    PLANNING_PROMPT = """You are a task supervisor. Break down this request 
into subtasks that can be handled by specialist workers.

Available workers:
- RESEARCHER: Can search documents and synthesize information
- CODER: Can write and analyze code
- ANALYST: Can perform data analysis and create visualizations
- WRITER: Can draft and edit documents

User request: {request}

Output a JSON list of subtasks with:
- id: unique identifier
- description: what needs to be done
- assigned_to: which worker type
- dependencies: list of subtask ids that must complete first

Respond with valid JSON only."""

    SYNTHESIS_PROMPT = """You are synthesizing results from multiple workers.

Original request: {request}

Worker results:
{results}

Provide a coherent final response that:
1. Addresses the original request completely
2. Integrates all worker outputs appropriately
3. Notes any limitations or caveats
4. Suggests follow-up actions if relevant"""

    def __init__(self, llm_client, workers: dict):
        self.llm = llm_client
        self.workers = workers  # worker_type -> WorkerAgent
        self.max_iterations = 10
        self.task_history = []
    
    async def execute(self, request: str) -> str:
        # Phase 1: Planning
        plan = await self._create_plan(request)
        
        # Phase 2: Execute subtasks (respecting dependencies)
        completed = {}
        for iteration in range(self.max_iterations):
            # Find executable tasks (dependencies met)
            executable = [
                t for t in plan 
                if t.status == TaskStatus.PENDING
                and all(d in completed for d in t.dependencies)
            ]
            
            if not executable:
                if all(t.status == TaskStatus.COMPLETED for t in plan):
                    break
                else:
                    # Stuck - some tasks can't proceed
                    return self._handle_stuck_state(plan, request)
            
            # Execute in parallel where possible
            results = await asyncio.gather(*[
                self._execute_subtask(t) for t in executable
            ])
            
            for task, result in zip(executable, results):
                task.status = result.status
                task.result = result.result
                if result.status == TaskStatus.COMPLETED:
                    completed[task.id] = result.result
        
        # Phase 3: Synthesize results
        return await self._synthesize(request, plan)
    
    async def _execute_subtask(self, task: SubTask) -> SubTask:
        """Delegate subtask to appropriate worker."""
        worker = self.workers.get(task.assigned_to)
        if not worker:
            task.status = TaskStatus.FAILED
            task.error = f"Unknown worker type: {task.assigned_to}"
            return task
        
        try:
            result = await worker.execute(
                task.description,
                context=self._get_context_for_task(task)
            )
            task.status = TaskStatus.COMPLETED
            task.result = result
        except WorkerError as e:
            task.status = TaskStatus.FAILED
            task.error = str(e)
        except NeedsHumanReview as e:
            task.status = TaskStatus.NEEDS_HUMAN
            task.error = str(e)
        
        return task
    
    async def _create_plan(self, request: str) -> list[SubTask]:
        """Use LLM to decompose request into subtasks."""
        response = await self.llm.complete(
            self.PLANNING_PROMPT.format(request=request)
        )
        return [SubTask(**t) for t in json.loads(response)]
    
    async def _synthesize(self, request: str, plan: list[SubTask]) -> str:
        """Combine worker results into final response."""
        results = "\n\n".join([
            f"## {t.assigned_to}: {t.description}\n{t.result}"
            for t in plan if t.status == TaskStatus.COMPLETED
        ])
        
        return await self.llm.complete(
            self.SYNTHESIS_PROMPT.format(request=request, results=results)
        )

Worker Agent Base Class

class WorkerAgent:
    """Base class for specialist worker agents."""
    
    def __init__(self, llm_client, tools: list = None):
        self.llm = llm_client
        self.tools = tools or []
    
    async def execute(self, task: str, context: dict = None) -> str:
        """Execute a task and return result."""
        raise NotImplementedError
    
    def _format_context(self, context: dict) -> str:
        """Format context for inclusion in prompt."""
        if not context:
            return ""
        return "\n".join(f"{k}: {v}" for k, v in context.items())


class ResearcherAgent(WorkerAgent):
    """Searches documents and synthesizes information."""
    
    PROMPT = """You are a research assistant. Search for and synthesize 
information to answer this question:

{task}

Context from previous steps:
{context}

Use the search tool to find relevant information, then provide a 
comprehensive answer with citations."""
    
    async def execute(self, task: str, context: dict = None) -> str:
        # Implementation would use RAG/search tools
        pass


class CoderAgent(WorkerAgent):
    """Writes and analyzes code."""
    
    PROMPT = """You are a coding assistant. Complete this task:

{task}

Context:
{context}

Write clean, well-documented code. Include error handling."""
    
    async def execute(self, task: str, context: dict = None) -> str:
        # Implementation would generate and optionally execute code
        pass

Error Handling Patterns

class AgentError(Exception):
    """Base exception for agent errors."""
    pass

class WorkerError(AgentError):
    """Worker failed to complete task."""
    pass

class NeedsHumanReview(AgentError):
    """Task requires human intervention."""
    pass

class LoopDetected(AgentError):
    """Agent appears to be in an infinite loop."""
    pass

# Circuit breaker for agent loops {.unnumbered}
class CircuitBreaker:
    def __init__(self, max_iterations: int = 10, similarity_threshold: float = 0.9):
        self.max_iterations = max_iterations
        self.similarity_threshold = similarity_threshold
        self.history = []
    
    def check(self, state: str) -> bool:
        """Returns True if should continue, raises if loop detected."""
        if len(self.history) >= self.max_iterations:
            raise LoopDetected(f"Max iterations ({self.max_iterations}) exceeded")
        
        for past_state in self.history[-3:]:
            if self._similarity(state, past_state) > self.similarity_threshold:
                raise LoopDetected("Similar state detected, possible loop")
        
        self.history.append(state)
        return True

Usage Example

# Initialize workers {.unnumbered}
workers = {
    "RESEARCHER": ResearcherAgent(llm, tools=[search_tool]),
    "CODER": CoderAgent(llm, tools=[code_exec_tool]),
    "ANALYST": AnalystAgent(llm, tools=[data_tool]),
    "WRITER": WriterAgent(llm),
}

# Create supervisor {.unnumbered}
supervisor = SupervisorAgent(llm, workers)

# Execute complex task {.unnumbered}
result = await supervisor.execute(
    "Research our competitor's pricing, analyze how it compares to ours, "
    "and draft a memo with recommendations for the pricing team."
)