📚 Monitoring & Observability

🎯 Level 48+ ⭐ 850 XP ⏱️ 14 min

[VIDEO-018] Monitoring & Observability

Track: 4 - Production Mastery | Module: 3 | Duration: 14 minutes | Level requirement: 48 | XP reward: 450 XP (850 XP with the post-video challenge)

---

Scene 1: Flying Blind vs Full Visibility (0:00-1:30)

[Visual]: Dark cockpit vs illuminated dashboard [Animation]: System metrics appearing on screens

[Audio/Script]:

"Your AI system is in production. It's handling traffic.
>
But can you answer these questions?
- How many requests per second?
- What's the average response time?
- Are there errors happening?
- Which agents are overloaded?
- Is performance degrading?
>
If you can't answer these, you're flying blind.
>
Observability gives you the instruments to see inside your system."

[Lower third]: "Track 4: Production Mastery | Level 48"

---

Scene 2: The Three Pillars (1:30-3:30)

[Visual]: Three pillars supporting a building [Animation]: Each pillar lighting up

[Audio/Script]:

"Observability has three pillars:
>
1. Metrics - Numbers over time
How many? How fast? How often?
>
2. Logs - Event records
What happened? In what order?
>
3. Traces - Request journeys
Where did time go? What path did requests take?
>
Together, they answer any question about your system."

[Diagram]:

                    OBSERVABILITY
                         │
       ┌─────────────────┼─────────────────┐
       │                 │                 │
   ┌───▼───┐         ┌───▼───┐         ┌───▼───┐
   │METRICS│         │ LOGS  │         │TRACES │
   └───────┘         └───────┘         └───────┘
       │                 │                 │
   Numbers           Events            Journeys
   Over Time         Records           Through System

"How many?" "What happened?" "How long did "How fast?" "When?" each step take?"

---

Scene 3: Structured Logging (3:30-6:00)

[Visual]: Log messages flowing [Animation]: Structured logs being searched and filtered

[Audio/Script]:

"Production logs must be structured, not just print statements."

[Demo - Structured Logging]:

import logging
import json
import sys
import time
import uuid
from datetime import datetime
from contextvars import ContextVar

# Request context
request_id_var: ContextVar[str] = ContextVar("request_id", default="")

class JSONFormatter(logging.Formatter):
    """Format logs as JSON for easy parsing"""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "request_id": request_id_var.get(),
        }

        # Add extra fields
        if hasattr(record, "extra_fields"):
            log_data.update(record.extra_fields)

        # Add exception info
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        return json.dumps(log_data)

class AgentLogger:
    """Structured logger for AI agents"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # JSON handler for production
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(JSONFormatter())
        self.logger.addHandler(handler)

    def _log(self, level: str, message: str, **kwargs):
        """Log with extra fields"""
        record = self.logger.makeRecord(
            self.logger.name, getattr(logging, level), "", 0, message, (), None
        )
        record.extra_fields = kwargs
        self.logger.handle(record)

    def info(self, message: str, **kwargs):
        self._log("INFO", message, **kwargs)

    def error(self, message: str, **kwargs):
        self._log("ERROR", message, **kwargs)

    def debug(self, message: str, **kwargs):
        self._log("DEBUG", message, **kwargs)

# Usage

logger = AgentLogger("agent-service")

async def process_task(task: str, user_id: str):
    # Set request context
    request_id = str(uuid.uuid4())[:8]
    request_id_var.set(request_id)

    logger.info(
        "Task started",
        task=task[:100],
        user_id=user_id,
        event="task_start",
    )

    start_time = time.time()
    try:
        result = await agent.execute(task)

        duration = time.time() - start_time
        logger.info(
            "Task completed",
            duration_ms=int(duration * 1000),
            tokens_used=result.get("tokens", 0),
            event="task_complete",
        )
        return result

    except Exception as e:
        duration = time.time() - start_time
        logger.error(
            "Task failed",
            duration_ms=int(duration * 1000),
            error=str(e),
            error_type=type(e).__name__,
            event="task_error",
        )
        raise

Output example:

{"timestamp": "2025-12-05T10:30:00Z", "level": "INFO", "logger": "agent-service",

"message": "Task completed", "request_id": "abc12345", "duration_ms": 1523,

"tokens_used": 450, "event": "task_complete"}

---

Scene 4: Metrics Collection (6:00-8:30)

[Visual]: Metrics dashboard with graphs [Animation]: Real-time metrics updating

[Audio/Script]:

"Metrics tell you how your system is performing over time."

[Demo - Prometheus Metrics]:

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import FastAPI, Response
import time

# Define metrics

REQUESTS_TOTAL = Counter(
    'agent_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'agent_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]
)

TOKENS_USED = Counter(
    'agent_tokens_used_total',
    'Total tokens used',
    ['model', 'agent_type']
)

ACTIVE_REQUESTS = Gauge(
    'agent_active_requests',
    'Currently active requests'
)

CACHE_HITS = Counter(
    'agent_cache_hits_total',
    'Cache hit/miss counter',
    ['hit']
)

# Middleware to collect metrics

class MetricsMiddleware:
    def __init__(self, app: FastAPI):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        method = scope["method"]
        path = scope["path"]

        # Track active requests
        ACTIVE_REQUESTS.inc()
        start_time = time.time()

        # Store status code
        status_code = 500

        async def send_wrapper(message):
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            # Record metrics
            duration = time.time() - start_time
            REQUESTS_TOTAL.labels(method=method, endpoint=path, status=status_code).inc()
            REQUEST_DURATION.labels(method=method, endpoint=path).observe(duration)
            ACTIVE_REQUESTS.dec()

Metrics endpoint

app = FastAPI()
app.add_middleware(MetricsMiddleware)

@app.get("/metrics") async def metrics(): """Prometheus metrics endpoint""" return Response( generate_latest(), media_type="text/plain" )

# In your agent code

async def execute_with_metrics(task: str) -> dict:
    """Execute task with metrics collection"""
    with REQUEST_DURATION.labels(method="POST", endpoint="/api/v1/task").time():
        result = await agent.execute(task)

    # Track tokens
    TOKENS_USED.labels(model="claude-sonnet", agent_type="main").inc(
        result.get("tokens_used", 0)
    )

    # Track cache
    if result.get("_cached"):
        CACHE_HITS.labels(hit="true").inc()
    else:
        CACHE_HITS.labels(hit="false").inc()

    return result
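
For Prometheus to actually collect these metrics, its configuration needs a scrape job pointed at the /metrics endpoint. A minimal sketch (the job name and target address are placeholders to adjust for your deployment):

# prometheus.yml (scrape section only)
scrape_configs:
  - job_name: agent-service          # also used as the `job` label in the alert rules later
    scrape_interval: 15s
    static_configs:
      - targets: ["agent-service:8000"]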

---

Scene 5: Distributed Tracing (8:30-11:00)

[Visual]: Request trace through multiple services [Animation]: Spans connecting across services

[Audio/Script]:

"Traces show the journey of a request through your system."

[Demo - OpenTelemetry Tracing]:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
import httpx

# Setup tracer

def setup_tracing(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry tracing"""
    provider = TracerProvider()

    # Export to OTLP collector (Jaeger, Tempo, etc.)
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    processor = BatchSpanProcessor(exporter)
    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)

    # Auto-instrument frameworks
    FastAPIInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()

    return trace.get_tracer(service_name)

tracer = setup_tracing("agent-service", "http://jaeger:4317")

# Manual spans for custom logic

async def execute_task(task: str, context: dict) -> dict:
    """Execute task with detailed tracing"""

    with tracer.start_as_current_span("execute_task") as span:
        # Add attributes
        span.set_attribute("task.length", len(task))
        span.set_attribute("context.keys", list(context.keys()))

        # Phase 1: Planning
        with tracer.start_as_current_span("planning") as plan_span:
            plan = await planner.create_plan(task)
            plan_span.set_attribute("plan.steps", len(plan.steps))

        # Phase 2: Execution
        results = []
        with tracer.start_as_current_span("execution"):
            for i, step in enumerate(plan.steps):
                with tracer.start_as_current_span(f"step_{i}") as step_span:
                    step_span.set_attribute("step.agent", step.agent_name)
                    step_span.set_attribute("step.task", step.task[:50])

                    result = await execute_step(step)
                    results.append(result)

                    step_span.set_attribute("step.status", result.get("status"))

        # Phase 3: Synthesis
        with tracer.start_as_current_span("synthesis"):
            final_result = await synthesizer.synthesize(results)

        span.set_attribute("result.status", final_result.get("status"))
        return final_result

# Propagating trace context to other services

async def call_external_service(client: httpx.AsyncClient, url: str, data: dict):
    """Call external service with trace propagation"""

    # Trace context is automatically propagated by instrumentation
    response = await client.post(url, json=data)
    return response.json()
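
If a downstream call uses a client that is not covered by auto-instrumentation, the trace context can be attached by hand with OpenTelemetry's propagation API. A minimal sketch (the function name is illustrative):

from opentelemetry.propagate import inject

async def call_uninstrumented_service(client: httpx.AsyncClient, url: str, data: dict):
    """Manually attach W3C trace headers when auto-instrumentation is unavailable"""
    headers: dict = {}
    inject(headers)  # writes traceparent/tracestate for the current span into the dict
    response = await client.post(url, json=data, headers=headers)
    return response.json()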

Result: Full trace showing:

execute_task (1.5s)
├── planning (100ms)
├── execution (1.2s)
│   ├── step_0: research_agent (500ms)
│   ├── step_1: analysis_agent (400ms)
│   └── step_2: writer_agent (300ms)
└── synthesis (200ms)

---

Scene 6: Alerting (11:00-12:30)

[Visual]: Alert notifications [Animation]: Threshold breach triggering alerts

[Audio/Script]:

"Metrics and logs are useless if no one looks at them.
>
Alerting notifies you when things go wrong."

[Demo - Alerting Rules]:

# Prometheus alerting rules (alerts.yml)

groups:
  - name: agent-service
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: |
          sum(rate(agent_requests_total{status=~"5.."}[5m]))
          / sum(rate(agent_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: High error rate in agent service
          description: "Error rate is {{ $value | humanizePercentage }} (>5%)"

      # Slow response times
      - alert: SlowResponseTime
        expr: |
          histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m])) > 30
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: Slow response times
          description: "95th percentile latency is {{ $value }}s (>30s)"

      # High token usage (cost control)
      - alert: HighTokenUsage
        expr: |
          increase(agent_tokens_used_total[1h]) > 1000000
        labels:
          severity: warning
        annotations:
          summary: High token usage
          description: "Used {{ $value }} tokens in the last hour"

      # Service down
      - alert: ServiceDown
        expr: up{job="agent-service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: Agent service is down
          description: "{{ $labels.instance }} is unreachable"

      # No requests (possibly broken)
      - alert: NoTraffic
        expr: |
          sum(rate(agent_requests_total[5m])) == 0
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: No traffic to agent service
          description: "No requests in 15 minutes during business hours"

[Demo - Python Alerting]:

import httpx
from dataclasses import dataclass
from typing import Optional
from datetime import datetime

@dataclass
class Alert:
    name: str
    severity: str  # info, warning, critical
    message: str
    timestamp: datetime
    labels: dict

class AlertManager:
    """Send alerts to various channels"""

    def __init__(self, slack_webhook: str, pagerduty_key: Optional[str] = None):
        self.slack_webhook = slack_webhook
        self.pagerduty_key = pagerduty_key

    async def send(self, alert: Alert):
        """Send alert to appropriate channels"""

        # Always send to Slack
        await self._send_slack(alert)

        # Critical alerts go to PagerDuty
        if alert.severity == "critical" and self.pagerduty_key:
            await self._send_pagerduty(alert)

    async def _send_slack(self, alert: Alert):
        """Send to Slack"""
        color = {
            "info": "#36a64f",
            "warning": "#ff9500",
            "critical": "#ff0000"
        }.get(alert.severity, "#808080")

        payload = {
            "attachments": [{
                "color": color,
                "title": f"[{alert.severity.upper()}] {alert.name}",
                "text": alert.message,
                "footer": f"Agent Service | {alert.timestamp.isoformat()}",
                "fields": [
                    {"title": k, "value": str(v), "short": True}
                    for k, v in alert.labels.items()
                ]
            }]
        }

        async with httpx.AsyncClient() as client:
            await client.post(self.slack_webhook, json=payload)

    async def _send_pagerduty(self, alert: Alert):
        """Send to PagerDuty"""
        payload = {
            "routing_key": self.pagerduty_key,
            "event_action": "trigger",
            "payload": {
                "summary": f"{alert.name}: {alert.message}",
                "severity": "critical",
                "source": "agent-service",
                "custom_details": alert.labels
            }
        }

        async with httpx.AsyncClient() as client:
            await client.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=payload
            )
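
A quick usage sketch of the class above (the webhook URL and label values are placeholders):

alert_manager = AlertManager(slack_webhook="https://hooks.slack.com/services/XXX/YYY/ZZZ")

async def notify_high_error_rate(error_rate: float):
    await alert_manager.send(Alert(
        name="HighErrorRate",
        severity="critical",
        message=f"Error rate is {error_rate:.1%} over the last 5 minutes",
        timestamp=datetime.utcnow(),
        labels={"service": "agent-service", "environment": "production"},
    ))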

---

Scene 7: Dashboards (12:30-13:30)

[Visual]: Grafana dashboard [Animation]: Panels updating in real-time

[Audio/Script]:

"Bring it all together with dashboards."

[Demo - Grafana Dashboard]:

{
  "title": "Agent Service Overview",
  "panels": [
    {
      "title": "Request Rate",
      "type": "graph",
      "targets": [{
        "expr": "sum(rate(agent_requests_total[5m]))",
        "legendFormat": "Requests/sec"
      }]
    },
    {
      "title": "Response Time (P95)",
      "type": "graph",
      "targets": [{
        "expr": "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))",
        "legendFormat": "P95 Latency"
      }]
    },
    {
      "title": "Error Rate",
      "type": "gauge",
      "targets": [{
        "expr": "sum(rate(agent_requests_total{status=~\"5..\"}[5m])) / sum(rate(agent_requests_total[5m]))"
      }],
      "thresholds": [
        {"value": 0, "color": "green"},
        {"value": 0.01, "color": "yellow"},
        {"value": 0.05, "color": "red"}
      ]
    },
    {
      "title": "Token Usage",
      "type": "stat",
      "targets": [{
        "expr": "increase(agent_tokens_used_total[24h])"
      }]
    },
    {
      "title": "Active Requests",
      "type": "graph",
      "targets": [{
        "expr": "agent_active_requests",
        "legendFormat": "Active"
      }]
    },
    {
      "title": "Cache Hit Rate",
      "type": "gauge",
      "targets": [{
        "expr": "sum(rate(agent_cache_hits_total{hit=\"true\"}[5m])) / sum(rate(agent_cache_hits_total[5m]))"
      }]
    }
  ]
}
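
To load this dashboard automatically instead of importing it by hand, Grafana can pick it up from a provisioning file. A minimal sketch, assuming the JSON above is saved in /var/lib/grafana/dashboards (all paths are placeholders):

# /etc/grafana/provisioning/dashboards/agent-service.yaml
apiVersion: 1
providers:
  - name: agent-service
    type: file
    options:
      path: /var/lib/grafana/dashboards   # directory containing the dashboard JSON above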

---

Scene 8: Challenge Time (13:30-14:00)

[Visual]: Challenge specification [Animation]: XP reward display

[Audio/Script]:

"Your challenge: Add full observability to your agent service.
>
Requirements:
1. Structured JSON logging with request IDs
2. Prometheus metrics (requests, latency, tokens)
3. OpenTelemetry tracing with spans for each phase
4. Alert rules for errors and slow responses
5. Grafana dashboard with key metrics
>
Complete this for 850 XP and the 'Observability Expert' badge.
>
Next: The final capstone - building an enterprise-grade AI system."

---

Post-Video Challenge

Challenge ID: TRACK4_003_CHALLENGE
Type: Code + Infrastructure
Instructions:

Task 1: Implement structured logging

claude "Create a structured logging system:
1. JSON log formatter
2. Request ID propagation
3. Log levels and filtering
4. Extra fields for context"

Task 2: Add Prometheus metrics

claude "Add Prometheus metrics:
1. Request counter with labels
2. Duration histogram
3. Token usage counter
4. Active request gauge
5. /metrics endpoint"

Task 3: Add distributed tracing

claude "Add OpenTelemetry tracing:
1. Tracer setup with OTLP export
2. Spans for each execution phase
3. Attributes on spans
4. Auto-instrumentation for FastAPI"

Task 4: Create alerting rules

claude "Create Prometheus alerting rules for:
1. High error rate (>5%)
2. Slow responses (>30s P95)
3. Service down
4. High token usage"

Task 5: Create dashboard

  • Create Grafana dashboard JSON
  • Include: Request rate, latency, errors, tokens, cache hits
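
To exercise the whole pipeline locally, the backends referenced above (Prometheus, Grafana, and a Jaeger collector for OTLP traces) can be started with Docker Compose. A minimal sketch, not a production setup; file paths and port mappings are assumptions to adjust:

# docker-compose.yml
services:
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./alerts.yml:/etc/prometheus/alerts.yml
    ports:
      - "9090:9090"
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
  jaeger:
    image: jaegertracing/all-in-one
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    ports:
      - "16686:16686"   # Jaeger UI
      - "4317:4317"     # OTLP gRPC, matches the exporter endpoint in Scene 5
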
Rewards:
  • XP: 850 (450 base + 400 challenge)
  • Achievement: "Observability Expert"
---

SEO Metadata

Alt-text: Monitoring and observability for AI agents - structured logging, Prometheus metrics, OpenTelemetry tracing, alerting, Grafana dashboards.

Tags: AI monitoring, observability, structured logging, Prometheus, OpenTelemetry, tracing, alerting, Grafana

Keywords: monitor ai agents, ai observability, prometheus metrics ai, opentelemetry ai, ai alerting, grafana dashboard ai
