[TRACK3_003] Agent-to-Agent Communication
[TRACK3_003] Agent-to-Agent Communication
📚 Agent-to-Agent Communication
[VIDEO-013] Agent-to-Agent Communication
Track: 3 - Multi-Agent Systems Module: 3 Duration: 12 minutes Level requirement: 36 XP reward: 350 XP
---
Scene 1: Beyond the Orchestrator (0:00-1:30)
[Visual]: Agents talking directly to each other [Animation]: Mesh network forming between agents
[Audio/Script]:
"The orchestrator pattern is powerful, but sometimes agents need to talk directly.>
Consider:
- A code agent finds a bug
- It needs the test agent to verify immediately
- Waiting for the orchestrator adds latency>
Direct agent-to-agent (A2A) communication enables faster, more flexible systems.>
Let's learn the protocols."
[Lower third]: "Track 3: Multi-Agent Systems | Level 36"
---
Scene 2: Communication Patterns (1:30-3:30)
[Visual]: Three communication patterns [Animation]: Messages flowing in each pattern
[Audio/Script]:
"Three patterns for agent communication:>
1. Request-Response
Agent A asks, Agent B answers. Simple, synchronous."
[Diagram]:
Agent A Agent B
│ │
│──── Request ────────────►│
│ │
│◄─── Response ───────────│
│ │[Audio/Script]:
"2. Publish-Subscribe
Agents publish events. Interested agents subscribe."
[Diagram]:
┌───────────┐
│ Event Bus │
└───────────┘
↑ ↑ ↑
│ │ │
Agent A Agent B Agent C
(pub) (sub) (sub)[Audio/Script]:
"3. Shared State
Agents read/write to common memory."
[Diagram]:
Agent A Agent B Agent C
│ │ │
↓ ↓ ↓
┌─────────────────────────┐
│ Shared Memory │
└─────────────────────────┘---
Scene 3: Message Protocol (3:30-5:30)
[Visual]: Message structure breakdown [Animation]: Message being constructed
[Audio/Script]:
"Standardized messages are crucial. Let's define a protocol."
[Demo - Message Protocol]:
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from enum import Enum
import uuidclass MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
EVENT = "event"
ERROR = "error"
@dataclass
class AgentMessage:
"""Standard message format for agent communication"""
# Identity
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
correlation_id: Optional[str] = None # Links request/response
# Routing
sender: str = ""
recipient: str = "" # Empty for broadcasts
# Content
message_type: MessageType = MessageType.REQUEST
action: str = "" # What to do
payload: dict = field(default_factory=dict)
# Metadata
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
priority: int = 0 # Higher = more important
ttl: int = 60 # Time to live in seconds
def to_dict(self) -> dict:
return {
"message_id": self.message_id,
"correlation_id": self.correlation_id,
"sender": self.sender,
"recipient": self.recipient,
"message_type": self.message_type.value,
"action": self.action,
"payload": self.payload,
"timestamp": self.timestamp,
"priority": self.priority,
"ttl": self.ttl
}
@classmethod
def from_dict(cls, data: dict) -> "AgentMessage":
data["message_type"] = MessageType(data["message_type"])
return cls(**data)
def create_response(self, payload: dict) -> "AgentMessage":
"""Create a response to this message"""
return AgentMessage(
correlation_id=self.message_id,
sender=self.recipient,
recipient=self.sender,
message_type=MessageType.RESPONSE,
action=f"{self.action}_response",
payload=payload
)
---
Scene 4: Request-Response Pattern (5:30-7:30)
[Visual]: Synchronous communication flow [Animation]: Request going out, response coming back
[Audio/Script]:
"Request-Response is the simplest pattern. Let's implement it."
[Demo - Request Response]:
import asyncio
from typing import Dict, Callable, Awaitableclass MessageBus:
"""Simple message bus for agent communication"""
def __init__(self):
self.handlers: Dict[str, Callable] = {}
self.pending_responses: Dict[str, asyncio.Future] = {}
def register_handler(self, agent_name: str, handler: Callable):
"""Register an agent's message handler"""
self.handlers[agent_name] = handler
async def send(self, message: AgentMessage) -> Optional[AgentMessage]:
"""Send a message and optionally wait for response"""
if message.recipient not in self.handlers:
raise ValueError(f"Unknown recipient: {message.recipient}")
handler = self.handlers[message.recipient]
# For requests, wait for response
if message.message_type == MessageType.REQUEST:
future = asyncio.Future()
self.pending_responses[message.message_id] = future
# Send to handler
await handler(message, self)
# Wait for response (with timeout)
try:
response = await asyncio.wait_for(future, timeout=message.ttl)
return response
except asyncio.TimeoutError:
del self.pending_responses[message.message_id]
raise TimeoutError(f"No response within {message.ttl}s")
else:
# Events don't need response
await handler(message, self)
return None
async def respond(self, response: AgentMessage):
"""Send a response to a pending request"""
if response.correlation_id in self.pending_responses:
future = self.pending_responses.pop(response.correlation_id)
future.set_result(response)
Example agent using the bus
class AnalyzerAgent:
def __init__(self, name: str, bus: MessageBus):
self.name = name
self.bus = bus
bus.register_handler(name, self.handle_message) async def handle_message(self, message: AgentMessage, bus: MessageBus):
"""Handle incoming messages"""
if message.action == "analyze":
# Do analysis
result = await self.analyze(message.payload.get("data", ""))
# Send response
response = message.create_response({"result": result})
await bus.respond(response)
async def analyze(self, data: str) -> dict:
# Actual analysis logic
return {"word_count": len(data.split()), "char_count": len(data)}
async def request_analysis(self, target: str, data: str) -> dict:
"""Request analysis from another agent"""
message = AgentMessage(
sender=self.name,
recipient=target,
action="analyze",
payload={"data": data}
)
response = await self.bus.send(message)
return response.payload
---
Scene 5: Publish-Subscribe Pattern (7:30-9:30)
[Visual]: Event broadcasting [Animation]: One event reaching multiple subscribers
[Audio/Script]:
"Publish-Subscribe decouples agents. Publishers don't need to know subscribers."
[Demo - Pub/Sub]:
from typing import List, Setclass EventBus:
"""Publish-Subscribe event bus"""
def __init__(self):
self.subscriptions: Dict[str, Set[str]] = {} # event_type -> agent_names
self.handlers: Dict[str, Callable] = {}
def subscribe(self, agent_name: str, event_type: str):
"""Subscribe agent to event type"""
if event_type not in self.subscriptions:
self.subscriptions[event_type] = set()
self.subscriptions[event_type].add(agent_name)
def unsubscribe(self, agent_name: str, event_type: str):
"""Unsubscribe agent from event type"""
if event_type in self.subscriptions:
self.subscriptions[event_type].discard(agent_name)
def register_handler(self, agent_name: str, handler: Callable):
"""Register agent's event handler"""
self.handlers[agent_name] = handler
async def publish(self, event: AgentMessage):
"""Publish event to all subscribers"""
event_type = event.action
subscribers = self.subscriptions.get(event_type, set())
# Notify all subscribers concurrently
tasks = []
for agent_name in subscribers:
if agent_name in self.handlers:
handler = self.handlers[agent_name]
tasks.append(handler(event))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
Example usage
class MonitorAgent:
def __init__(self, name: str, event_bus: EventBus):
self.name = name
self.event_bus = event_bus # Subscribe to events
event_bus.subscribe(name, "task_completed")
event_bus.subscribe(name, "error_occurred")
event_bus.register_handler(name, self.handle_event)
async def handle_event(self, event: AgentMessage):
"""Handle subscribed events"""
if event.action == "task_completed":
print(f"[{self.name}] Task completed: {event.payload}")
elif event.action == "error_occurred":
print(f"[{self.name}] Error: {event.payload}")
await self.alert_admin(event.payload)
async def alert_admin(self, error: dict):
# Send alert
pass
class WorkerAgent:
def __init__(self, name: str, event_bus: EventBus):
self.name = name
self.event_bus = event_bus
async def complete_task(self, task_id: str, result: dict):
"""Complete a task and publish event"""
# Publish completion event
event = AgentMessage(
sender=self.name,
message_type=MessageType.EVENT,
action="task_completed",
payload={"task_id": task_id, "result": result}
)
await self.event_bus.publish(event)
---
Scene 6: Shared State Pattern (9:30-11:00)
[Visual]: Shared memory access [Animation]: Multiple agents reading/writing
[Audio/Script]:
"Shared state lets agents collaborate on common data."
[Demo - Shared State]:
import asyncio
from typing import Anyclass SharedState:
"""Thread-safe shared state for agents"""
def __init__(self):
self._state: Dict[str, Any] = {}
self._locks: Dict[str, asyncio.Lock] = {}
self._watchers: Dict[str, List[Callable]] = {}
async def get(self, key: str) -> Any:
"""Get value from shared state"""
return self._state.get(key)
async def set(self, key: str, value: Any, notify: bool = True):
"""Set value in shared state"""
# Get or create lock for this key
if key not in self._locks:
self._locks[key] = asyncio.Lock()
async with self._locks[key]:
old_value = self._state.get(key)
self._state[key] = value
# Notify watchers
if notify and key in self._watchers:
for callback in self._watchers[key]:
await callback(key, old_value, value)
async def update(self, key: str, updater: Callable[[Any], Any]):
"""Atomically update a value"""
if key not in self._locks:
self._locks[key] = asyncio.Lock()
async with self._locks[key]:
old_value = self._state.get(key)
new_value = updater(old_value)
self._state[key] = new_value
return new_value
def watch(self, key: str, callback: Callable):
"""Watch for changes to a key"""
if key not in self._watchers:
self._watchers[key] = []
self._watchers[key].append(callback)
Example: Collaborative document editing
class EditorAgent:
def __init__(self, name: str, shared_state: SharedState):
self.name = name
self.state = shared_state # Watch for document changes
self.state.watch("document", self.on_document_change)
async def on_document_change(self, key: str, old: Any, new: Any):
print(f"[{self.name}] Document updated: {len(new or '')} chars")
async def append_text(self, text: str):
"""Add text to shared document"""
await self.state.update(
"document",
lambda doc: (doc or "") + text
)
async def get_document(self) -> str:
"""Get current document"""
return await self.state.get("document") or ""
---
Scene 7: Combining Patterns (11:00-11:45)
[Visual]: Hybrid architecture [Animation]: All patterns working together
[Audio/Script]:
"Real systems combine patterns. Here's a hybrid approach:"
[Demo - Hybrid]:
class HybridCommunicationSystem:
"""Combines all communication patterns""" def __init__(self):
self.message_bus = MessageBus() # Request-Response
self.event_bus = EventBus() # Pub-Sub
self.shared_state = SharedState() # Shared Memory
def create_agent_context(self, agent_name: str):
"""Create communication context for an agent"""
return AgentContext(
name=agent_name,
message_bus=self.message_bus,
event_bus=self.event_bus,
shared_state=self.shared_state
)
@dataclass
class AgentContext:
"""Communication context for an agent"""
name: str
message_bus: MessageBus
event_bus: EventBus
shared_state: SharedState
async def request(self, target: str, action: str, payload: dict) -> dict:
"""Send request and get response"""
msg = AgentMessage(
sender=self.name, recipient=target,
action=action, payload=payload
)
response = await self.message_bus.send(msg)
return response.payload
async def publish(self, event_type: str, data: dict):
"""Publish event"""
event = AgentMessage(
sender=self.name, message_type=MessageType.EVENT,
action=event_type, payload=data
)
await self.event_bus.publish(event)
async def get_state(self, key: str) -> Any:
"""Get shared state"""
return await self.shared_state.get(key)
async def set_state(self, key: str, value: Any):
"""Set shared state"""
await self.shared_state.set(key, value)
---
Scene 8: Challenge Time (11:45-12:00)
[Visual]: Challenge specification [Animation]: XP reward display
[Audio/Script]:
"Your challenge: Build a communication system with all three patterns.>
Create:
1. MessageBus for request-response
2. EventBus for pub-sub
3. SharedState for collaboration
4. HybridCommunicationSystem combining all>
Test with 3 agents collaborating on a document.>
Complete this for 750 XP and the 'Communication Expert' badge.>
Next: Specialist agents with real capabilities."
---
Post-Video Challenge
Challenge ID: TRACK3_003_CHALLENGE Type: Code + Integration Test Instructions:
Task 1: Implement MessageBus
claude "Create message_bus.py with request-response pattern:
1. AgentMessage dataclass with all fields
2. MessageBus with send, respond, register_handler
3. Timeout handling for requests"Task 2: Implement EventBus
claude "Create event_bus.py with pub-sub pattern:
1. subscribe, unsubscribe, publish methods
2. Concurrent notification of subscribers
3. Event filtering by type"Task 3: Implement SharedState
claude "Create shared_state.py with:
1. Async-safe get/set/update operations
2. Watch mechanism for changes
3. Atomic updates with locks"Task 4: Integration test
claude "Create a test with 3 agents:
1. WriterAgent - adds content to shared document
2. ReviewerAgent - watches changes, publishes review events
3. EditorAgent - responds to review requestsShow all three patterns working together."
Rewards:
- XP: 750 (350 base + 400 challenge)
- Achievement: "Communication Expert"
SEO Metadata
Alt-text: Agent-to-agent communication patterns - request-response, publish-subscribe, shared state. Build communication systems for multi-agent AI.
Tags: A2A communication, message bus, event bus, shared state, agent messaging, pub-sub pattern
Keywords: agent to agent communication, ai message protocol, pub sub agents, shared state ai, agent communication patterns