[TRACK4_003] Monitoring & Observability
📚 Monitoring & Observability
[VIDEO-018] Monitoring & Observability
Track: 4 - Production Mastery | Module: 3 | Duration: 14 minutes | Level requirement: 48 | XP reward: 450 XP
---
Scene 1: Flying Blind vs Full Visibility (0:00-1:30)
[Visual]: Dark cockpit vs illuminated dashboard [Animation]: System metrics appearing on screens
[Audio/Script]:
"Your AI system is in production. It's handling traffic.>
But can you answer these questions?
- How many requests per second?
- What's the average response time?
- Are there errors happening?
- Which agents are overloaded?
- Is performance degrading?>
If you can't answer these, you're flying blind.>
Observability gives you the instruments to see inside your system."
[Lower third]: "Track 4: Production Mastery | Level 48"
---
Scene 2: The Three Pillars (1:30-3:30)
[Visual]: Three pillars supporting a building [Animation]: Each pillar lighting up
[Audio/Script]:
"Observability has three pillars:>
1. Metrics - Numbers over time
How many? How fast? How often?>
2. Logs - Event records
What happened? In what order?>
3. Traces - Request journeys
Where did time go? What path did requests take?>
Together, they answer any question about your system."
[Diagram]:
                OBSERVABILITY
                      │
    ┌─────────────────┼─────────────────┐
    │                 │                 │
┌───▼───┐         ┌───▼───┐         ┌───▼───┐
│METRICS│         │ LOGS  │         │TRACES │
└───────┘         └───────┘         └───────┘
    │                 │                 │
 Numbers           Events           Journeys
Over Time          Records       Through System
"How many?"   "What happened?"   "How long did
"How fast?"        "When?"       each step take?"
---
Scene 3: Structured Logging (3:30-6:00)
[Visual]: Log messages flowing [Animation]: Structured logs being searched and filtered
[Audio/Script]:
"Production logs must be structured, not just print statements."
[Demo - Structured Logging]:
import logging
import json
import sys
import time
import uuid
from datetime import datetime
from typing import Any
from contextvars import ContextVar

# Request context
request_id_var: ContextVar[str] = ContextVar("request_id", default="")

class JSONFormatter(logging.Formatter):
"""Format logs as JSON for easy parsing"""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"request_id": request_id_var.get(),
}
# Add extra fields
if hasattr(record, "extra_fields"):
log_data.update(record.extra_fields)
# Add exception info
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
class AgentLogger:
"""Structured logger for AI agents"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# JSON handler for production
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
self.logger.addHandler(handler)
def _log(self, level: str, message: str, **kwargs):
"""Log with extra fields"""
record = self.logger.makeRecord(
self.logger.name, getattr(logging, level),
"", 0, message, (), None
)
record.extra_fields = kwargs
self.logger.handle(record)
def info(self, message: str, **kwargs):
self._log("INFO", message, **kwargs)
def error(self, message: str, **kwargs):
self._log("ERROR", message, **kwargs)
def debug(self, message: str, **kwargs):
self._log("DEBUG", message, **kwargs)
# Usage
logger = AgentLogger("agent-service")

async def process_task(task: str, user_id: str):
# Set request context
request_id = str(uuid.uuid4())[:8]
request_id_var.set(request_id)
logger.info("Task started",
task=task[:100],
user_id=user_id,
event="task_start"
)
start_time = time.time()
try:
result = await agent.execute(task)
duration = time.time() - start_time
logger.info("Task completed",
duration_ms=int(duration * 1000),
tokens_used=result.get("tokens", 0),
event="task_complete"
)
return result
except Exception as e:
duration = time.time() - start_time
logger.error("Task failed",
duration_ms=int(duration * 1000),
error=str(e),
error_type=type(e).__name__,
event="task_error"
)
raise
Output example:
{"timestamp": "2025-12-05T10:30:00Z", "level": "INFO", "logger": "agent-service",
"message": "Task completed", "request_id": "abc12345", "duration_ms": 1523,
"tokens_used": 450, "event": "task_complete"}
---
Scene 4: Metrics Collection (6:00-8:30)
[Visual]: Metrics dashboard with graphs [Animation]: Real-time metrics updating
[Audio/Script]:
"Metrics tell you how your system is performing over time."
[Demo - Prometheus Metrics]:
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import FastAPI, Response
import time

# Define metrics
REQUESTS_TOTAL = Counter(
'agent_requests_total',
'Total number of requests',
['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
'agent_request_duration_seconds',
'Request duration in seconds',
['method', 'endpoint'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]
)
TOKENS_USED = Counter(
'agent_tokens_used_total',
'Total tokens used',
['model', 'agent_type']
)
ACTIVE_REQUESTS = Gauge(
'agent_active_requests',
'Currently active requests'
)
CACHE_HITS = Counter(
'agent_cache_hits_total',
'Cache hit/miss counter',
['hit']
)
# Middleware to collect metrics
class MetricsMiddleware:
def __init__(self, app: FastAPI):
        self.app = app

    async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
method = scope["method"]
path = scope["path"]
# Track active requests
ACTIVE_REQUESTS.inc()
start_time = time.time()
# Store status code
status_code = 500
async def send_wrapper(message):
nonlocal status_code
if message["type"] == "http.response.start":
status_code = message["status"]
await send(message)
try:
await self.app(scope, receive, send_wrapper)
finally:
# Record metrics
duration = time.time() - start_time
REQUESTS_TOTAL.labels(method=method, endpoint=path, status=status_code).inc()
REQUEST_DURATION.labels(method=method, endpoint=path).observe(duration)
ACTIVE_REQUESTS.dec()
# Metrics endpoint
app = FastAPI()

@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
return Response(
generate_latest(),
media_type="text/plain"
)
# In your agent code
async def execute_with_metrics(task: str) -> dict:
"""Execute task with metrics collection"""
with REQUEST_DURATION.labels(method="POST", endpoint="/api/v1/task").time():
        result = await agent.execute(task)

        # Track tokens
TOKENS_USED.labels(model="claude-sonnet", agent_type="main").inc(
result.get("tokens_used", 0)
)
# Track cache
if result.get("_cached"):
CACHE_HITS.labels(hit="true").inc()
else:
CACHE_HITS.labels(hit="false").inc()
return result
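[Supplementary sketch - Reading the Metrics Back]: once Prometheus scrapes the /metrics endpoint, the same numbers can be queried programmatically over its standard /api/v1/query HTTP API. The Prometheus address below is a placeholder for your environment.
# Sketch: query Prometheus for the current request rate and P95 latency.
# Uses the standard /api/v1/query endpoint; PROMETHEUS_URL is an assumption.
import httpx

PROMETHEUS_URL = "http://prometheus:9090"  # placeholder address

async def query_prometheus(promql: str) -> float | None:
    """Run an instant PromQL query and return the first scalar value."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{PROMETHEUS_URL}/api/v1/query", params={"query": promql}
        )
        resp.raise_for_status()
        results = resp.json()["data"]["result"]
        return float(results[0]["value"][1]) if results else None

async def current_health() -> dict:
    return {
        "requests_per_sec": await query_prometheus(
            "sum(rate(agent_requests_total[5m]))"
        ),
        "p95_latency_s": await query_prometheus(
            "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))"
        ),
    }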
---
Scene 5: Distributed Tracing (8:30-11:00)
[Visual]: Request trace through multiple services [Animation]: Spans connecting across services
[Audio/Script]:
"Traces show the journey of a request through your system."
[Demo - OpenTelemetry Tracing]:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
import httpx

# Setup tracer
def setup_tracing(service_name: str, otlp_endpoint: str):
"""Initialize OpenTelemetry tracing"""
    provider = TracerProvider()

    # Export to OTLP collector (Jaeger, Tempo, etc.)
exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Auto-instrument frameworks
    FastAPIInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()
return trace.get_tracer(service_name)
tracer = setup_tracing("agent-service", "http://jaeger:4317")
# Manual spans for custom logic
async def execute_task(task: str, context: dict) -> dict:
"""Execute task with detailed tracing""" with tracer.start_as_current_span("execute_task") as span:
# Add attributes
span.set_attribute("task.length", len(task))
span.set_attribute("context.keys", list(context.keys()))
# Phase 1: Planning
with tracer.start_as_current_span("planning") as plan_span:
plan = await planner.create_plan(task)
plan_span.set_attribute("plan.steps", len(plan.steps))
        # Phase 2: Execution
        results = []
        with tracer.start_as_current_span("execution") as exec_span:
            for i, step in enumerate(plan.steps):
                with tracer.start_as_current_span(f"step_{i}") as step_span:
                    step_span.set_attribute("step.agent", step.agent_name)
                    step_span.set_attribute("step.task", step.task[:50])
                    result = await execute_step(step)
                    step_span.set_attribute("step.status", result.get("status"))
                    results.append(result)
# Phase 3: Synthesis
with tracer.start_as_current_span("synthesis"):
final_result = await synthesizer.synthesize(results)
span.set_attribute("result.status", final_result.get("status"))
return final_result
# Propagating trace context to other services
async def call_external_service(client: httpx.AsyncClient, url: str, data: dict):
"""Call external service with trace propagation""" # Trace context is automatically propagated by instrumentation
response = await client.post(url, json=data)
return response.json()
Result: Full trace showing:
execute_task (1.5s)
├── planning (100ms)
├── execution (1.2s)
│ ├── step_0: research_agent (500ms)
│ ├── step_1: analysis_agent (400ms)
│ └── step_2: writer_agent (300ms)
└── synthesis (200ms)
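[Supplementary sketch - Recording Failures on Spans]: traces earn their keep when something breaks. OpenTelemetry's record_exception and set_status APIs mark the failing span so it stands out in the trace view; execute_step is the hypothetical helper used above.
# Sketch: mark a span as failed when a step raises.
# record_exception / set_status are standard OpenTelemetry span APIs.
from opentelemetry.trace import Status, StatusCode

async def execute_step_traced(step) -> dict:
    with tracer.start_as_current_span("execute_step") as span:
        span.set_attribute("step.agent", step.agent_name)
        try:
            result = await execute_step(step)
            span.set_attribute("step.status", result.get("status", "ok"))
            return result
        except Exception as exc:
            span.record_exception(exc)                           # Attach the stack trace to the span
            span.set_status(Status(StatusCode.ERROR, str(exc)))  # Flag the span as errored
            raise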
---
Scene 6: Alerting (11:00-12:30)
[Visual]: Alert notifications [Animation]: Threshold breach triggering alerts
[Audio/Script]:
"Metrics and logs are useless if no one looks at them.>
Alerting notifies you when things go wrong."
[Demo - Alerting Rules]:
# alerts.yml - Prometheus alerting rules
groups:
- name: agent-service
rules:
# High error rate
- alert: HighErrorRate
expr: |
sum(rate(agent_requests_total{status=~"5.."}[5m]))
/
sum(rate(agent_requests_total[5m]))
> 0.05
for: 5m
labels:
severity: critical
annotations:
summary: High error rate in agent service
description: "Error rate is {{ $value | humanizePercentage }} (>5%)"
# Slow response times
- alert: SlowResponseTime
expr: |
histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))
> 30
for: 10m
labels:
severity: warning
annotations:
summary: Slow response times
description: "95th percentile latency is {{ $value }}s (>30s)"
# High token usage (cost control)
- alert: HighTokenUsage
expr: |
increase(agent_tokens_used_total[1h]) > 1000000
labels:
severity: warning
annotations:
summary: High token usage
description: "Used {{ $value }} tokens in the last hour"
# Service down
- alert: ServiceDown
expr: up{job="agent-service"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: Agent service is down
description: "{{ $labels.instance }} is unreachable"
# No requests (possibly broken)
- alert: NoTraffic
expr: |
sum(rate(agent_requests_total[5m])) == 0
for: 15m
labels:
severity: warning
annotations:
summary: No traffic to agent service
description: "No requests in 15 minutes during business hours"
[Demo - Python Alerting]:
import httpx
from dataclasses import dataclass
from typing import Optional
from datetime import datetime

@dataclass
class Alert:
name: str
severity: str # info, warning, critical
message: str
timestamp: datetime
labels: dict
class AlertManager:
"""Send alerts to various channels"""
def __init__(self, slack_webhook: str, pagerduty_key: Optional[str] = None):
self.slack_webhook = slack_webhook
self.pagerduty_key = pagerduty_key
async def send(self, alert: Alert):
"""Send alert to appropriate channels"""
# Always send to Slack
await self._send_slack(alert)
# Critical alerts go to PagerDuty
if alert.severity == "critical" and self.pagerduty_key:
await self._send_pagerduty(alert)
async def _send_slack(self, alert: Alert):
"""Send to Slack"""
color = {
"info": "#36a64f",
"warning": "#ff9500",
"critical": "#ff0000"
}.get(alert.severity, "#808080")
payload = {
"attachments": [{
"color": color,
"title": f"[{alert.severity.upper()}] {alert.name}",
"text": alert.message,
"footer": f"Agent Service | {alert.timestamp.isoformat()}",
"fields": [
{"title": k, "value": str(v), "short": True}
for k, v in alert.labels.items()
]
}]
}
async with httpx.AsyncClient() as client:
await client.post(self.slack_webhook, json=payload)
async def _send_pagerduty(self, alert: Alert):
"""Send to PagerDuty"""
payload = {
"routing_key": self.pagerduty_key,
"event_action": "trigger",
"payload": {
"summary": f"{alert.name}: {alert.message}",
"severity": "critical",
"source": "agent-service",
"custom_details": alert.labels
}
}
async with httpx.AsyncClient() as client:
await client.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload
)
---
Scene 7: Dashboards (12:30-13:30)
[Visual]: Grafana dashboard [Animation]: Panels updating in real-time
[Audio/Script]:
"Bring it all together with dashboards."
[Demo - Grafana Dashboard]:
{
"title": "Agent Service Overview",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"targets": [{
"expr": "sum(rate(agent_requests_total[5m]))",
"legendFormat": "Requests/sec"
}]
},
{
"title": "Response Time (P95)",
"type": "graph",
"targets": [{
"expr": "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))",
"legendFormat": "P95 Latency"
}]
},
{
"title": "Error Rate",
"type": "gauge",
"targets": [{
"expr": "sum(rate(agent_requests_total{status=~\"5..\"}[5m])) / sum(rate(agent_requests_total[5m]))"
}],
"thresholds": [
{"value": 0, "color": "green"},
{"value": 0.01, "color": "yellow"},
{"value": 0.05, "color": "red"}
]
},
{
"title": "Token Usage",
"type": "stat",
"targets": [{
"expr": "increase(agent_tokens_used_total[24h])"
}]
},
{
"title": "Active Requests",
"type": "graph",
"targets": [{
"expr": "agent_active_requests",
"legendFormat": "Active"
}]
},
{
"title": "Cache Hit Rate",
"type": "gauge",
"targets": [{
"expr": "sum(rate(agent_cache_hits_total{hit=\"true\"}[5m])) / sum(rate(agent_cache_hits_total[5m]))"
}]
}
]
}
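[Supplementary sketch - Provisioning the Dashboard]: the JSON above can be pushed to Grafana through its standard /api/dashboards/db HTTP API rather than clicked together by hand. The Grafana URL and API token below are placeholders.
# Sketch: push the dashboard JSON to Grafana via its HTTP API.
# GRAFANA_URL and GRAFANA_TOKEN are placeholders for your environment.
import json
import httpx

GRAFANA_URL = "http://grafana:3000"   # placeholder
GRAFANA_TOKEN = "glsa_xxx"            # placeholder service-account token

async def upload_dashboard(path: str = "dashboard.json"):
    with open(path) as f:
        dashboard = json.load(f)
    payload = {"dashboard": dashboard, "overwrite": True}
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{GRAFANA_URL}/api/dashboards/db",
            json=payload,
            headers={"Authorization": f"Bearer {GRAFANA_TOKEN}"},
        )
        resp.raise_for_status()
        return resp.json()  # Contains the dashboard UID and URL on success

---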
Scene 8: Challenge Time (13:30-14:00)
[Visual]: Challenge specification [Animation]: XP reward display
[Audio/Script]:
"Your challenge: Add full observability to your agent service.>
Requirements:
1. Structured JSON logging with request IDs
2. Prometheus metrics (requests, latency, tokens)
3. OpenTelemetry tracing with spans for each phase
4. Alert rules for errors and slow responses
5. Grafana dashboard with key metrics>
Complete this for 850 XP and the 'Observability Expert' badge.>
Next: The final capstone - building an enterprise-grade AI system."
---
Post-Video Challenge
Challenge ID: TRACK4_003_CHALLENGE | Type: Code + Infrastructure
Instructions:
Task 1: Implement structured logging
claude "Create a structured logging system:
1. JSON log formatter
2. Request ID propagation
3. Log levels and filtering
4. Extra fields for context"Task 2: Add Prometheus metrics
claude "Add Prometheus metrics:
1. Request counter with labels
2. Duration histogram
3. Token usage counter
4. Active request gauge
5. /metrics endpoint"Task 3: Add distributed tracing
claude "Add OpenTelemetry tracing:
1. Tracer setup with OTLP export
2. Spans for each execution phase
3. Attributes on spans
4. Auto-instrumentation for FastAPI"Task 4: Create alerting rules
claude "Create Prometheus alerting rules for:
1. High error rate (>5%)
2. Slow responses (>30s P95)
3. Service down
4. High token usage"Task 5: Create dashboard
- Create Grafana dashboard JSON
- Include: Request rate, latency, errors, tokens, cache hits
- XP: 850 (450 base + 400 challenge)
- Achievement: "Observability Expert"
SEO Metadata
Alt-text: Monitoring and observability for AI agents - structured logging, Prometheus metrics, OpenTelemetry tracing, alerting, Grafana dashboards.
Tags: AI monitoring, observability, structured logging, Prometheus, OpenTelemetry, tracing, alerting, Grafana
Keywords: monitor ai agents, ai observability, prometheus metrics ai, opentelemetry ai, ai alerting, grafana dashboard ai