[TRACK3_002] Building the Orchestrator
[TRACK3_002] Building the Orchestrator
📚 Building the Orchestrator
[VIDEO-012] Building the Orchestrator
Track: 3 - Multi-Agent Systems Module: 2 Duration: 12 minutes Level requirement: 33 XP reward: 350 XP
---
Scene 1: The Brain of Multi-Agent Systems (0:00-1:30)
[Visual]: Orchestra conductor analogy [Animation]: Conductor coordinating musicians
[Audio/Script]:
"In an orchestra, musicians are specialists. The conductor coordinates.>
In multi-agent systems, agents are specialists. The orchestrator coordinates.>
A great orchestrator:
- Understands the overall goal
- Knows each agent's capabilities
- Plans the execution strategy
- Handles failures gracefully
- Synthesizes final results>
Let's build one."
[Lower third]: "Track 3: Multi-Agent Systems | Level 33"
---
Scene 2: Orchestrator Architecture (1:30-3:30)
[Visual]: Orchestrator component diagram [Animation]: Each component lighting up
[Audio/Script]:
"A production orchestrator has four components:>
1. Agent Registry - Know your agents
2. Planner - Break tasks into steps
3. Executor - Run the plan
4. Synthesizer - Combine results"
[Demo - Core Structure]:
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod@dataclass
class AgentCapability:
"""What an agent can do"""
name: str
description: str
input_schema: dict
output_schema: dict
class BaseAgent(ABC):
"""Base class for all agents"""
@property
@abstractmethod
def capabilities(self) -> List[AgentCapability]:
pass
@abstractmethod
async def execute(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
pass
@dataclass
class PlanStep:
"""A single step in execution plan"""
step_id: str
agent_name: str
task: str
depends_on: List[str] = None # Step IDs this depends on
priority: int = 0
@dataclass
class ExecutionPlan:
"""Complete execution plan"""
goal: str
steps: List[PlanStep]
estimated_time: Optional[int] = None
---
Scene 3: Agent Registry (3:30-5:30)
[Visual]: Registry database [Animation]: Agents registering capabilities
[Audio/Script]:
"First, the orchestrator needs to know its agents.>
The registry stores:
- Agent names
- What they can do
- Input/output formats
- Performance characteristics"
[Demo - Agent Registry]:
class AgentRegistry:
"""Registry of available agents and their capabilities""" def __init__(self):
self.agents: Dict[str, BaseAgent] = {}
self.capabilities: Dict[str, List[AgentCapability]] = {}
def register(self, name: str, agent: BaseAgent):
"""Register an agent"""
self.agents[name] = agent
self.capabilities[name] = agent.capabilities
print(f"Registered agent: {name} with {len(agent.capabilities)} capabilities")
def get_agent(self, name: str) -> Optional[BaseAgent]:
"""Get agent by name"""
return self.agents.get(name)
def find_capable_agent(self, required_capability: str) -> Optional[str]:
"""Find an agent that can handle a capability"""
for agent_name, caps in self.capabilities.items():
for cap in caps:
if required_capability.lower() in cap.name.lower():
return agent_name
return None
def list_all_capabilities(self) -> List[str]:
"""List all available capabilities"""
all_caps = []
for caps in self.capabilities.values():
all_caps.extend([c.name for c in caps])
return all_caps
Example usage
registry = AgentRegistry()
registry.register("researcher", ResearchAgent())
registry.register("writer", WriterAgent())
registry.register("analyst", AnalysisAgent())Find agent for task
agent_name = registry.find_capable_agent("research")
Returns: "researcher"
---
Scene 4: The Planner (5:30-7:30)
[Visual]: Task decomposition visualization [Animation]: Complex task breaking into steps
[Audio/Script]:
"The planner breaks complex tasks into executable steps.>
Good planning:
- Identifies subtasks
- Assigns to appropriate agents
- Orders by dependencies
- Estimates effort"
[Demo - Planner]:
class Planner:
"""Plans task execution""" def __init__(self, registry: AgentRegistry):
self.registry = registry
async def create_plan(self, goal: str) -> ExecutionPlan:
"""Create execution plan for a goal"""
# Analyze the goal to identify required capabilities
required_steps = await self._analyze_goal(goal)
# Map steps to agents
plan_steps = []
for i, step_info in enumerate(required_steps):
agent_name = self.registry.find_capable_agent(step_info["capability"])
if not agent_name:
raise ValueError(f"No agent can handle: {step_info['capability']}")
plan_steps.append(PlanStep(
step_id=f"step_{i}",
agent_name=agent_name,
task=step_info["task"],
depends_on=step_info.get("depends_on", []),
priority=step_info.get("priority", i)
))
return ExecutionPlan(goal=goal, steps=plan_steps)
async def _analyze_goal(self, goal: str) -> List[dict]:
"""Analyze goal and return required steps"""
# In production, this would use Claude to analyze the goal
# For now, simple keyword matching
steps = []
# Check for research needs
if any(word in goal.lower() for word in ["research", "find", "search", "learn about"]):
steps.append({
"capability": "research",
"task": f"Research: {goal}",
"priority": 0
})
# Check for analysis needs
if any(word in goal.lower() for word in ["analyze", "compare", "evaluate"]):
steps.append({
"capability": "analysis",
"task": f"Analyze findings",
"depends_on": ["step_0"] if steps else [],
"priority": 1
})
# Check for writing needs
if any(word in goal.lower() for word in ["write", "create", "generate", "report"]):
steps.append({
"capability": "writing",
"task": f"Create written output",
"depends_on": [f"step_{len(steps)-1}"] if steps else [],
"priority": 2
})
# Default: at least do research
if not steps:
steps.append({
"capability": "research",
"task": goal,
"priority": 0
})
return steps
---
Scene 5: The Executor (7:30-9:30)
[Visual]: Execution flow diagram [Animation]: Steps executing with progress
[Audio/Script]:
"The executor runs the plan, handling dependencies and failures."
[Demo - Executor]:
import asyncio
from typing import Dict, Anyclass Executor:
"""Executes plans step by step"""
def __init__(self, registry: AgentRegistry):
self.registry = registry
async def execute(self, plan: ExecutionPlan) -> Dict[str, Any]:
"""Execute a plan and return results"""
results = {}
completed_steps = set()
# Sort steps by priority and dependencies
pending_steps = list(plan.steps)
while pending_steps:
# Find steps that can run now (dependencies satisfied)
ready_steps = [
step for step in pending_steps
if not step.depends_on or all(d in completed_steps for d in step.depends_on)
]
if not ready_steps:
raise RuntimeError("Deadlock: No steps can execute")
# Execute ready steps (could be parallel)
for step in ready_steps:
print(f"Executing {step.step_id}: {step.task}")
try:
agent = self.registry.get_agent(step.agent_name)
if not agent:
raise ValueError(f"Agent not found: {step.agent_name}")
# Build context from previous results
context = {
"previous_results": {
dep: results[dep]
for dep in (step.depends_on or [])
if dep in results
},
"goal": plan.goal
}
# Execute agent
result = await agent.execute(step.task, context)
results[step.step_id] = result
completed_steps.add(step.step_id)
print(f"Completed {step.step_id}")
except Exception as e:
print(f"Failed {step.step_id}: {e}")
results[step.step_id] = {"error": str(e)}
completed_steps.add(step.step_id) # Mark as done (failed)
pending_steps.remove(step)
return results
async def execute_parallel(self, plan: ExecutionPlan) -> Dict[str, Any]:
"""Execute independent steps in parallel"""
results = {}
completed_steps = set()
pending_steps = list(plan.steps)
while pending_steps:
ready_steps = [
step for step in pending_steps
if not step.depends_on or all(d in completed_steps for d in step.depends_on)
]
if not ready_steps:
raise RuntimeError("Deadlock detected")
# Run all ready steps concurrently
tasks = []
for step in ready_steps:
agent = self.registry.get_agent(step.agent_name)
context = {"previous_results": {d: results[d] for d in (step.depends_on or []) if d in results}}
tasks.append(self._execute_step(step, agent, context))
# Wait for all concurrent tasks
step_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for step, result in zip(ready_steps, step_results):
if isinstance(result, Exception):
results[step.step_id] = {"error": str(result)}
else:
results[step.step_id] = result
completed_steps.add(step.step_id)
pending_steps.remove(step)
return results
async def _execute_step(self, step: PlanStep, agent: BaseAgent, context: dict):
"""Execute a single step"""
return await agent.execute(step.task, context)
---
Scene 6: The Synthesizer (9:30-11:00)
[Visual]: Results merging into final output [Animation]: Multiple outputs combining
[Audio/Script]:
"Finally, the synthesizer combines results into a coherent output."
[Demo - Synthesizer]:
class Synthesizer:
"""Synthesizes results from multiple agents""" async def synthesize(self, plan: ExecutionPlan, results: Dict[str, Any]) -> str:
"""Combine all results into final output"""
# Filter out errors
successful_results = {
k: v for k, v in results.items()
if not isinstance(v, dict) or "error" not in v
}
failed_steps = [
k for k, v in results.items()
if isinstance(v, dict) and "error" in v
]
# Build synthesis prompt
synthesis_parts = [f"Goal: {plan.goal}\n"]
for step in plan.steps:
if step.step_id in successful_results:
result = successful_results[step.step_id]
synthesis_parts.append(f"\n## {step.task}\n{self._format_result(result)}")
if failed_steps:
synthesis_parts.append(f"\n\nNote: {len(failed_steps)} steps encountered errors")
return "\n".join(synthesis_parts)
def _format_result(self, result: Any) -> str:
"""Format a result for output"""
if isinstance(result, dict):
if "summary" in result:
return result["summary"]
if "content" in result:
return result["content"]
return str(result)
return str(result)
---
Scene 7: Complete Orchestrator (11:00-11:45)
[Visual]: All components working together [Animation]: Full orchestration flow
[Audio/Script]:
"Now let's put it all together:"
[Demo - Complete Orchestrator]:
class Orchestrator:
"""Complete multi-agent orchestrator""" def __init__(self):
self.registry = AgentRegistry()
self.planner = Planner(self.registry)
self.executor = Executor(self.registry)
self.synthesizer = Synthesizer()
def register_agent(self, name: str, agent: BaseAgent):
"""Register an agent"""
self.registry.register(name, agent)
async def process(self, task: str, parallel: bool = True) -> str:
"""Process a task through the multi-agent system"""
print(f"Processing: {task}")
# 1. Create plan
print("Creating plan...")
plan = await self.planner.create_plan(task)
print(f"Plan has {len(plan.steps)} steps")
# 2. Execute plan
print("Executing plan...")
if parallel:
results = await self.executor.execute_parallel(plan)
else:
results = await self.executor.execute(plan)
# 3. Synthesize results
print("Synthesizing results...")
output = await self.synthesizer.synthesize(plan, results)
return output
Usage
async def main():
orchestrator = Orchestrator() # Register agents
orchestrator.register_agent("researcher", ResearchAgent())
orchestrator.register_agent("analyst", AnalysisAgent())
orchestrator.register_agent("writer", WriterAgent())
# Process a complex task
result = await orchestrator.process(
"Research AI trends in 2025, analyze key patterns, and write a report"
)
print(result)
asyncio.run(main())
---
Scene 8: Challenge Time (11:45-12:00)
[Visual]: Challenge specification [Animation]: XP reward display
[Audio/Script]:
"Your challenge: Build a complete orchestrator.>
Requirements:
1. Agent Registry with 3+ agents
2. Planner that creates multi-step plans
3. Executor with dependency handling
4. Synthesizer for final output>
Test with: 'Research quantum computing, analyze applications, write summary'>
Complete this for 750 XP and prove you can orchestrate agents.>
Next: Agent-to-Agent communication protocols."
---
Post-Video Challenge
Challenge ID: TRACK3_002_CHALLENGE Type: Code + Terminal Instructions:
Task 1: Build the registry and planner
claude "Create agent_registry.py and planner.py with:
1. AgentRegistry with register, get_agent, find_capable_agent
2. Planner with create_plan that analyzes goals
3. PlanStep and ExecutionPlan dataclasses"
Validation: Classes created with proper interfacesTask 2: Build the executor
claude "Create executor.py with:
1. Sequential execution with dependency checking
2. Parallel execution for independent steps
3. Error handling that doesn't stop the pipeline"
Validation: Both execution modes workTask 3: Build the synthesizer
claude "Create synthesizer.py that:
1. Combines results from all steps
2. Formats output cleanly
3. Reports any failed steps"
Validation: Produces coherent outputTask 4: Integration test
claude "Create orchestrator.py that combines all components.
Test with: 'Research climate change solutions, analyze feasibility, write recommendations'"
Validation: Full pipeline produces meaningful outputRewards:
- XP: 750 (350 base + 400 challenge)
- Achievement: "Orchestrator Builder"
SEO Metadata
Alt-text: Building AI orchestrators - agent registry, planner, executor, synthesizer components. Complete guide to coordinating multiple Claude agents.
Tags: AI orchestrator, agent registry, task planner, multi-agent executor, result synthesis, agent coordination
Keywords: build ai orchestrator, multi-agent coordinator, agent registry pattern, task planning ai, agent execution engine