📚 Building the Orchestrator

🎯 Level 33+ ⭐ 750 XP ⏱️ 12 min

[VIDEO-012] Building the Orchestrator

Track: 3 - Multi-Agent Systems
Module: 2
Duration: 12 minutes
Level requirement: 33
XP reward: 350 XP

---

Scene 1: The Brain of Multi-Agent Systems (0:00-1:30)

[Visual]: Orchestra conductor analogy [Animation]: Conductor coordinating musicians

[Audio/Script]:

"In an orchestra, musicians are specialists. The conductor coordinates.

In multi-agent systems, agents are specialists. The orchestrator coordinates.

A great orchestrator:
- Understands the overall goal
- Knows each agent's capabilities
- Plans the execution strategy
- Handles failures gracefully
- Synthesizes final results

Let's build one."

[Lower third]: "Track 3: Multi-Agent Systems | Level 33"

---

Scene 2: Orchestrator Architecture (1:30-3:30)

[Visual]: Orchestrator component diagram [Animation]: Each component lighting up

[Audio/Script]:

"A production orchestrator has four components:
1. Agent Registry - Know your agents
2. Planner - Break tasks into steps
3. Executor - Run the plan
4. Synthesizer - Combine results"

[Demo - Core Structure]:

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod


@dataclass
class AgentCapability:
    """What an agent can do"""
    name: str
    description: str
    input_schema: dict
    output_schema: dict


class BaseAgent(ABC):
    """Base class for all agents"""

    @property
    @abstractmethod
    def capabilities(self) -> List[AgentCapability]:
        pass

    @abstractmethod
    async def execute(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
        pass


@dataclass
class PlanStep:
    """A single step in execution plan"""
    step_id: str
    agent_name: str
    task: str
    # Step IDs this depends on (default_factory gives each step its own list)
    depends_on: List[str] = field(default_factory=list)
    priority: int = 0


@dataclass
class ExecutionPlan:
    """Complete execution plan"""
    goal: str
    steps: List[PlanStep]
    estimated_time: Optional[int] = None
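
The later demos register agents such as ResearchAgent without showing what one looks like. As a minimal sketch, here is one way a concrete agent might implement the base class. The EchoResearchAgent name, its canned output, and the schema contents are assumptions made for illustration, and the base types are repeated in compact form only so the snippet runs on its own.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class AgentCapability:
    name: str
    description: str
    input_schema: dict
    output_schema: dict


class BaseAgent(ABC):
    @property
    @abstractmethod
    def capabilities(self) -> List[AgentCapability]: ...

    @abstractmethod
    async def execute(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]: ...


class EchoResearchAgent(BaseAgent):
    """Hypothetical stand-in agent: returns canned text instead of calling a model."""

    @property
    def capabilities(self) -> List[AgentCapability]:
        return [
            AgentCapability(
                name="research",
                description="Gather background information on a topic",
                input_schema={"task": "string"},
                output_schema={"summary": "string"},
            )
        ]

    async def execute(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
        # A real agent would call a model or a search tool here.
        return {"summary": f"Notes on: {task}"}


def demo() -> Dict[str, Any]:
    """Run the stub agent once and return its result."""
    agent = EchoResearchAgent()
    return asyncio.run(agent.execute("AI trends", {"goal": "AI trends"}))
```

Returning a dict with a "summary" key is a choice made here so the result lines up with the formatting step shown in the Synthesizer scene.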

---

Scene 3: Agent Registry (3:30-5:30)

[Visual]: Registry database [Animation]: Agents registering capabilities

[Audio/Script]:

"First, the orchestrator needs to know its agents.
The registry stores:
- Agent names
- What they can do
- Input/output formats
- Performance characteristics"

[Demo - Agent Registry]:

class AgentRegistry:
    """Registry of available agents and their capabilities"""

    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.capabilities: Dict[str, List[AgentCapability]] = {}

    def register(self, name: str, agent: BaseAgent):
        """Register an agent"""
        self.agents[name] = agent
        self.capabilities[name] = agent.capabilities
        print(f"Registered agent: {name} with {len(agent.capabilities)} capabilities")

    def get_agent(self, name: str) -> Optional[BaseAgent]:
        """Get agent by name"""
        return self.agents.get(name)

    def find_capable_agent(self, required_capability: str) -> Optional[str]:
        """Find the first agent with a capability name containing the given text"""
        for agent_name, caps in self.capabilities.items():
            for cap in caps:
                if required_capability.lower() in cap.name.lower():
                    return agent_name
        return None

    def list_all_capabilities(self) -> List[str]:
        """List all available capabilities"""
        all_caps = []
        for caps in self.capabilities.values():
            all_caps.extend([c.name for c in caps])
        return all_caps


# Example usage
# (ResearchAgent, WriterAgent, and AnalysisAgent are BaseAgent subclasses
# that are not defined in this video)
registry = AgentRegistry()
registry.register("researcher", ResearchAgent())
registry.register("writer", WriterAgent())
registry.register("analyst", AnalysisAgent())

# Find agent for task
agent_name = registry.find_capable_agent("research")

# Returns "researcher", provided ResearchAgent declares a capability
# whose name contains "research"
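
The script lists performance characteristics among the things a registry stores, but the demo registry only keeps agents and capabilities. One possible way to track performance alongside it is sketched below. AgentStats, StatsTracker, and the ranking rule (success rate first, then average latency) are all assumptions made for illustration, not part of the demo code.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple


@dataclass
class AgentStats:
    """Hypothetical per-agent counters; not part of the demo registry."""
    calls: int = 0
    failures: int = 0
    total_seconds: float = 0.0

    @property
    def avg_seconds(self) -> float:
        return self.total_seconds / self.calls if self.calls else 0.0

    @property
    def success_rate(self) -> float:
        # Agents with no history are treated as fully reliable.
        return (self.calls - self.failures) / self.calls if self.calls else 1.0


class StatsTracker:
    """Records call outcomes keyed by agent name."""

    def __init__(self) -> None:
        self.stats: Dict[str, AgentStats] = {}

    def record(self, name: str, seconds: float, ok: bool = True) -> None:
        entry = self.stats.setdefault(name, AgentStats())
        entry.calls += 1
        entry.total_seconds += seconds
        if not ok:
            entry.failures += 1

    def most_reliable(self, names: Iterable[str]) -> Optional[str]:
        """Pick the candidate with the best success rate, then the lowest latency."""
        candidates = list(names)
        if not candidates:
            return None

        def rank(name: str) -> Tuple[float, float]:
            stats = self.stats.get(name, AgentStats())
            return (-stats.success_rate, stats.avg_seconds)

        return min(candidates, key=rank)
```

An executor could call record() after each step and a registry lookup could use most_reliable() when several agents share a capability.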

---

Scene 4: The Planner (5:30-7:30)

[Visual]: Task decomposition visualization [Animation]: Complex task breaking into steps

[Audio/Script]:

"The planner breaks complex tasks into executable steps.
Good planning:
- Identifies subtasks
- Assigns to appropriate agents
- Orders by dependencies
- Estimates effort"

[Demo - Planner]:

class Planner:
    """Plans task execution"""

    def __init__(self, registry: AgentRegistry):
        self.registry = registry

    async def create_plan(self, goal: str) -> ExecutionPlan:
        """Create execution plan for a goal"""

        # Analyze the goal to identify required capabilities
        required_steps = await self._analyze_goal(goal)

        # Map steps to agents
        plan_steps = []
        for i, step_info in enumerate(required_steps):
            agent_name = self.registry.find_capable_agent(step_info["capability"])

            if not agent_name:
                raise ValueError(f"No agent can handle: {step_info['capability']}")

            plan_steps.append(PlanStep(
                step_id=f"step_{i}",
                agent_name=agent_name,
                task=step_info["task"],
                depends_on=step_info.get("depends_on", []),
                priority=step_info.get("priority", i)
            ))

        return ExecutionPlan(goal=goal, steps=plan_steps)

    async def _analyze_goal(self, goal: str) -> List[dict]:
        """Analyze goal and return required steps"""
        # In production, this would use Claude to analyze the goal
        # For now, simple keyword (substring) matching
        goal_lower = goal.lower()
        steps = []

        # Check for research needs
        if any(word in goal_lower for word in ["research", "find", "search", "learn about"]):
            steps.append({
                "capability": "research",
                "task": f"Research: {goal}",
                "priority": 0
            })

        # Check for analysis needs (depends on research if it was planned)
        if any(word in goal_lower for word in ["analyze", "compare", "evaluate"]):
            steps.append({
                "capability": "analysis",
                "task": "Analyze findings",
                "depends_on": ["step_0"] if steps else [],
                "priority": 1
            })

        # Check for writing needs (depends on the most recent step, if any)
        if any(word in goal_lower for word in ["write", "create", "generate", "report"]):
            steps.append({
                "capability": "writing",
                "task": "Create written output",
                "depends_on": [f"step_{len(steps)-1}"] if steps else [],
                "priority": 2
            })

        # Default: at least do research
        if not steps:
            steps.append({
                "capability": "research",
                "task": goal,
                "priority": 0
            })

        return steps
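
The script says good planning orders steps by dependencies. One way to check and order a plan before running it is a topological sort, sketched here with the standard library's graphlib module (Python 3.9+). The execution_batches helper and its dict-of-lists input are assumptions for illustration; in the demo code the same information lives in each PlanStep's depends_on field.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def execution_batches(dependencies: Dict[str, List[str]]) -> List[List[str]]:
    """Group step IDs into batches; each batch depends only on earlier batches.

    `dependencies` maps a step ID to the step IDs it depends on.
    Raises ValueError on a cycle or on a reference to an unknown step.
    """
    known = set(dependencies)
    for step_id, deps in dependencies.items():
        missing = [d for d in deps if d not in known]
        if missing:
            raise ValueError(f"{step_id} depends on unknown steps: {missing}")

    sorter = TopologicalSorter(dependencies)
    try:
        sorter.prepare()  # detects cycles up front
    except CycleError as exc:
        raise ValueError(f"Dependency cycle: {exc.args[1]}") from exc

    batches: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for stable output
        batches.append(ready)
        sorter.done(*ready)
    return batches
```

Steps in the same batch have no dependencies on each other, so they are candidates for concurrent execution. Validating up front also turns a runtime deadlock into an error at planning time.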

---

Scene 5: The Executor (7:30-9:30)

[Visual]: Execution flow diagram [Animation]: Steps executing with progress

[Audio/Script]:

"The executor runs the plan, handling dependencies and failures."

[Demo - Executor]:

import asyncio
from typing import Dict, Any, Optional


class Executor:
    """Executes plans step by step"""

    def __init__(self, registry: AgentRegistry):
        self.registry = registry

    async def execute(self, plan: ExecutionPlan) -> Dict[str, Any]:
        """Execute a plan and return results"""
        results = {}
        completed_steps = set()

        # Order by priority; dependencies are checked on every pass
        pending_steps = sorted(plan.steps, key=lambda s: s.priority)

        while pending_steps:
            # Find steps that can run now (dependencies satisfied)
            ready_steps = [
                step for step in pending_steps
                if not step.depends_on
                or all(d in completed_steps for d in step.depends_on)
            ]

            if not ready_steps:
                raise RuntimeError("Deadlock: No steps can execute")

            # Execute ready steps one at a time (see execute_parallel below)
            for step in ready_steps:
                print(f"Executing {step.step_id}: {step.task}")

                try:
                    agent = self.registry.get_agent(step.agent_name)
                    if not agent:
                        raise ValueError(f"Agent not found: {step.agent_name}")

                    # Build context from previous results
                    context = {
                        "previous_results": {
                            dep: results[dep]
                            for dep in (step.depends_on or [])
                            if dep in results
                        },
                        "goal": plan.goal
                    }

                    # Execute agent
                    result = await agent.execute(step.task, context)
                    results[step.step_id] = result
                    completed_steps.add(step.step_id)
                    print(f"Completed {step.step_id}")

                except Exception as e:
                    print(f"Failed {step.step_id}: {e}")
                    results[step.step_id] = {"error": str(e)}
                    # Mark as done (failed) so dependent steps still run
                    completed_steps.add(step.step_id)

                pending_steps.remove(step)

        return results

    async def execute_parallel(self, plan: ExecutionPlan) -> Dict[str, Any]:
        """Execute independent steps in parallel"""
        results = {}
        completed_steps = set()
        pending_steps = list(plan.steps)

        while pending_steps:
            ready_steps = [
                step for step in pending_steps
                if not step.depends_on
                or all(d in completed_steps for d in step.depends_on)
            ]

            if not ready_steps:
                raise RuntimeError("Deadlock detected")

            # Run all ready steps concurrently
            tasks = []
            for step in ready_steps:
                agent = self.registry.get_agent(step.agent_name)
                context = {
                    "previous_results": {
                        d: results[d]
                        for d in (step.depends_on or [])
                        if d in results
                    },
                    "goal": plan.goal
                }
                tasks.append(self._execute_step(step, agent, context))

            # Wait for all concurrent tasks; exceptions come back as values
            step_results = await asyncio.gather(*tasks, return_exceptions=True)

            # Process results
            for step, result in zip(ready_steps, step_results):
                if isinstance(result, Exception):
                    results[step.step_id] = {"error": str(result)}
                else:
                    results[step.step_id] = result
                completed_steps.add(step.step_id)
                pending_steps.remove(step)

        return results

    async def _execute_step(self, step: PlanStep, agent: Optional[BaseAgent], context: dict):
        """Execute a single step"""
        if agent is None:
            raise ValueError(f"Agent not found: {step.agent_name}")
        return await agent.execute(step.task, context)
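
The executor above records a failure on the first exception and moves on. Where agent calls can fail transiently, one option is to retry with a timeout before giving up. The sketch below assumes a hypothetical run_with_retry helper; the default attempt count, timeout, and backoff values are placeholders, not recommendations from the video.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def run_with_retry(
    call: Callable[[], Awaitable[Dict[str, Any]]],
    attempts: int = 3,
    timeout: float = 30.0,
    base_delay: float = 0.5,
) -> Dict[str, Any]:
    """Await `call()` with a per-attempt timeout and exponential backoff.

    Returns the first successful result, or {"error": ...} after the last
    failed attempt, matching the error shape used by the executor demo.
    """
    last_error = "no attempts made"
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(call(), timeout=timeout)
        except asyncio.TimeoutError:
            last_error = f"timed out after {timeout}s"
        except Exception as exc:  # agent failures are reported, not raised
            last_error = str(exc)
        if attempt < attempts - 1:
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** attempt)
    return {"error": last_error}
```

Inside an executor this could wrap the agent call, for example `await run_with_retry(lambda: agent.execute(step.task, context))`. The callable is passed instead of a coroutine because a coroutine object can only be awaited once.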

---

Scene 6: The Synthesizer (9:30-11:00)

[Visual]: Results merging into final output [Animation]: Multiple outputs combining

[Audio/Script]:

"Finally, the synthesizer combines results into a coherent output."

[Demo - Synthesizer]:

class Synthesizer:
    """Synthesizes results from multiple agents"""

    async def synthesize(self, plan: ExecutionPlan, results: Dict[str, Any]) -> str:
        """Combine all results into final output"""

        # Filter out errors
        successful_results = {
            k: v for k, v in results.items()
            if not isinstance(v, dict) or "error" not in v
        }

        failed_steps = [
            k for k, v in results.items()
            if isinstance(v, dict) and "error" in v
        ]

        # Build synthesis prompt
        synthesis_parts = [f"Goal: {plan.goal}\n"]

        for step in plan.steps:
            if step.step_id in successful_results:
                result = successful_results[step.step_id]
                synthesis_parts.append(f"\n## {step.task}\n{self._format_result(result)}")

        if failed_steps:
            synthesis_parts.append(f"\n\nNote: {len(failed_steps)} steps encountered errors")

        return "\n".join(synthesis_parts)

    def _format_result(self, result: Any) -> str:
        """Format a result for output"""
        if isinstance(result, dict):
            if "summary" in result:
                return result["summary"]
            if "content" in result:
                return result["content"]
            return str(result)
        return str(result)
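
The synthesizer above reports only how many steps failed. If the final output should say which steps failed and why, a small helper along these lines could feed that section. The split_results and failure_report names are hypothetical; the only thing taken from the demo is the {"error": ...} shape the executor uses for failed steps.

```python
from typing import Any, Dict, Tuple


def split_results(results: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Separate successful step results from error messages."""
    ok: Dict[str, Any] = {}
    failed: Dict[str, str] = {}
    for step_id, value in results.items():
        if isinstance(value, dict) and "error" in value:
            failed[step_id] = str(value["error"])
        else:
            ok[step_id] = value
    return ok, failed


def failure_report(failed: Dict[str, str]) -> str:
    """Render one line per failed step, or an empty string if none failed."""
    if not failed:
        return ""
    lines = [f"{len(failed)} step(s) failed:"]
    lines += [f"- {step_id}: {message}" for step_id, message in failed.items()]
    return "\n".join(lines)
```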

---

Scene 7: Complete Orchestrator (11:00-11:45)

[Visual]: All components working together [Animation]: Full orchestration flow

[Audio/Script]:

"Now let's put it all together:"

[Demo - Complete Orchestrator]:

class Orchestrator:
    """Complete multi-agent orchestrator"""

    def __init__(self):
        self.registry = AgentRegistry()
        self.planner = Planner(self.registry)
        self.executor = Executor(self.registry)
        self.synthesizer = Synthesizer()

    def register_agent(self, name: str, agent: BaseAgent):
        """Register an agent"""
        self.registry.register(name, agent)

    async def process(self, task: str, parallel: bool = True) -> str:
        """Process a task through the multi-agent system"""

        print(f"Processing: {task}")

        # 1. Create plan
        print("Creating plan...")
        plan = await self.planner.create_plan(task)
        print(f"Plan has {len(plan.steps)} steps")

        # 2. Execute plan
        print("Executing plan...")
        if parallel:
            results = await self.executor.execute_parallel(plan)
        else:
            results = await self.executor.execute(plan)

        # 3. Synthesize results
        print("Synthesizing results...")
        output = await self.synthesizer.synthesize(plan, results)

        return output


# Usage
async def main():
    orchestrator = Orchestrator()

    # Register agents
    orchestrator.register_agent("researcher", ResearchAgent())
    orchestrator.register_agent("analyst", AnalysisAgent())
    orchestrator.register_agent("writer", WriterAgent())

    # Process a complex task
    result = await orchestrator.process(
        "Research AI trends in 2025, analyze key patterns, and write a report"
    )

    print(result)


asyncio.run(main())

---

Scene 8: Challenge Time (11:45-12:00)

[Visual]: Challenge specification [Animation]: XP reward display

[Audio/Script]:

"Your challenge: Build a complete orchestrator.

Requirements:
1. Agent Registry with 3+ agents
2. Planner that creates multi-step plans
3. Executor with dependency handling
4. Synthesizer for final output

Test with: 'Research quantum computing, analyze applications, write summary'

Complete this for 750 XP and prove you can orchestrate agents.

Next: Agent-to-Agent communication protocols."

---

Post-Video Challenge

Challenge ID: TRACK3_002_CHALLENGE
Type: Code + Terminal
Instructions:

Task 1: Build the registry and planner

claude "Create agent_registry.py and planner.py with:
1. AgentRegistry with register, get_agent, find_capable_agent
2. Planner with create_plan that analyzes goals
3. PlanStep and ExecutionPlan dataclasses"
Validation: Classes created with proper interfaces

Task 2: Build the executor

claude "Create executor.py with:
1. Sequential execution with dependency checking
2. Parallel execution for independent steps
3. Error handling that doesn't stop the pipeline"
Validation: Both execution modes work

Task 3: Build the synthesizer

claude "Create synthesizer.py that:
1. Combines results from all steps
2. Formats output cleanly
3. Reports any failed steps"
Validation: Produces coherent output

Task 4: Integration test

claude "Create orchestrator.py that combines all components.
Test with: 'Research climate change solutions, analyze feasibility, write recommendations'"
Validation: Full pipeline produces meaningful output

Rewards:

  • XP: 750 (350 base + 400 challenge)
  • Achievement: "Orchestrator Builder"
---

SEO Metadata

Alt-text: Building AI orchestrators - agent registry, planner, executor, synthesizer components. Complete guide to coordinating multiple Claude agents.

Tags: AI orchestrator, agent registry, task planner, multi-agent executor, result synthesis, agent coordination

Keywords: build ai orchestrator, multi-agent coordinator, agent registry pattern, task planning ai, agent execution engine

Last modified: Wednesday, 10 December 2025, 1:05 AM