[TRACK4_002] Scaling & Performance
📚 Scaling & Performance
[VIDEO-017] Scaling & Performance
Track: 4 - Production Mastery | Module: 2 | Duration: 14 minutes | Level requirement: 47 | XP reward: 450 XP
---
Scene 1: When Single Server Isn't Enough (0:00-1:30)
[Visual]: Single server overwhelmed vs distributed system [Animation]: Traffic spikes and system response
[Audio/Script]:
"Your AI service is live. Users love it. Traffic grows.>
Then one day:
- Response times spike
- Requests start failing
- Your single server hits its limits
This is the scaling problem.
The solution isn't bigger servers. It's smarter architecture.
Let's learn to scale AI systems."
[Lower third]: "Track 4: Production Mastery | Level 47"
---
Scene 2: Vertical vs Horizontal Scaling (1:30-3:00)
[Visual]: Scaling comparison diagrams [Animation]: Server growing bigger vs multiplying
[Audio/Script]:
"Two approaches to scaling:>
Vertical Scaling (Scale Up)
- Bigger machine: More CPU, RAM, GPU
- Simple but limited
- Single point of failure>
Horizontal Scaling (Scale Out)
- More machines working together
- Nearly unlimited capacity
- Built-in redundancy>
For AI systems, horizontal scaling is the path to production scale."
[Diagram]:
Vertical:                  Horizontal:
┌────────────┐             ┌──────┐  ┌──────┐  ┌──────┐
│            │             │Agent │  │Agent │  │Agent │
│    Big     │             │  1   │  │  2   │  │  3   │
│   Agent    │             └──────┘  └──────┘  └──────┘
│   Server   │                 │         │         │
│            │                 └─────────┼─────────┘
│            │                           │
└────────────┘                  ┌───────────────┐
                                │ Load Balancer │
Limit: Hardware max             └───────────────┘
                                Limit: Budget
---
Scene 3: Stateless Design (3:00-5:00)
[Visual]: Stateless vs stateful architecture [Animation]: Requests flowing to any server
[Audio/Script]:
"The key to horizontal scaling: Stateless agents.>
Stateless: Any request can go to any server
Stateful: Requests must go to specific servers>
Make your agents stateless. Store state externally."
[Demo - Stateless Design]:
import json

# BAD: Stateful agent (can't scale horizontally)
class StatefulAgent:
    def __init__(self):
        self.conversation_history = []  # State in memory!
        self.user_preferences = {}      # State in memory!

    async def process(self, message: str, user_id: str):
        self.conversation_history.append(message)  # Lost if the server restarts
        return await self._generate_response()

# GOOD: Stateless agent (scales horizontally)
class StatelessAgent:
    def __init__(self, redis_client, db_pool):
        self.cache = redis_client  # External state
        self.db = db_pool          # External state

    async def process(self, message: str, user_id: str, session_id: str):
        # Load state from the external stores
        history = await self._load_history(session_id)
        preferences = await self._load_preferences(user_id)

        # Process
        response = await self._generate_response(message, history, preferences)

        # Save state back to the external store
        await self._save_history(session_id, history + [message, response])
        return response

    async def _load_history(self, session_id: str) -> list:
        """Load conversation history from Redis"""
        data = await self.cache.get(f"history:{session_id}")
        return json.loads(data) if data else []

    async def _save_history(self, session_id: str, history: list):
        """Save conversation history to Redis with a TTL"""
        await self.cache.setex(
            f"history:{session_id}",
            3600,  # 1 hour TTL
            json.dumps(history)
        )

    async def _load_preferences(self, user_id: str) -> dict:
        """Load user preferences from the database"""
        async with self.db.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT preferences FROM users WHERE id = $1",
                user_id
            )
            return row["preferences"] if row else {}
---
Scene 4: Load Balancing (5:00-7:00)
[Visual]: Load balancer distributing traffic [Animation]: Requests being routed to different servers
[Audio/Script]:
"Load balancers distribute requests across your agents."
[Demo - Nginx Load Balancer]:
# /etc/nginx/conf.d/agent-service.conf
upstream agent_backend {
    # Round-robin by default; passive health checks via max_fails/fail_timeout
    server agent1:8000 weight=1 max_fails=3 fail_timeout=30s;
    server agent2:8000 weight=1 max_fails=3 fail_timeout=30s;
    server agent3:8000 weight=1 max_fails=3 fail_timeout=30s;

    # Keep idle connections to the upstream open for reuse
    keepalive 32;
}
server {
    listen 80;
    server_name api.example.com;

    # Redirect to HTTPS
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate     /etc/ssl/certs/agent.crt;
    ssl_certificate_key /etc/ssl/private/agent.key;

    # Timeouts for AI workloads (longer than typical web traffic)
    proxy_connect_timeout 10s;
    proxy_send_timeout    120s;
    proxy_read_timeout    120s;

    location /api/ {
        proxy_pass http://agent_backend;
        proxy_http_version 1.1;

        # Headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Request-ID $request_id;

        # Keepalive to the upstream
        proxy_set_header Connection "";

        # Buffering for large responses
        proxy_buffering on;
        proxy_buffer_size 128k;
        proxy_buffers 4 256k;
    }

    location /health {
        proxy_pass http://agent_backend/health;
        proxy_connect_timeout 5s;
        proxy_read_timeout    5s;
    }
}
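The /health location above assumes each replica exposes its own health endpoint. Here is a minimal sketch of one, reusing the app and clients from the earlier wiring sketch; the specific checks are illustrative.
# Health endpoint sketch (checks are illustrative; adapt to your dependencies)
from fastapi import Response

@app.get("/health")
async def health(response: Response):
    checks = {}
    try:
        await app.state.redis.ping()  # Is Redis reachable?
        checks["redis"] = "ok"
    except Exception:
        checks["redis"] = "down"
    try:
        async with app.state.db.acquire() as conn:
            await conn.fetchval("SELECT 1")  # Is the database reachable?
        checks["db"] = "ok"
    except Exception:
        checks["db"] = "down"

    healthy = all(status == "ok" for status in checks.values())
    response.status_code = 200 if healthy else 503
    return {"status": "healthy" if healthy else "unhealthy", "checks": checks}
Returning 503 when a dependency is down gives orchestrators and monitoring a clear signal to pull the replica out of rotation.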
[Demo - Load Balancing Strategies]:
# Different load balancing strategies

# 1. Round Robin (default)
# Requests distributed evenly across all servers
upstream backend {
    server agent1:8000;
    server agent2:8000;
    server agent3:8000;
}

# 2. Least Connections
# Requests go to the server with the fewest active connections
upstream backend {
    least_conn;
    server agent1:8000;
    server agent2:8000;
    server agent3:8000;
}

# 3. IP Hash (for session affinity if needed)
# The same client IP always goes to the same server
upstream backend {
    ip_hash;
    server agent1:8000;
    server agent2:8000;
    server agent3:8000;
}

# 4. Weighted
# Some servers handle more traffic
upstream backend {
    server agent1:8000 weight=3;  # Gets 3x the traffic
    server agent2:8000 weight=1;
    server agent3:8000 weight=1;
}
---
Scene 5: Caching Strategies (7:00-9:00)
[Visual]: Cache layers diagram [Animation]: Cache hits avoiding expensive operations
[Audio/Script]:
"The fastest request is one you don't have to process.>
Caching saves time and money, especially for AI workloads."
[Demo - Caching]:
import hashlib
import json
from typing import Optional

import redis.asyncio as redis

class AgentCache:
    """Redis-backed cache for agent responses"""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    def _cache_key(self, task: str, context: dict) -> str:
        """Generate a deterministic cache key"""
        content = json.dumps({"task": task, "context": context}, sort_keys=True)
        return f"agent:response:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    async def get(self, task: str, context: dict) -> Optional[dict]:
        """Get a cached response"""
        key = self._cache_key(task, context)
        data = await self.redis.get(key)
        if data:
            return json.loads(data)
        return None

    async def set(self, task: str, context: dict, response: dict, ttl: int = 3600):
        """Cache a response"""
        key = self._cache_key(task, context)
        await self.redis.setex(key, ttl, json.dumps(response))
class CachedAgentService:
    """Agent service with caching"""

    def __init__(self, agent, cache: AgentCache):
        self.agent = agent
        self.cache = cache

    async def execute(self, task: str, context: dict, use_cache: bool = True) -> dict:
        """Execute a task with caching"""
        # Check the cache first
        if use_cache:
            cached = await self.cache.get(task, context)
            if cached:
                cached["_cached"] = True
                return cached

        # Execute the agent
        result = await self.agent.execute(task, context)

        # Cache the result
        if use_cache and self._is_cacheable(task, result):
            await self.cache.set(task, context, result)

        result["_cached"] = False
        return result

    def _is_cacheable(self, task: str, result: dict) -> bool:
        """Determine whether a result should be cached"""
        # Don't cache errors
        if result.get("status") == "error":
            return False
        # Don't cache time-sensitive queries
        time_words = ["now", "current", "today", "latest"]
        if any(word in task.lower() for word in time_words):
            return False
        return True
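# Usage sketch (illustrative; assumes an existing `agent` object and a local Redis)
async def demo_cached_service(agent):
    cache = AgentCache("redis://localhost:6379/0")
    service = CachedAgentService(agent, cache)
    # First call misses the cache and runs the agent
    first = await service.execute("Summarize our refund policy", {"locale": "en"})
    # A repeat call with the same task + context is served from Redis
    second = await service.execute("Summarize our refund policy", {"locale": "en"})
    return first["_cached"], second["_cached"]  # (False, True)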
# Cache warming for common queries
async def warm_cache(agent, cache: AgentCache, common_queries: list):
    """Pre-populate the cache with common queries"""
    for query in common_queries:
        # Pre-compute and cache with a longer TTL (24 hours)
        result = await agent.execute(query["task"], query["context"])
        await cache.set(query["task"], query["context"], result, ttl=86400)
---
Scene 6: Rate Limiting & Throttling (9:00-11:00)
[Visual]: Traffic being controlled [Animation]: Requests being queued and throttled
[Audio/Script]:
"Protect your system from overload with rate limiting."
[Demo - Rate Limiting]:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import time
from collections import defaultdict
import asyncio

class RateLimiter:
    """Token bucket rate limiter"""

    def __init__(self, rate: int, capacity: int):
        self.rate = rate          # Tokens added per second
        self.capacity = capacity  # Maximum bucket size (burst)
        self.buckets = defaultdict(lambda: {"tokens": capacity, "last_update": time.time()})
        self.lock = asyncio.Lock()

    async def acquire(self, key: str, tokens: int = 1) -> bool:
        """Try to acquire tokens for a key"""
        async with self.lock:
            bucket = self.buckets[key]
            now = time.time()

            # Refill tokens based on the time elapsed
            elapsed = now - bucket["last_update"]
            bucket["tokens"] = min(
                self.capacity,
                bucket["tokens"] + elapsed * self.rate
            )
            bucket["last_update"] = now

            # Check whether enough tokens remain
            if bucket["tokens"] >= tokens:
                bucket["tokens"] -= tokens
                return True
            return False

    async def get_wait_time(self, key: str, tokens: int = 1) -> float:
        """Get the time to wait until enough tokens are available"""
        bucket = self.buckets[key]
        if bucket["tokens"] >= tokens:
            return 0.0
        return (tokens - bucket["tokens"]) / self.rate
# FastAPI / ASGI middleware
class RateLimitMiddleware:
    def __init__(self, app: FastAPI, limiter: RateLimiter):
        self.app = app
        self.limiter = limiter

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Get the client identifier
        request = Request(scope, receive)
        client_id = self._get_client_id(request)

        # Check the rate limit
        allowed = await self.limiter.acquire(client_id)
        if not allowed:
            wait_time = await self.limiter.get_wait_time(client_id)
            response = JSONResponse(
                status_code=429,
                content={
                    "error": "Rate limit exceeded",
                    "retry_after_seconds": wait_time
                },
                headers={"Retry-After": str(int(wait_time))}
            )
            await response(scope, receive, send)
            return

        await self.app(scope, receive, send)

    def _get_client_id(self, request: Request) -> str:
        """Get a unique client identifier"""
        # Use the API key if available
        api_key = request.headers.get("X-API-Key")
        if api_key:
            return f"api:{api_key}"
        # Fall back to the client IP
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            return f"ip:{forwarded.split(',')[0].strip()}"
        return f"ip:{request.client.host}"
# Tiered rate limiting
class TieredRateLimiter:
    """Different limits for different tiers"""

    def __init__(self):
        self.tiers = {
            "free": RateLimiter(rate=1, capacity=10),        # 1 req/sec, burst of 10
            "basic": RateLimiter(rate=10, capacity=50),      # 10 req/sec, burst of 50
            "premium": RateLimiter(rate=100, capacity=200),  # 100 req/sec, burst of 200
        }

    async def acquire(self, api_key: str) -> bool:
        tier = await self._get_tier(api_key)
        return await self.tiers[tier].acquire(api_key)

    async def _get_tier(self, api_key: str) -> str:
        # Look up the tier from the database (placeholder)
        return "basic"  # Default
---
Scene 7: Performance Optimization (11:00-13:00)
[Visual]: Performance metrics improving [Animation]: Bottlenecks being eliminated
[Audio/Script]:
"Once scaled, optimize for performance."
[Demo - Performance Tips]:
# 1. Connection pooling
import asyncio

import asyncpg
import httpx

# Database connection pool
db_pool = await asyncpg.create_pool(
    dsn="postgresql://...",
    min_size=5,
    max_size=20,
    command_timeout=60
)

# HTTP connection pool for API calls
http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    timeout=httpx.Timeout(30.0)
)

# 2. Async everywhere
async def process_batch(tasks: list):
    """Process multiple tasks concurrently"""
    results = await asyncio.gather(
        *[process_single(task) for task in tasks],
        return_exceptions=True
    )
    return results

# 3. Streaming responses for large outputs
from fastapi.responses import StreamingResponse

async def stream_response(task: str):
    """Stream the AI response as it is generated"""
    async def generate():
        async for chunk in agent.stream(task):
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
# 4. Background tasks for non-critical work
from fastapi import BackgroundTasks

@app.post("/api/v1/task")
async def execute_task(request: TaskRequest, background: BackgroundTasks):
    result = await agent.execute(request.task, request.context)
    # Non-critical: log analytics in the background
    background.add_task(log_analytics, request, result)
    return result
# 5. Efficient serialization
import orjson  # Faster than the standard json module

def serialize(data: dict) -> bytes:
    return orjson.dumps(data)

def deserialize(data: bytes) -> dict:
    return orjson.loads(data)
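One caveat on tip 2: an unbounded asyncio.gather over a large batch can overwhelm downstream model APIs and connection pools. Below is a minimal sketch of bounding concurrency with a semaphore, assuming the process_single coroutine from tip 2; the limit of 10 is illustrative.
# Bounded-concurrency variant of process_batch (the limit of 10 is illustrative)
async def process_batch_bounded(tasks: list, max_concurrent: int = 10):
    """Run tasks concurrently, but never more than max_concurrent at a time"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task):
        async with semaphore:
            return await process_single(task)

    return await asyncio.gather(*[run_one(task) for task in tasks], return_exceptions=True)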
---
Scene 8: Challenge Time (13:00-14:00)
[Visual]: Challenge specification [Animation]: XP reward display
[Audio/Script]:
"Your challenge: Make your agent service scalable.>
Requirements:
1. Stateless agent design with Redis for state
2. Nginx load balancer config for 3 replicas
3. Response caching system
4. Rate limiting middleware
5. Performance test showing horizontal scaling>
Complete this for 850 XP and the 'Scale Master' badge.>
Next: Monitoring and observability - seeing inside your production system."
---
Post-Video Challenge
Challenge ID: TRACK4_002_CHALLENGE | Type: Code + Infrastructure
Instructions:
Task 1: Implement stateless design
claude "Refactor the agent to be stateless:
1. Move conversation history to Redis
2. Move user preferences to PostgreSQL
3. No in-memory state between requests"

Task 2: Create load balancer config
claude "Create Nginx configuration:
1. Upstream with 3 agent servers
2. Health check endpoints
3. Appropriate timeouts for AI workloads
4. SSL termination"

Task 3: Add caching layer
claude "Implement response caching:
1. Cache key generation from task + context
2. TTL-based expiration
3. Cache bypass for time-sensitive queries
4. Cache warming for common queries"

Task 4: Add rate limiting
claude "Add rate limiting:
1. Token bucket algorithm
2. Per-client rate limits
3. 429 responses with Retry-After header
4. Tiered limits based on API key"

Task 5: Performance test
# Test a single instance
wrk -t4 -c100 -d30s http://localhost:8000/api/v1/task
# Test with 3 instances behind the load balancer
wrk -t4 -c300 -d30s http://localhost/api/v1/task
# Compare throughput and latency
Rewards:
- XP: 850 (450 base + 400 challenge)
- Achievement: "Scale Master"
SEO Metadata
Alt-text: Scaling AI agents for production - horizontal scaling, load balancing, caching, rate limiting. Build high-performance AI services.
Tags: AI scaling, horizontal scaling, load balancing, caching, rate limiting, performance optimization, nginx
Keywords: scale ai agents, production ai performance, load balancing ai, ai caching strategies, rate limiting api, high performance ai