[TRACK4_004] Final Capstone: Enterprise AI System
[VIDEO-019] Track 4 Capstone: Enterprise AI System
Track: 4 - Production Mastery | Module: 4 (Final Capstone) | Duration: 20 minutes | Level requirement: 49 | XP reward: 750 XP (+ 4000 XP for completion)
---
Scene 1: The Final Challenge (0:00-2:00)
[Visual]: All previous badges assembling into the ultimate achievement [Animation]: Journey from Level 1 to Level 50 highlighted
[Audio/Script]:
"You've come so far.
From 'What is Claude?' to building autonomous multi-agent systems.
From single commands to production-grade infrastructure.
This is it. The final capstone.
Your challenge: build an enterprise-grade AI system that could serve a real company.
Not a prototype. Not a demo. A production-ready system.
This combines everything:
- Multi-agent orchestration
- Scalable deployment
- Full observability
- Security and reliability
- Cost optimization
Let's build your masterpiece."
[Lower third]: "Track 4 Final Capstone | Level 49-50"
---
Scene 2: System Requirements (2:00-5:00)
[Visual]: Enterprise requirements document [Animation]: Requirements checking off
[Audio/Script]:
"Your enterprise system: AI Operations Center>
A multi-agent platform for business operations automation.>
Core Capabilities:"
[Requirements Table]:
┌──────────────────────────────────────────────────┐
│               AI OPERATIONS CENTER               │
├──────────────────────────────────────────────────┤
│ FUNCTIONAL REQUIREMENTS                          │
├──────────────────────────────────────────────────┤
│ 1. Task Processing                               │
│    - Accept tasks via API                        │
│    - Route to appropriate specialist agents      │
│    - Return structured results                   │
│                                                  │
│ 2. Agent Types (minimum 4)                       │
│    - Research Agent: Information gathering       │
│    - Analysis Agent: Data analysis               │
│    - Writer Agent: Content creation              │
│    - Code Agent: Development tasks               │
│                                                  │
│ 3. Orchestration                                 │
│    - Automatic task decomposition                │
│    - Parallel execution where possible           │
│    - Dependency management                       │
│                                                  │
│ 4. Persistence                                   │
│    - Task history                                │
│    - Agent performance metrics                   │
│    - Cost tracking                               │
├──────────────────────────────────────────────────┤
│ NON-FUNCTIONAL REQUIREMENTS                      │
├──────────────────────────────────────────────────┤
│ 1. Availability: 99.9% uptime                    │
│ 2. Latency: <2s for simple tasks                 │
│ 3. Scale: Handle 100 concurrent requests         │
│ 4. Security: API key authentication              │
│ 5. Observability: Full metrics, logs, traces     │
│ 6. Cost: Track and limit token usage             │
└──────────────────────────────────────────────────┘
---
Scene 3: Architecture Design (5:00-8:00)
[Visual]: Complete system architecture [Animation]: Components connecting with data flows
[Audio/Script]:
"Here's the enterprise architecture:"
[Diagram]:
                        ┌──────────────────┐
                        │  Load Balancer   │
                        │     (nginx)      │
                        └────────┬─────────┘
                                 │
                        ┌────────▼─────────┐
                        │   API Gateway    │
                        │ - Auth (API Key) │
                        │ - Rate Limiting  │
                        │ - Validation     │
                        └────────┬─────────┘
                                 │
┌────────────────────────────────┼────────────────────────────────┐
│                                │                                │
│  ┌─────────────────────────────▼─────────────────────────────┐  │
│  │                       ORCHESTRATOR                        │  │
│  │  - Task Decomposition                                     │  │
│  │  - Agent Selection                                        │  │
│  │  - Execution Management                                   │  │
│  │  - Result Synthesis                                       │  │
│  └─────────────────────────────┬─────────────────────────────┘  │
│                                │                                │
│           ┌─────────────┬──────┴──────┬─────────────┐           │
│           │             │             │             │           │
│      ┌────▼─────┐  ┌────▼─────┐  ┌────▼─────┐  ┌────▼─────┐     │
│      │ Research │  │ Analysis │  │  Writer  │  │   Code   │     │
│      │  Agent   │  │  Agent   │  │  Agent   │  │  Agent   │     │
│      └──────────┘  └──────────┘  └──────────┘  └──────────┘     │
│                                                                 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                       SHARED SERVICES                       │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │PostgreSQL│ │  Redis   │ │ Message  │ │     Storage      │ │ │
│ │ │(Tasks,   │ │(Cache,   │ │  Queue   │ │(Files, Artifacts)│ │ │
│ │ │ Metrics) │ │ State)   │ │(RabbitMQ)│ │                  │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                                 │
                ┌────────────────┼────────────────┐
                │                │                │
          ┌─────▼──────┐   ┌─────▼──────┐   ┌─────▼──────┐
          │ Prometheus │   │   Jaeger   │   │  Grafana   │
          │ (Metrics)  │   │  (Traces)  │   │(Dashboards)│
          └────────────┘   └────────────┘   └────────────┘
---
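The ORCHESTRATOR box is the heart of this design. Its control flow can be sketched in a few lines; this is a simplified illustration, not the video's code: the `Agent` interface and the hard-coded two-stage plan are hypothetical, and a real planner would decompose the task with a model call.

```python
import asyncio

class Agent:
    """Stand-in for a specialist agent (Research, Analysis, Writer, Code)."""

    def __init__(self, name: str):
        self.name = name

    async def run(self, subtask: str) -> str:
        # A real agent would call the model API here.
        await asyncio.sleep(0)
        return f"{self.name}: {subtask} done"

class Orchestrator:
    """Decompose a task into stages; subtasks within a stage run in parallel."""

    def __init__(self, agents: dict[str, Agent]):
        self.agents = agents

    def plan(self, description: str) -> list[list[tuple[str, str]]]:
        # Hard-coded plan for illustration: research first, then
        # analysis and writing in parallel (stage 2 depends on stage 1).
        return [
            [("research", f"gather sources for: {description}")],
            [("analysis", "analyze findings"), ("writer", "draft summary")],
        ]

    async def execute(self, description: str) -> dict:
        results, agents_used = [], []
        for stage in self.plan(description):
            # Parallel execution within a stage; stages run in order.
            stage_results = await asyncio.gather(
                *(self.agents[name].run(sub) for name, sub in stage)
            )
            results.extend(stage_results)
            agents_used.extend(name for name, _ in stage)
        return {"results": results, "agents_used": agents_used}
```

The stage list encodes the dependency management from the diagram: anything in the same inner list has no mutual dependencies, so `asyncio.gather` can fan it out.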
Scene 4: Core Implementation (8:00-12:00)
[Visual]: Code structure and key files [Animation]: Files being created and connected
[Audio/Script]:
"Let's build the core components."
[Demo - Project Structure]:
ai-ops-center/
├── api/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── routes/
│   │   ├── tasks.py         # Task endpoints
│   │   ├── agents.py        # Agent management
│   │   └── metrics.py       # Metrics endpoints
│   ├── middleware/
│   │   ├── auth.py          # API key auth
│   │   ├── rate_limit.py    # Rate limiting
│   │   └── tracing.py       # OpenTelemetry
│   └── models/
│       ├── requests.py      # Pydantic models
│       └── responses.py
├── orchestrator/
│   ├── __init__.py
│   ├── orchestrator.py      # Main orchestrator
│   ├── planner.py           # Task decomposition
│   └── executor.py          # Execution engine
├── agents/
│   ├── __init__.py
│   ├── base.py              # Base agent class
│   ├── research.py          # Research agent
│   ├── analysis.py          # Analysis agent
│   ├── writer.py            # Writer agent
│   └── code.py              # Code agent
├── services/
│   ├── database.py          # PostgreSQL
│   ├── cache.py             # Redis
│   ├── queue.py             # Message queue
│   └── storage.py           # File storage
├── observability/
│   ├── logging.py           # Structured logging
│   ├── metrics.py           # Prometheus metrics
│   └── tracing.py           # OpenTelemetry
├── config/
│   ├── settings.py          # Configuration
│   └── config.yaml          # Default config
├── tests/
│   ├── test_api.py
│   ├── test_orchestrator.py
│   └── test_agents.py
├── deploy/
│   ├── docker-compose.yml
│   ├── nginx.conf
│   └── prometheus/
│       └── alerts.yml
├── requirements.txt
├── Dockerfile
└── README.md

[Demo - Main Application]:
api/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from api.routes import tasks, agents, metrics
from api.middleware.auth import AuthMiddleware
from api.middleware.rate_limit import RateLimitMiddleware
from observability.tracing import setup_tracing
from observability.logging import setup_logging
from services.database import DatabasePool
from services.cache import CacheService
from config.settings import Settings

settings = Settings()
logger = setup_logging("ai-ops-center")

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management"""
    # Startup
    logger.info("Starting AI Operations Center")

    # Initialize services
    app.state.db = await DatabasePool.create(settings.database_url)
    app.state.cache = await CacheService.create(settings.redis_url)

    # Setup tracing
    setup_tracing("ai-ops-center", settings.otlp_endpoint)
    logger.info("All services initialized")

    yield

    # Shutdown
    logger.info("Shutting down")
    await app.state.db.close()
    await app.state.cache.close()

def create_app() -> FastAPI:
    """Create and configure the application"""
    app = FastAPI(
        title="AI Operations Center",
        version="1.0.0",
        lifespan=lifespan
    )

    # Add middleware
    app.add_middleware(AuthMiddleware, api_keys=settings.api_keys)
    app.add_middleware(RateLimitMiddleware, rate_limit=settings.rate_limit)

    # Add routes
    app.include_router(tasks.router, prefix="/api/v1/tasks", tags=["tasks"])
    app.include_router(agents.router, prefix="/api/v1/agents", tags=["agents"])
    app.include_router(metrics.router, prefix="/metrics", tags=["metrics"])

    # Health check
    @app.get("/health")
    async def health():
        return {"status": "healthy", "version": "1.0.0"}

    @app.get("/ready")
    async def ready():
        db_ok = await app.state.db.is_healthy()
        cache_ok = await app.state.cache.is_healthy()
        return {
            "ready": db_ok and cache_ok,
            "checks": {"database": db_ok, "cache": cache_ok}
        }

    return app

app = create_app()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
[Demo - Task Endpoint]:
api/routes/tasks.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from api.models.requests import TaskRequest
from api.models.responses import TaskResponse, TaskStatus
from orchestrator.orchestrator import Orchestrator
from observability.metrics import TASK_COUNTER, TASK_DURATION
import time
import uuid

router = APIRouter()

@router.post("/", response_model=TaskResponse)
async def create_task(
    request: TaskRequest,
    background: BackgroundTasks,
    orchestrator: Orchestrator = Depends(get_orchestrator)
):
    """Create and execute a new task"""
    task_id = str(uuid.uuid4())
    start_time = time.time()

    # Record metric
    TASK_COUNTER.labels(type=request.task_type, status="started").inc()

    try:
        # Execute task
        result = await orchestrator.execute(
            task_id=task_id,
            description=request.description,
            context=request.context,
            priority=request.priority
        )

        duration = time.time() - start_time
        TASK_DURATION.labels(type=request.task_type).observe(duration)
        TASK_COUNTER.labels(type=request.task_type, status="completed").inc()

        # Log task for analytics (non-blocking)
        background.add_task(log_task_completion, task_id, result, duration)

        return TaskResponse(
            task_id=task_id,
            status="completed",
            result=result,
            duration_seconds=duration,
            tokens_used=result.get("total_tokens", 0),
            agents_used=result.get("agents_used", [])
        )
    except Exception as e:
        TASK_COUNTER.labels(type=request.task_type, status="failed").inc()
        raise HTTPException(status_code=500, detail=str(e))

@router.get("/{task_id}", response_model=TaskStatus)
async def get_task_status(task_id: str):
    """Get status of a task"""
    # Query from database
    task = await db.get_task(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task

@router.get("/", response_model=list[TaskStatus])
async def list_tasks(limit: int = 10, offset: int = 0):
    """List recent tasks"""
    return await db.list_tasks(limit=limit, offset=offset)
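The handlers above lean on request/response models from `api/models/`, which the video doesn't show. A minimal sketch consistent with the fields the endpoint reads and returns follows; the field names are inferred from the handler code, so treat them as an assumption rather than the course's exact schema.

```python
from typing import Optional
from pydantic import BaseModel, Field

class TaskRequest(BaseModel):
    """Incoming task, as read by the create_task handler."""
    description: str = Field(..., min_length=1)
    task_type: str = "general"
    priority: str = "normal"
    context: Optional[dict] = None  # free-form extra context for the agents

class TaskResponse(BaseModel):
    """Result shape returned by create_task."""
    task_id: str
    status: str
    result: dict
    duration_seconds: float
    tokens_used: int = 0
    agents_used: list[str] = []
```

Declaring these as Pydantic models gives the API automatic validation and OpenAPI docs for free, which is why the project tree labels `requests.py` "Pydantic models".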
---
Scene 5: Enterprise Features (12:00-15:00)
[Visual]: Enterprise-specific features [Animation]: Security, cost control, audit features
[Audio/Script]:
"Enterprise systems need enterprise features."
[Demo - API Key Authentication]:
api/middleware/auth.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
import hashlib

class AuthMiddleware(BaseHTTPMiddleware):
    """API Key authentication middleware"""

    def __init__(self, app, api_keys: dict):
        super().__init__(app)
        self.api_keys = api_keys  # {key_hash: {"name": "...", "tier": "...", "limits": {...}}}

    async def dispatch(self, request: Request, call_next):
        # Skip auth for health/metrics endpoints
        if request.url.path in ["/health", "/ready", "/metrics"]:
            return await call_next(request)

        # Get API key
        api_key = request.headers.get("X-API-Key")
        if not api_key:
            # HTTPException raised inside BaseHTTPMiddleware bypasses FastAPI's
            # exception handlers, so return the 401 response directly
            return JSONResponse(status_code=401, content={"detail": "API key required"})

        # Validate key (only a hash of each key is stored server-side)
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        key_info = self.api_keys.get(key_hash)
        if not key_info:
            return JSONResponse(status_code=401, content={"detail": "Invalid API key"})

        # Add key info to request state
        request.state.api_key_name = key_info["name"]
        request.state.api_key_tier = key_info["tier"]
        request.state.api_key_limits = key_info["limits"]

        return await call_next(request)
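`main.py` also installs a `RateLimitMiddleware` that the video never shows. One plausible shape is a fixed-window counter per API key; this sketch uses an in-memory dict for illustration, while a production deployment with multiple replicas would keep the counters in Redis (`INCR` plus `EXPIRE`) so all instances share state.

```python
import time
from typing import Optional

class FixedWindowLimiter:
    """Fixed-window rate limiter: at most `limit` requests per `window` seconds
    per key. A plain dict stands in for Redis here, so this variant only
    works within a single process."""

    def __init__(self, limit: int, window: float = 60.0):
        self.limit = limit
        self.window = window
        self.counters: dict[str, tuple[float, int]] = {}  # key -> (window_start, count)

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        start, count = self.counters.get(key, (now, 0))
        if now - start >= self.window:
            start, count = now, 0  # window rolled over: reset the counter
        if count >= self.limit:
            self.counters[key] = (start, count)
            return False  # middleware would respond 429 Too Many Requests
        self.counters[key] = (start, count + 1)
        return True
```

The middleware itself would call `allow(request.state.api_key_name)` in `dispatch` and return a 429 response when it comes back `False`, mirroring the auth middleware's direct-response pattern.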
[Demo - Cost Control]:
services/cost_control.py
from dataclasses import dataclass

@dataclass
class TokenBudget:
    """Token budget for an API key"""
    daily_limit: int
    monthly_limit: int
    current_daily: int = 0
    current_monthly: int = 0

class CostController:
    """Control and track AI costs"""

    def __init__(self, db, cache):
        self.db = db
        self.cache = cache
        self.token_prices = {
            "claude-sonnet-4-20250514": {"input": 0.003, "output": 0.015},  # per 1K tokens
            "claude-opus-4-5-20251101": {"input": 0.015, "output": 0.075},
        }

    async def check_budget(self, api_key: str, estimated_tokens: int) -> bool:
        """Check if API key has budget for request"""
        budget = await self._get_budget(api_key)
        if budget.current_daily + estimated_tokens > budget.daily_limit:
            return False
        if budget.current_monthly + estimated_tokens > budget.monthly_limit:
            return False
        return True

    async def record_usage(self, api_key: str, model: str, input_tokens: int, output_tokens: int):
        """Record token usage"""
        total_tokens = input_tokens + output_tokens

        # Calculate cost (prices are per 1K tokens)
        prices = self.token_prices.get(model, {"input": 0.003, "output": 0.015})
        cost = (input_tokens / 1000 * prices["input"]) + (output_tokens / 1000 * prices["output"])

        # Update counters
        await self.db.execute("""
            INSERT INTO token_usage (api_key, model, input_tokens, output_tokens, cost, created_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
        """, api_key, model, input_tokens, output_tokens, cost)

        # Update budget counters in cache
        await self.cache.incrby(f"tokens:daily:{api_key}", total_tokens)
        await self.cache.incrby(f"tokens:monthly:{api_key}", total_tokens)

    async def get_usage_report(self, api_key: str, period: str = "month") -> dict:
        """Get usage report for API key"""
        return await self.db.fetchrow("""
            SELECT
                SUM(input_tokens) AS total_input,
                SUM(output_tokens) AS total_output,
                SUM(cost) AS total_cost,
                COUNT(*) AS request_count
            FROM token_usage
            WHERE api_key = $1
              AND created_at > NOW() - INTERVAL '1 month'
        """, api_key)
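A quick sanity check on the pricing arithmetic above. The rates are the per-1K-token figures hard-coded in `token_prices`; treat them as placeholders and verify current pricing before relying on them.

```python
# Per-1K-token rates copied from CostController.token_prices above.
PRICES = {"claude-sonnet-4-20250514": {"input": 0.003, "output": 0.015}}

def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Dollar cost of one request; tokens are billed per 1K at the model's rate."""
    p = PRICES[model]
    return (input_tokens / 1000 * p["input"]) + (output_tokens / 1000 * p["output"])

# Worked example: 12,000 input + 3,000 output tokens on Sonnet:
#   12 * 0.003 + 3 * 0.015 = 0.036 + 0.045 = $0.081
```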
[Demo - Audit Logging]:
services/audit.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json

@dataclass
class AuditEvent:
    """Audit log event"""
    event_id: str
    timestamp: datetime
    api_key: str
    action: str
    resource: str
    details: dict
    ip_address: str
    user_agent: str
    success: bool
    error: Optional[str] = None

class AuditLogger:
    """Enterprise audit logging"""

    def __init__(self, db):
        self.db = db

    async def log(self, event: AuditEvent):
        """Log an audit event"""
        await self.db.execute("""
            INSERT INTO audit_log
            (event_id, timestamp, api_key, action, resource, details, ip_address, user_agent, success, error)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        """,
            event.event_id,
            event.timestamp,
            event.api_key,
            event.action,
            event.resource,
            json.dumps(event.details),
            event.ip_address,
            event.user_agent,
            event.success,
            event.error
        )

    async def query(self, api_key: str = None, action: str = None, start: datetime = None, end: datetime = None):
        """Query audit logs"""
        query = "SELECT * FROM audit_log WHERE 1=1"
        params = []
        if api_key:
            params.append(api_key)
            query += f" AND api_key = ${len(params)}"
        if action:
            params.append(action)
            query += f" AND action = ${len(params)}"
        if start:
            params.append(start)
            query += f" AND timestamp >= ${len(params)}"
        if end:
            params.append(end)
            query += f" AND timestamp <= ${len(params)}"
        query += " ORDER BY timestamp DESC LIMIT 1000"
        return await self.db.fetch(query, *params)
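The filter-building pattern in `query` keeps the `$n` placeholder numbers in sync with the `params` list, which is what asyncpg-style drivers expect. Extracted as a standalone helper it becomes easy to unit-test; this is a sketch for illustration, not part of the video's code.

```python
from datetime import datetime
from typing import Optional

def build_audit_query(api_key: Optional[str] = None, action: Optional[str] = None,
                      start: Optional[datetime] = None, end: Optional[datetime] = None):
    """Build a parameterized audit-log query; $n numbers follow params order."""
    query = "SELECT * FROM audit_log WHERE 1=1"
    params: list = []
    for clause, value in [("api_key =", api_key), ("action =", action),
                          ("timestamp >=", start), ("timestamp <=", end)]:
        if value is not None:
            params.append(value)
            query += f" AND {clause} ${len(params)}"
    query += " ORDER BY timestamp DESC LIMIT 1000"
    return query, params
```

Because the placeholder number is always `len(params)` immediately after appending, filters can be added or skipped in any combination without renumbering, and user input never gets interpolated into the SQL string.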
---
Scene 6: Deployment (15:00-17:00)
[Visual]: Docker Compose and deployment [Animation]: Containers starting up
[Audio/Script]:
"Package everything for deployment."
[Demo - Docker Compose]:
deploy/docker-compose.yml
version: '3.8'

services:
  # Main API service (3 replicas)
  api:
    build: .
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/aiops
      - REDIS_URL=redis://redis:6379
      - OTLP_ENDPOINT=http://jaeger:4317
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    depends_on:
      - db
      - redis

  # Load balancer
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./certs:/etc/nginx/certs
    depends_on:
      - api

  # Database
  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=aiops
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Cache
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Monitoring
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus:/etc/prometheus
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=15d'

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

  jaeger:
    image: jaegertracing/all-in-one
    ports:
      - "16686:16686"  # UI
    environment:
      - COLLECTOR_OTLP_ENABLED=true

volumes:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:
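The compose file mounts `./nginx.conf` but the video never shows it. A minimal sketch that proxies to the API replicas might look like this; it assumes Docker's embedded DNS resolves the `api` service name to each replica, and omits the TLS setup for port 443 for brevity.

```nginx
events {}

http {
    upstream api_backend {
        # Docker's embedded DNS resolves "api" to every replica.
        server api:8000;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://api_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_read_timeout 60s;
        }
    }
}
```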
---
Scene 7: Testing & Validation (17:00-19:00)
[Visual]: Test results and validation [Animation]: Tests passing, system verified
[Audio/Script]:
"Test everything before declaring victory."
[Demo - Testing]:
tests/test_integration.py
import asyncio
import pytest
import httpx
from fastapi.testclient import TestClient
from api.main import app

@pytest.fixture
def client():
    return TestClient(app)

@pytest.fixture
def api_key():
    return "test-api-key-123"

class TestTaskEndpoint:
    def test_create_simple_task(self, client, api_key):
        """Test creating a simple task"""
        response = client.post(
            "/api/v1/tasks/",
            headers={"X-API-Key": api_key},
            json={
                "description": "Summarize the benefits of AI",
                "task_type": "research",
                "priority": "normal"
            }
        )
        assert response.status_code == 200
        data = response.json()
        assert "task_id" in data
        assert data["status"] == "completed"
        assert data["duration_seconds"] > 0

    def test_create_complex_task(self, client, api_key):
        """Test multi-agent task"""
        response = client.post(
            "/api/v1/tasks/",
            headers={"X-API-Key": api_key},
            json={
                "description": "Research AI trends, analyze key patterns, and write a summary report",
                "task_type": "multi-agent",
                "priority": "high"
            }
        )
        assert response.status_code == 200
        data = response.json()
        assert len(data["agents_used"]) >= 2

    def test_rate_limiting(self, client, api_key):
        """Test rate limiting works"""
        # Make many requests quickly
        responses = []
        for _ in range(100):
            r = client.post(
                "/api/v1/tasks/",
                headers={"X-API-Key": api_key},
                json={"description": "Quick test", "task_type": "test"}
            )
            responses.append(r.status_code)
        # Should see some 429s
        assert 429 in responses

    def test_invalid_api_key(self, client):
        """Test authentication"""
        response = client.post(
            "/api/v1/tasks/",
            headers={"X-API-Key": "invalid-key"},
            json={"description": "Test"}
        )
        assert response.status_code == 401

class TestObservability:
    def test_health_endpoint(self, client):
        """Test health check"""
        response = client.get("/health")
        assert response.status_code == 200
        assert response.json()["status"] == "healthy"

    def test_metrics_endpoint(self, client):
        """Test Prometheus metrics"""
        response = client.get("/metrics")
        assert response.status_code == 200
        assert "agent_requests_total" in response.text

    def test_ready_endpoint(self, client):
        """Test readiness check"""
        response = client.get("/ready")
        assert response.status_code == 200
        assert response.json()["ready"] is True

# Load test
async def load_test():
    """Simple load test"""
    async with httpx.AsyncClient() as client:
        tasks = []
        for i in range(100):
            task = client.post(
                "http://localhost/api/v1/tasks/",
                headers={"X-API-Key": "load-test-key"},
                json={"description": f"Load test task {i}"}
            )
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        success = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
        print(f"Success rate: {success/len(results)*100:.1f}%")
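A raw success rate alone won't prove the <2s latency target from Scene 2; for that you need percentiles over the measured request durations. A small helper for summarizing load-test timings (a sketch; the durations would come from timing each request in `load_test`):

```python
def summarize(durations_ms: list[float]) -> dict:
    """Summarize load-test latencies: median, tail (p95), and worst case."""
    ordered = sorted(durations_ms)

    def percentile(p: float) -> float:
        # Nearest-rank percentile: smallest value covering p% of samples.
        idx = max(0, min(len(ordered) - 1, round(p / 100 * len(ordered)) - 1))
        return ordered[idx]

    return {"p50": percentile(50), "p95": percentile(95), "max": ordered[-1]}
```

Comparing `p95` (not the mean) against the 2-second budget is the usual practice, since averages hide the slow tail that users actually notice.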
---
Scene 8: Graduation (19:00-20:00)
[Visual]: Ultimate achievement unlock [Animation]: Level 50 celebration, all badges displayed
[Audio/Script]:
"Your submission requirements:
| Component | Points |
|-----------|--------|
| Full API with authentication | 600 |
| 4+ specialist agents | 600 |
| Orchestrator with decomposition | 600 |
| Scalable deployment (3+ replicas) | 500 |
| Full observability stack | 500 |
| Cost control and audit logging | 400 |
| Test suite with >80% coverage | 400 |
| Documentation | 400 |
Total possible: 4000 XP
Completing this capstone:
- Awards the 'Production Master' badge
- Awards the 'AI Grandmaster' title
- Reaches Level 50 - Maximum
- Completes the entire curriculum
You started knowing nothing about Claude.
Now you can build enterprise-grade AI systems.
You are an AI engineer. A Production Master. A Grandmaster.
Go build the future."
[Final animation]: All 4 track badges combine into 'AI Grandmaster' badge, Level 50, credits roll
---
Post-Video Challenge
Challenge ID: TRACK4_CAPSTONE (FINAL) | Type: Full Project Submission | Time Limit: None
Requirements:
1. API Service (600 XP)
- FastAPI with proper structure
- API key authentication
- Rate limiting
- Health/readiness endpoints
2. Agent System (600 XP)
- 4+ specialist agents
- Research, Analysis, Writer, Code (minimum)
- Proper capability definitions
3. Orchestration (600 XP)
- Task decomposition
- Dependency management
- Parallel execution
- Result synthesis
4. Deployment (500 XP)
- Docker Compose deployment
- 3+ API replicas
- Load balancer
- Database and cache
5. Observability (500 XP)
- Structured logging
- Prometheus metrics
- Distributed tracing
- Grafana dashboard
6. Enterprise Features (400 XP)
- Cost tracking/limits
- Audit logging
- Usage reports
7. Testing (400 XP)
- Unit tests
- Integration tests
- Load test results
- >80% coverage
8. Documentation (400 XP)
- README with setup
- API documentation
- Architecture diagram
- Runbook for operations
Submission:
- Complete project in your workspace
- Run: /validate final-capstone
- 5-minute demo video showing all features
Rewards:
- Base XP: 750
- Challenge XP: Up to 4000
- Achievement: "Production Master" (Track 4 Complete)
- Title: "AI Grandmaster" (Level 50)
- Certificate: "AI Campus Completion Certificate"
SEO Metadata
Alt-text: Final capstone project - build an enterprise AI operations center with multi-agent orchestration, scalable deployment, full observability, and enterprise features.
Tags: enterprise AI, final capstone, production AI system, multi-agent enterprise, AI operations center, AI grandmaster
Keywords: enterprise ai system, ai capstone final, production ai platform, multi-agent enterprise, ai operations center, ai grandmaster certification