📚 Agent-to-Agent Communication

🎯 Level 36+ ⭐ 750 XP ⏱️ 12 min

[VIDEO-013] Agent-to-Agent Communication

Track: 3 - Multi-Agent Systems Module: 3 Duration: 12 minutes Level requirement: 36 XP reward: 350 XP

---

Scene 1: Beyond the Orchestrator (0:00-1:30)

[Visual]: Agents talking directly to each other [Animation]: Mesh network forming between agents

[Audio/Script]:

"The orchestrator pattern is powerful, but sometimes agents need to talk directly.
>
Consider:
- A code agent finds a bug
- It needs the test agent to verify immediately
- Waiting for the orchestrator adds latency
>
Direct agent-to-agent (A2A) communication enables faster, more flexible systems.
>
Let's learn the protocols."

[Lower third]: "Track 3: Multi-Agent Systems | Level 36"

---

Scene 2: Communication Patterns (1:30-3:30)

[Visual]: Three communication patterns [Animation]: Messages flowing in each pattern

[Audio/Script]:

"Three patterns for agent communication:
>
1. Request-Response
Agent A asks, Agent B answers. Simple, synchronous."

[Diagram]:

Agent A                    Agent B
   │                          │
   │──── Request ────────────►│
   │                          │
   │◄─── Response ───────────│
   │                          │

[Audio/Script]:

"2. Publish-Subscribe
Agents publish events. Interested agents subscribe."

[Diagram]:

         ┌───────────┐
         │ Event Bus │
         └───────────┘
          ↑    ↑    ↑
          │    │    │
     Agent A  Agent B  Agent C
     (pub)    (sub)    (sub)

[Audio/Script]:

"3. Shared State
Agents read/write to common memory."

[Diagram]:

     Agent A    Agent B    Agent C
        │          │          │
        ↓          ↓          ↓
     ┌─────────────────────────┐
     │     Shared Memory       │
     └─────────────────────────┘

---

Scene 3: Message Protocol (3:30-5:30)

[Visual]: Message structure breakdown [Animation]: Message being constructed

[Audio/Script]:

"Standardized messages are crucial. Let's define a protocol."

[Demo - Message Protocol]:

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
from enum import Enum
import uuid

class MessageType(Enum): REQUEST = "request" RESPONSE = "response" EVENT = "event" ERROR = "error"

@dataclass class AgentMessage: """Standard message format for agent communication"""

# Identity message_id: str = field(default_factory=lambda: str(uuid.uuid4())) correlation_id: Optional[str] = None # Links request/response

# Routing sender: str = "" recipient: str = "" # Empty for broadcasts

# Content message_type: MessageType = MessageType.REQUEST action: str = "" # What to do payload: dict = field(default_factory=dict)

# Metadata timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) priority: int = 0 # Higher = more important ttl: int = 60 # Time to live in seconds

def to_dict(self) -> dict: return { "message_id": self.message_id, "correlation_id": self.correlation_id, "sender": self.sender, "recipient": self.recipient, "message_type": self.message_type.value, "action": self.action, "payload": self.payload, "timestamp": self.timestamp, "priority": self.priority, "ttl": self.ttl }

@classmethod def from_dict(cls, data: dict) -> "AgentMessage": data["message_type"] = MessageType(data["message_type"]) return cls(**data)

def create_response(self, payload: dict) -> "AgentMessage": """Create a response to this message""" return AgentMessage( correlation_id=self.message_id, sender=self.recipient, recipient=self.sender, message_type=MessageType.RESPONSE, action=f"{self.action}_response", payload=payload )

---

Scene 4: Request-Response Pattern (5:30-7:30)

[Visual]: Synchronous communication flow [Animation]: Request going out, response coming back

[Audio/Script]:

"Request-Response is the simplest pattern. Let's implement it."

[Demo - Request Response]:

import asyncio
from typing import Dict, Callable, Awaitable

class MessageBus: """Simple message bus for agent communication"""

def __init__(self): self.handlers: Dict[str, Callable] = {} self.pending_responses: Dict[str, asyncio.Future] = {}

def register_handler(self, agent_name: str, handler: Callable): """Register an agent's message handler""" self.handlers[agent_name] = handler

async def send(self, message: AgentMessage) -> Optional[AgentMessage]: """Send a message and optionally wait for response"""

if message.recipient not in self.handlers: raise ValueError(f"Unknown recipient: {message.recipient}")

handler = self.handlers[message.recipient]

# For requests, wait for response if message.message_type == MessageType.REQUEST: future = asyncio.Future() self.pending_responses[message.message_id] = future

# Send to handler await handler(message, self)

# Wait for response (with timeout) try: response = await asyncio.wait_for(future, timeout=message.ttl) return response except asyncio.TimeoutError: del self.pending_responses[message.message_id] raise TimeoutError(f"No response within {message.ttl}s") else: # Events don't need response await handler(message, self) return None

async def respond(self, response: AgentMessage): """Send a response to a pending request""" if response.correlation_id in self.pending_responses: future = self.pending_responses.pop(response.correlation_id) future.set_result(response)

Example agent using the bus

class AnalyzerAgent: def __init__(self, name: str, bus: MessageBus): self.name = name self.bus = bus bus.register_handler(name, self.handle_message)

async def handle_message(self, message: AgentMessage, bus: MessageBus): """Handle incoming messages""" if message.action == "analyze": # Do analysis result = await self.analyze(message.payload.get("data", ""))

# Send response response = message.create_response({"result": result}) await bus.respond(response)

async def analyze(self, data: str) -> dict: # Actual analysis logic return {"word_count": len(data.split()), "char_count": len(data)}

async def request_analysis(self, target: str, data: str) -> dict: """Request analysis from another agent""" message = AgentMessage( sender=self.name, recipient=target, action="analyze", payload={"data": data} ) response = await self.bus.send(message) return response.payload

---

Scene 5: Publish-Subscribe Pattern (7:30-9:30)

[Visual]: Event broadcasting [Animation]: One event reaching multiple subscribers

[Audio/Script]:

"Publish-Subscribe decouples agents. Publishers don't need to know subscribers."

[Demo - Pub/Sub]:

from typing import List, Set

class EventBus: """Publish-Subscribe event bus"""

def __init__(self): self.subscriptions: Dict[str, Set[str]] = {} # event_type -> agent_names self.handlers: Dict[str, Callable] = {}

def subscribe(self, agent_name: str, event_type: str): """Subscribe agent to event type""" if event_type not in self.subscriptions: self.subscriptions[event_type] = set() self.subscriptions[event_type].add(agent_name)

def unsubscribe(self, agent_name: str, event_type: str): """Unsubscribe agent from event type""" if event_type in self.subscriptions: self.subscriptions[event_type].discard(agent_name)

def register_handler(self, agent_name: str, handler: Callable): """Register agent's event handler""" self.handlers[agent_name] = handler

async def publish(self, event: AgentMessage): """Publish event to all subscribers""" event_type = event.action subscribers = self.subscriptions.get(event_type, set())

# Notify all subscribers concurrently tasks = [] for agent_name in subscribers: if agent_name in self.handlers: handler = self.handlers[agent_name] tasks.append(handler(event))

if tasks: await asyncio.gather(*tasks, return_exceptions=True)

Example usage

class MonitorAgent: def __init__(self, name: str, event_bus: EventBus): self.name = name self.event_bus = event_bus

# Subscribe to events event_bus.subscribe(name, "task_completed") event_bus.subscribe(name, "error_occurred") event_bus.register_handler(name, self.handle_event)

async def handle_event(self, event: AgentMessage): """Handle subscribed events""" if event.action == "task_completed": print(f"[{self.name}] Task completed: {event.payload}") elif event.action == "error_occurred": print(f"[{self.name}] Error: {event.payload}") await self.alert_admin(event.payload)

async def alert_admin(self, error: dict): # Send alert pass

class WorkerAgent: def __init__(self, name: str, event_bus: EventBus): self.name = name self.event_bus = event_bus

async def complete_task(self, task_id: str, result: dict): """Complete a task and publish event""" # Publish completion event event = AgentMessage( sender=self.name, message_type=MessageType.EVENT, action="task_completed", payload={"task_id": task_id, "result": result} ) await self.event_bus.publish(event)

---

Scene 6: Shared State Pattern (9:30-11:00)

[Visual]: Shared memory access [Animation]: Multiple agents reading/writing

[Audio/Script]:

"Shared state lets agents collaborate on common data."

[Demo - Shared State]:

import asyncio
from typing import Any

class SharedState: """Thread-safe shared state for agents"""

def __init__(self): self._state: Dict[str, Any] = {} self._locks: Dict[str, asyncio.Lock] = {} self._watchers: Dict[str, List[Callable]] = {}

async def get(self, key: str) -> Any: """Get value from shared state""" return self._state.get(key)

async def set(self, key: str, value: Any, notify: bool = True): """Set value in shared state""" # Get or create lock for this key if key not in self._locks: self._locks[key] = asyncio.Lock()

async with self._locks[key]: old_value = self._state.get(key) self._state[key] = value

# Notify watchers if notify and key in self._watchers: for callback in self._watchers[key]: await callback(key, old_value, value)

async def update(self, key: str, updater: Callable[[Any], Any]): """Atomically update a value""" if key not in self._locks: self._locks[key] = asyncio.Lock()

async with self._locks[key]: old_value = self._state.get(key) new_value = updater(old_value) self._state[key] = new_value return new_value

def watch(self, key: str, callback: Callable): """Watch for changes to a key""" if key not in self._watchers: self._watchers[key] = [] self._watchers[key].append(callback)

Example: Collaborative document editing

class EditorAgent: def __init__(self, name: str, shared_state: SharedState): self.name = name self.state = shared_state

# Watch for document changes self.state.watch("document", self.on_document_change)

async def on_document_change(self, key: str, old: Any, new: Any): print(f"[{self.name}] Document updated: {len(new or '')} chars")

async def append_text(self, text: str): """Add text to shared document""" await self.state.update( "document", lambda doc: (doc or "") + text )

async def get_document(self) -> str: """Get current document""" return await self.state.get("document") or ""

---

Scene 7: Combining Patterns (11:00-11:45)

[Visual]: Hybrid architecture [Animation]: All patterns working together

[Audio/Script]:

"Real systems combine patterns. Here's a hybrid approach:"

[Demo - Hybrid]:

class HybridCommunicationSystem:
    """Combines all communication patterns"""

def __init__(self): self.message_bus = MessageBus() # Request-Response self.event_bus = EventBus() # Pub-Sub self.shared_state = SharedState() # Shared Memory

def create_agent_context(self, agent_name: str): """Create communication context for an agent""" return AgentContext( name=agent_name, message_bus=self.message_bus, event_bus=self.event_bus, shared_state=self.shared_state )

@dataclass class AgentContext: """Communication context for an agent""" name: str message_bus: MessageBus event_bus: EventBus shared_state: SharedState

async def request(self, target: str, action: str, payload: dict) -> dict: """Send request and get response""" msg = AgentMessage( sender=self.name, recipient=target, action=action, payload=payload ) response = await self.message_bus.send(msg) return response.payload

async def publish(self, event_type: str, data: dict): """Publish event""" event = AgentMessage( sender=self.name, message_type=MessageType.EVENT, action=event_type, payload=data ) await self.event_bus.publish(event)

async def get_state(self, key: str) -> Any: """Get shared state""" return await self.shared_state.get(key)

async def set_state(self, key: str, value: Any): """Set shared state""" await self.shared_state.set(key, value)

---

Scene 8: Challenge Time (11:45-12:00)

[Visual]: Challenge specification [Animation]: XP reward display

[Audio/Script]:

"Your challenge: Build a communication system with all three patterns.
>
Create:
1. MessageBus for request-response
2. EventBus for pub-sub
3. SharedState for collaboration
4. HybridCommunicationSystem combining all
>
Test with 3 agents collaborating on a document.
>
Complete this for 750 XP and the 'Communication Expert' badge.
>
Next: Specialist agents with real capabilities."

---

Post-Video Challenge

Challenge ID: TRACK3_003_CHALLENGE Type: Code + Integration Test Instructions:

Task 1: Implement MessageBus

claude "Create message_bus.py with request-response pattern:
1. AgentMessage dataclass with all fields
2. MessageBus with send, respond, register_handler
3. Timeout handling for requests"

Task 2: Implement EventBus

claude "Create event_bus.py with pub-sub pattern:
1. subscribe, unsubscribe, publish methods
2. Concurrent notification of subscribers
3. Event filtering by type"

Task 3: Implement SharedState

claude "Create shared_state.py with:
1. Async-safe get/set/update operations
2. Watch mechanism for changes
3. Atomic updates with locks"

Task 4: Integration test

claude "Create a test with 3 agents:
1. WriterAgent - adds content to shared document
2. ReviewerAgent - watches changes, publishes review events
3. EditorAgent - responds to review requests

Show all three patterns working together."

Rewards:

  • XP: 750 (350 base + 400 challenge)
  • Achievement: "Communication Expert"
---

SEO Metadata

Alt-text: Agent-to-agent communication patterns - request-response, publish-subscribe, shared state. Build communication systems for multi-agent AI.

Tags: A2A communication, message bus, event bus, shared state, agent messaging, pub-sub pattern

Keywords: agent to agent communication, ai message protocol, pub sub agents, shared state ai, agent communication patterns

Последнее изменение: среда, 10 декабря 2025, 01:05