flowchart TB
subgraph Supervisor["SUPERVISOR AGENT"]
direction TB
Plan["Planning<br/>Decompose task"]
Delegate["Delegation<br/>Assign to workers"]
Monitor["Monitoring<br/>Track progress"]
Synthesize["Synthesis<br/>Combine results"]
Plan --> Delegate --> Monitor --> Synthesize
end
subgraph Workers["SPECIALIST WORKERS"]
direction LR
Research["Researcher<br/>Search & synthesize"]
Coder["Coder<br/>Write & analyze code"]
Analyst["Analyst<br/>Data analysis"]
Writer["Writer<br/>Draft documents"]
end
subgraph Tools["TOOL LAYER"]
direction LR
Search[("Search<br/>API")]
Code[("Code<br/>Executor")]
Data[("Data<br/>Store")]
Docs[("Document<br/>Store")]
end
User["User Request"] --> Plan
Delegate --> Research & Coder & Analyst & Writer
Research --> Search
Coder --> Code
Analyst --> Data
Writer --> Docs
Research & Coder & Analyst & Writer --> Monitor
Synthesize --> Response["Final Response"]
style Supervisor fill:#e3f2fd
style Workers fill:#fff3e0
style Tools fill:#f3e5f5
Supervisor-Worker Agent Architecture
"""Supervisor-worker agent pattern for complex multi-step tasks.Illustrative fragment showing orchestration structure."""from enum import Enumfrom typing import Optionalfrom pydantic import BaseModelimport asyncioclass TaskStatus(Enum): PENDING ="pending" IN_PROGRESS ="in_progress" COMPLETED ="completed" FAILED ="failed" NEEDS_HUMAN ="needs_human"class SubTask(BaseModel):id: str description: str assigned_to: str# worker agent type status: TaskStatus dependencies: list[str] = [] result: Optional[str] =None error: Optional[str] =Noneclass SupervisorAgent:""" Coordinates worker agents for complex tasks. Handles planning, delegation, and result synthesis. """ PLANNING_PROMPT ="""You are a task supervisor. Break down this request into subtasks that can be handled by specialist workers.Available workers:- RESEARCHER: Can search documents and synthesize information- CODER: Can write and analyze code- ANALYST: Can perform data analysis and create visualizations- WRITER: Can draft and edit documentsUser request: {request}Output a JSON list of subtasks with:- id: unique identifier- description: what needs to be done- assigned_to: which worker type- dependencies: list of subtask ids that must complete firstRespond with valid JSON only.""" SYNTHESIS_PROMPT ="""You are synthesizing results from multiple workers.Original request: {request}Worker results:{results}Provide a coherent final response that:1. Addresses the original request completely2. Integrates all worker outputs appropriately3. Notes any limitations or caveats4. Suggests follow-up actions if relevant"""def__init__(self, llm_client, workers: dict):self.llm = llm_clientself.workers = workers # worker_type -> WorkerAgentself.max_iterations =10self.task_history = []asyncdef execute(self, request: str) ->str:# Phase 1: Planning plan =awaitself._create_plan(request)# Phase 2: Execute subtasks (respecting dependencies) completed = {}for iteration inrange(self.max_iterations):# Find executable tasks (dependencies met) executable = [ t for t in plan if t.status == TaskStatus.PENDINGandall(d in completed for d in t.dependencies) ]ifnot executable:ifall(t.status == TaskStatus.COMPLETED for t in plan):breakelse:# Stuck - some tasks can't proceedreturnself._handle_stuck_state(plan, request)# Execute in parallel where possible results =await asyncio.gather(*[self._execute_subtask(t) for t in executable ])for task, result inzip(executable, results): task.status = result.status task.result = result.resultif result.status == TaskStatus.COMPLETED: completed[task.id] = result.result# Phase 3: Synthesize resultsreturnawaitself._synthesize(request, plan)asyncdef _execute_subtask(self, task: SubTask) -> SubTask:"""Delegate subtask to appropriate worker.""" worker =self.workers.get(task.assigned_to)ifnot worker: task.status = TaskStatus.FAILED task.error =f"Unknown worker type: {task.assigned_to}"return tasktry: result =await worker.execute( task.description, context=self._get_context_for_task(task) ) task.status = TaskStatus.COMPLETED task.result = resultexcept WorkerError as e: task.status = TaskStatus.FAILED task.error =str(e)except NeedsHumanReview as e: task.status = TaskStatus.NEEDS_HUMAN task.error =str(e)return taskasyncdef _create_plan(self, request: str) ->list[SubTask]:"""Use LLM to decompose request into subtasks.""" response =awaitself.llm.complete(self.PLANNING_PROMPT.format(request=request) )return [SubTask(**t) for t in json.loads(response)]asyncdef _synthesize(self, request: str, plan: list[SubTask]) ->str:"""Combine worker results into final response.""" results ="\n\n".join([f"## {t.assigned_to}: {t.description}\n{t.result}"for t in plan if t.status == TaskStatus.COMPLETED ])returnawaitself.llm.complete(self.SYNTHESIS_PROMPT.format(request=request, results=results) )
Worker Agent Base Class
class WorkerAgent:"""Base class for specialist worker agents."""def__init__(self, llm_client, tools: list=None):self.llm = llm_clientself.tools = tools or []asyncdef execute(self, task: str, context: dict=None) ->str:"""Execute a task and return result."""raiseNotImplementedErrordef _format_context(self, context: dict) ->str:"""Format context for inclusion in prompt."""ifnot context:return""return"\n".join(f"{k}: {v}"for k, v in context.items())class ResearcherAgent(WorkerAgent):"""Searches documents and synthesizes information.""" PROMPT ="""You are a research assistant. Search for and synthesize information to answer this question:{task}Context from previous steps:{context}Use the search tool to find relevant information, then provide a comprehensive answer with citations."""asyncdef execute(self, task: str, context: dict=None) ->str:# Implementation would use RAG/search toolspassclass CoderAgent(WorkerAgent):"""Writes and analyzes code.""" PROMPT ="""You are a coding assistant. Complete this task:{task}Context:{context}Write clean, well-documented code. Include error handling."""asyncdef execute(self, task: str, context: dict=None) ->str:# Implementation would generate and optionally execute codepass
Error Handling Patterns
class AgentError(Exception):"""Base exception for agent errors."""passclass WorkerError(AgentError):"""Worker failed to complete task."""passclass NeedsHumanReview(AgentError):"""Task requires human intervention."""passclass LoopDetected(AgentError):"""Agent appears to be in an infinite loop."""pass# Circuit breaker for agent loops {.unnumbered}class CircuitBreaker:def__init__(self, max_iterations: int=10, similarity_threshold: float=0.9):self.max_iterations = max_iterationsself.similarity_threshold = similarity_thresholdself.history = []def check(self, state: str) ->bool:"""Returns True if should continue, raises if loop detected."""iflen(self.history) >=self.max_iterations:raise LoopDetected(f"Max iterations ({self.max_iterations}) exceeded")for past_state inself.history[-3:]:ifself._similarity(state, past_state) >self.similarity_threshold:raise LoopDetected("Similar state detected, possible loop")self.history.append(state)returnTrue
Usage Example
# Initialize workers {.unnumbered}workers = {"RESEARCHER": ResearcherAgent(llm, tools=[search_tool]),"CODER": CoderAgent(llm, tools=[code_exec_tool]),"ANALYST": AnalystAgent(llm, tools=[data_tool]),"WRITER": WriterAgent(llm),}# Create supervisor {.unnumbered}supervisor = SupervisorAgent(llm, workers)# Execute complex task {.unnumbered}result =await supervisor.execute("Research our competitor's pricing, analyze how it compares to ours, ""and draft a memo with recommendations for the pricing team.")
---number-sections: falseexecute: enabled: false---# Agent Orchestration Patterns {.unnumbered}## Architecture Overview```mermaidflowchart TB subgraph Supervisor["SUPERVISOR AGENT"] direction TB Plan["Planning<br/>Decompose task"] Delegate["Delegation<br/>Assign to workers"] Monitor["Monitoring<br/>Track progress"] Synthesize["Synthesis<br/>Combine results"] Plan --> Delegate --> Monitor --> Synthesize end subgraph Workers["SPECIALIST WORKERS"] direction LR Research["Researcher<br/>Search & synthesize"] Coder["Coder<br/>Write & analyze code"] Analyst["Analyst<br/>Data analysis"] Writer["Writer<br/>Draft documents"] end subgraph Tools["TOOL LAYER"] direction LR Search[("Search<br/>API")] Code[("Code<br/>Executor")] Data[("Data<br/>Store")] Docs[("Document<br/>Store")] end User["User Request"] --> Plan Delegate --> Research & Coder & Analyst & Writer Research --> Search Coder --> Code Analyst --> Data Writer --> Docs Research & Coder & Analyst & Writer --> Monitor Synthesize --> Response["Final Response"] style Supervisor fill:#e3f2fd style Workers fill:#fff3e0 style Tools fill:#f3e5f5```## Supervisor-Worker Agent Architecture```python"""Supervisor-worker agent pattern for complex multi-step tasks.Illustrative fragment showing orchestration structure."""from enum import Enumfrom typing import Optionalfrom pydantic import BaseModelimport asyncioclass TaskStatus(Enum): PENDING ="pending" IN_PROGRESS ="in_progress" COMPLETED ="completed" FAILED ="failed" NEEDS_HUMAN ="needs_human"class SubTask(BaseModel):id: str description: str assigned_to: str# worker agent type status: TaskStatus dependencies: list[str] = [] result: Optional[str] =None error: Optional[str] =Noneclass SupervisorAgent:""" Coordinates worker agents for complex tasks. Handles planning, delegation, and result synthesis. """ PLANNING_PROMPT ="""You are a task supervisor. Break down this request into subtasks that can be handled by specialist workers.Available workers:- RESEARCHER: Can search documents and synthesize information- CODER: Can write and analyze code- ANALYST: Can perform data analysis and create visualizations- WRITER: Can draft and edit documentsUser request: {request}Output a JSON list of subtasks with:- id: unique identifier- description: what needs to be done- assigned_to: which worker type- dependencies: list of subtask ids that must complete firstRespond with valid JSON only.""" SYNTHESIS_PROMPT ="""You are synthesizing results from multiple workers.Original request: {request}Worker results:{results}Provide a coherent final response that:1. Addresses the original request completely2. Integrates all worker outputs appropriately3. Notes any limitations or caveats4. Suggests follow-up actions if relevant"""def__init__(self, llm_client, workers: dict):self.llm = llm_clientself.workers = workers # worker_type -> WorkerAgentself.max_iterations =10self.task_history = []asyncdef execute(self, request: str) ->str:# Phase 1: Planning plan =awaitself._create_plan(request)# Phase 2: Execute subtasks (respecting dependencies) completed = {}for iteration inrange(self.max_iterations):# Find executable tasks (dependencies met) executable = [ t for t in plan if t.status == TaskStatus.PENDINGandall(d in completed for d in t.dependencies) ]ifnot executable:ifall(t.status == TaskStatus.COMPLETED for t in plan):breakelse:# Stuck - some tasks can't proceedreturnself._handle_stuck_state(plan, request)# Execute in parallel where possible results =await asyncio.gather(*[self._execute_subtask(t) for t in executable ])for task, result inzip(executable, results): task.status = result.status task.result = result.resultif result.status == TaskStatus.COMPLETED: completed[task.id] = result.result# Phase 3: Synthesize resultsreturnawaitself._synthesize(request, plan)asyncdef _execute_subtask(self, task: SubTask) -> SubTask:"""Delegate subtask to appropriate worker.""" worker =self.workers.get(task.assigned_to)ifnot worker: task.status = TaskStatus.FAILED task.error =f"Unknown worker type: {task.assigned_to}"return tasktry: result =await worker.execute( task.description, context=self._get_context_for_task(task) ) task.status = TaskStatus.COMPLETED task.result = resultexcept WorkerError as e: task.status = TaskStatus.FAILED task.error =str(e)except NeedsHumanReview as e: task.status = TaskStatus.NEEDS_HUMAN task.error =str(e)return taskasyncdef _create_plan(self, request: str) ->list[SubTask]:"""Use LLM to decompose request into subtasks.""" response =awaitself.llm.complete(self.PLANNING_PROMPT.format(request=request) )return [SubTask(**t) for t in json.loads(response)]asyncdef _synthesize(self, request: str, plan: list[SubTask]) ->str:"""Combine worker results into final response.""" results ="\n\n".join([f"## {t.assigned_to}: {t.description}\n{t.result}"for t in plan if t.status == TaskStatus.COMPLETED ])returnawaitself.llm.complete(self.SYNTHESIS_PROMPT.format(request=request, results=results) )```## Worker Agent Base Class```pythonclass WorkerAgent:"""Base class for specialist worker agents."""def__init__(self, llm_client, tools: list=None):self.llm = llm_clientself.tools = tools or []asyncdef execute(self, task: str, context: dict=None) ->str:"""Execute a task and return result."""raiseNotImplementedErrordef _format_context(self, context: dict) ->str:"""Format context for inclusion in prompt."""ifnot context:return""return"\n".join(f"{k}: {v}"for k, v in context.items())class ResearcherAgent(WorkerAgent):"""Searches documents and synthesizes information.""" PROMPT ="""You are a research assistant. Search for and synthesize information to answer this question:{task}Context from previous steps:{context}Use the search tool to find relevant information, then provide a comprehensive answer with citations."""asyncdef execute(self, task: str, context: dict=None) ->str:# Implementation would use RAG/search toolspassclass CoderAgent(WorkerAgent):"""Writes and analyzes code.""" PROMPT ="""You are a coding assistant. Complete this task:{task}Context:{context}Write clean, well-documented code. Include error handling."""asyncdef execute(self, task: str, context: dict=None) ->str:# Implementation would generate and optionally execute codepass```## Error Handling Patterns```pythonclass AgentError(Exception):"""Base exception for agent errors."""passclass WorkerError(AgentError):"""Worker failed to complete task."""passclass NeedsHumanReview(AgentError):"""Task requires human intervention."""passclass LoopDetected(AgentError):"""Agent appears to be in an infinite loop."""pass# Circuit breaker for agent loops {.unnumbered}class CircuitBreaker:def__init__(self, max_iterations: int=10, similarity_threshold: float=0.9):self.max_iterations = max_iterationsself.similarity_threshold = similarity_thresholdself.history = []def check(self, state: str) ->bool:"""Returns True if should continue, raises if loop detected."""iflen(self.history) >=self.max_iterations:raise LoopDetected(f"Max iterations ({self.max_iterations}) exceeded")for past_state inself.history[-3:]:ifself._similarity(state, past_state) >self.similarity_threshold:raise LoopDetected("Similar state detected, possible loop")self.history.append(state)returnTrue```## Usage Example```python# Initialize workers {.unnumbered}workers = {"RESEARCHER": ResearcherAgent(llm, tools=[search_tool]),"CODER": CoderAgent(llm, tools=[code_exec_tool]),"ANALYST": AnalystAgent(llm, tools=[data_tool]),"WRITER": WriterAgent(llm),}# Create supervisor {.unnumbered}supervisor = SupervisorAgent(llm, workers)# Execute complex task {.unnumbered}result =await supervisor.execute("Research our competitor's pricing, analyze how it compares to ours, ""and draft a memo with recommendations for the pricing team.")```