📚 Scaling & Performance

🎯 Level 47+ ⭐ 850 XP ⏱️ 14 min

[VIDEO-017] Scaling & Performance

Track: 4 - Production Mastery | Module: 2 | Duration: 14 minutes | Level requirement: 47 | XP reward: 450 XP (plus 400 XP challenge bonus)

---

Scene 1: When Single Server Isn't Enough (0:00-1:30)

[Visual]: Single server overwhelmed vs distributed system
[Animation]: Traffic spikes and system response

[Audio/Script]:

"Your AI service is live. Users love it. Traffic grows.
>
Then one day:
- Response times spike
- Requests start failing
- Your single server hits its limits
>
This is the scaling problem.
>
The solution isn't bigger servers. It's smarter architecture.
>
Let's learn to scale AI systems."

[Lower third]: "Track 4: Production Mastery | Level 47"

---

Scene 2: Vertical vs Horizontal Scaling (1:30-3:00)

[Visual]: Scaling comparison diagrams
[Animation]: Server growing bigger vs multiplying

[Audio/Script]:

"Two approaches to scaling:
>
Vertical Scaling (Scale Up)
- Bigger machine: More CPU, RAM, GPU
- Simple but limited
- Single point of failure
>
Horizontal Scaling (Scale Out)
- More machines working together
- Nearly unlimited capacity
- Built-in redundancy
>
For AI systems, horizontal scaling is the path to production scale."

[Diagram]:

Vertical:                   Horizontal:
┌────────────┐             ┌──────┐ ┌──────┐ ┌──────┐
│            │             │Agent │ │Agent │ │Agent │
│  Big       │             │  1   │ │  2   │ │  3   │
│  Agent     │             └──────┘ └──────┘ └──────┘
│  Server    │                  │       │       │
│            │                  └───────┼───────┘
│            │                          │
└────────────┘                  ┌───────────────┐
                                │ Load Balancer │
Limit: Hardware max            └───────────────┘
                                Limit: Budget

---

Scene 3: Stateless Design (3:00-5:00)

[Visual]: Stateless vs stateful architecture
[Animation]: Requests flowing to any server

[Audio/Script]:

"The key to horizontal scaling: Stateless agents.
>
Stateless: Any request can go to any server
Stateful: Requests must go to specific servers
>
Make your agents stateless. Store state externally."

[Demo - Stateless Design]:

BAD: Stateful agent (can't scale)

class StatefulAgent:
    def __init__(self):
        self.conversation_history = []  # State in memory!
        self.user_preferences = {}      # State in memory!

    async def process(self, message: str, user_id: str):
        self.conversation_history.append(message)  # Lost if server restarts
        return await self._generate_response()

GOOD: Stateless agent (scales infinitely)

import json

class StatelessAgent:
    def __init__(self, redis_client, db_pool):
        self.cache = redis_client  # External state
        self.db = db_pool          # External state

    async def process(self, message: str, user_id: str, session_id: str):
        # Load state from external store
        history = await self._load_history(session_id)
        preferences = await self._load_preferences(user_id)

        # Process
        response = await self._generate_response(message, history, preferences)

        # Save state to external store
        await self._save_history(session_id, history + [message, response])

        return response

    async def _load_history(self, session_id: str) -> list:
        """Load conversation history from Redis"""
        data = await self.cache.get(f"history:{session_id}")
        return json.loads(data) if data else []

    async def _save_history(self, session_id: str, history: list):
        """Save conversation history to Redis with TTL"""
        await self.cache.setex(
            f"history:{session_id}",
            3600,  # 1 hour TTL
            json.dumps(history)
        )

    async def _load_preferences(self, user_id: str) -> dict:
        """Load user preferences from database"""
        async with self.db.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT preferences FROM users WHERE id = $1",
                user_id
            )
            return row["preferences"] if row else {}
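
To make the idea concrete, here is a minimal wiring sketch, assuming the StatelessAgent above (with a real _generate_response filled in); the connection strings and message are illustrative:

import asyncio
import asyncpg
import redis.asyncio as redis

async def main():
    cache = redis.from_url("redis://redis:6379/0")
    db = await asyncpg.create_pool(dsn="postgresql://app:secret@db:5432/app")
    agent = StatelessAgent(cache, db)

    # Every request carries user_id and session_id, and all state lives in
    # Redis/Postgres, so this call could be served by any replica.
    reply = await agent.process("What's my order status?", user_id="u-42", session_id="s-1")
    print(reply)

asyncio.run(main())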

---

Scene 4: Load Balancing (5:00-7:00)

[Visual]: Load balancer distributing traffic
[Animation]: Requests being routed to different servers

[Audio/Script]:

"Load balancers distribute requests across your agents."

[Demo - Nginx Load Balancer]:

/etc/nginx/conf.d/agent-service.conf

upstream agent_backend {
    # Round-robin by default
    server agent1:8000 weight=1;
    server agent2:8000 weight=1;
    server agent3:8000 weight=1;

    # Keep idle connections open to the upstream servers
    keepalive 32;
}

server {
    listen 80;
    server_name api.example.com;

    # Redirect to HTTPS
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate /etc/ssl/certs/agent.crt;
    ssl_certificate_key /etc/ssl/private/agent.key;

    # Timeouts for AI workloads (longer than typical web)
    proxy_connect_timeout 10s;
    proxy_send_timeout 120s;
    proxy_read_timeout 120s;

    location /api/ {
        proxy_pass http://agent_backend;
        proxy_http_version 1.1;

        # Headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Request-ID $request_id;

        # Keepalive
        proxy_set_header Connection "";

        # Buffering for large responses
        proxy_buffering on;
        proxy_buffer_size 128k;
        proxy_buffers 4 256k;
    }

    location /health {
        proxy_pass http://agent_backend/health;
        proxy_connect_timeout 5s;
        proxy_read_timeout 5s;
    }
}
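
The location /health block above assumes each agent instance exposes a health endpoint. A minimal sketch of one, assuming the agents are FastAPI apps (only the route path comes from the config; the rest is illustrative):

from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health():
    # Keep this cheap: nginx probes it with a 5-second timeout
    return {"status": "ok"}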

[Demo - Load Balancing Strategies]:

Different load balancing strategies

1. Round Robin (default)

Requests distributed equally to all servers

upstream backend {
    server agent1:8000;
    server agent2:8000;
    server agent3:8000;
}

2. Least Connections

Request goes to server with fewest active connections

upstream backend {
    least_conn;
    server agent1:8000;
    server agent2:8000;
    server agent3:8000;
}

3. IP Hash (for session affinity if needed)

Same client IP always goes to same server

upstream backend {
    ip_hash;
    server agent1:8000;
    server agent2:8000;
    server agent3:8000;
}

4. Weighted

Some servers handle more traffic

upstream backend {
    server agent1:8000 weight=3;  # Gets 3x traffic
    server agent2:8000 weight=1;
    server agent3:8000 weight=1;
}

---

Scene 5: Caching Strategies (7:00-9:00)

[Visual]: Cache layers diagram
[Animation]: Cache hits avoiding expensive operations

[Audio/Script]:

"The fastest request is one you don't have to process.
>
Caching saves time and money, especially for AI workloads."

[Demo - Caching]:

import hashlib
import json
from typing import Optional
import redis.asyncio as redis

class AgentCache:
    """Multi-level caching for agent responses"""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    def _cache_key(self, task: str, context: dict) -> str:
        """Generate deterministic cache key"""
        content = json.dumps({"task": task, "context": context}, sort_keys=True)
        return f"agent:response:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    async def get(self, task: str, context: dict) -> Optional[dict]:
        """Get cached response"""
        key = self._cache_key(task, context)
        data = await self.redis.get(key)
        if data:
            return json.loads(data)
        return None

    async def set(self, task: str, context: dict, response: dict, ttl: int = 3600):
        """Cache a response"""
        key = self._cache_key(task, context)
        await self.redis.setex(key, ttl, json.dumps(response))

class CachedAgentService:
    """Agent service with caching"""

    def __init__(self, agent, cache: AgentCache):
        self.agent = agent
        self.cache = cache

    async def execute(self, task: str, context: dict, use_cache: bool = True) -> dict:
        """Execute task with caching"""

        # Check cache first
        if use_cache:
            cached = await self.cache.get(task, context)
            if cached:
                cached["_cached"] = True
                return cached

        # Execute agent
        result = await self.agent.execute(task, context)

        # Cache the result
        if use_cache and self._is_cacheable(task, result):
            await self.cache.set(task, context, result)

        result["_cached"] = False
        return result

    def _is_cacheable(self, task: str, result: dict) -> bool:
        """Determine if result should be cached"""
        # Don't cache errors
        if result.get("status") == "error":
            return False

        # Don't cache time-sensitive queries
        time_words = ["now", "current", "today", "latest"]
        if any(word in task.lower() for word in time_words):
            return False

        return True

Cache warming for common queries

async def warm_cache(agent, cache: AgentCache, common_queries: list):
    """Pre-populate cache with common queries"""
    for query in common_queries:
        # Pre-compute and cache
        result = await agent.execute(query["task"], query["context"])
        await cache.set(query["task"], query["context"], result, ttl=86400)
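
A minimal usage sketch tying the pieces together; the agent object and the example query are illustrative placeholders:

async def demo(agent):
    cache = AgentCache("redis://localhost:6379/0")
    service = CachedAgentService(agent, cache)

    first = await service.execute("Summarize our refund policy", {"lang": "en"})
    again = await service.execute("Summarize our refund policy", {"lang": "en"})
    print(first["_cached"], again["_cached"])  # False, then True (within the TTL)

    # Pre-populate hot queries before peak traffic
    await warm_cache(agent, cache, [
        {"task": "Summarize our refund policy", "context": {"lang": "en"}},
    ])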

---

Scene 6: Rate Limiting & Throttling (9:00-11:00)

[Visual]: Traffic being controlled
[Animation]: Requests being queued and throttled

[Audio/Script]:

"Protect your system from overload with rate limiting."

[Demo - Rate Limiting]:

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import time
from collections import defaultdict
import asyncio

class RateLimiter:
    """Token bucket rate limiter"""

    def __init__(self, rate: int, capacity: int):
        self.rate = rate          # Tokens per second
        self.capacity = capacity  # Max bucket size
        self.buckets = defaultdict(lambda: {"tokens": capacity, "last_update": time.time()})
        self.lock = asyncio.Lock()

    async def acquire(self, key: str, tokens: int = 1) -> bool:
        """Try to acquire tokens for a key"""
        async with self.lock:
            bucket = self.buckets[key]
            now = time.time()

            # Refill tokens based on time elapsed
            elapsed = now - bucket["last_update"]
            bucket["tokens"] = min(
                self.capacity,
                bucket["tokens"] + elapsed * self.rate
            )
            bucket["last_update"] = now

            # Check if enough tokens
            if bucket["tokens"] >= tokens:
                bucket["tokens"] -= tokens
                return True
            return False

    async def get_wait_time(self, key: str, tokens: int = 1) -> float:
        """Get time to wait for tokens"""
        bucket = self.buckets[key]
        if bucket["tokens"] >= tokens:
            return 0
        return (tokens - bucket["tokens"]) / self.rate
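
A quick sanity check of the refill math above, with illustrative numbers (run inside an event loop):

async def demo():
    limiter = RateLimiter(rate=10, capacity=50)   # 10 tokens/sec, burst of 50

    for _ in range(50):
        assert await limiter.acquire("client-a")  # the burst is absorbed immediately

    assert not await limiter.acquire("client-a")  # bucket is now empty
    print(await limiter.get_wait_time("client-a"))  # ~0.1 s until the next token refills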

FastAPI middleware

class RateLimitMiddleware:
    def __init__(self, app: FastAPI, limiter: RateLimiter):
        self.app = app
        self.limiter = limiter

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Get client identifier
        request = Request(scope, receive)
        client_id = self._get_client_id(request)

        # Check rate limit
        allowed = await self.limiter.acquire(client_id)
        if not allowed:
            wait_time = await self.limiter.get_wait_time(client_id)
            response = JSONResponse(
                status_code=429,
                content={
                    "error": "Rate limit exceeded",
                    "retry_after_seconds": wait_time
                },
                headers={"Retry-After": str(int(wait_time))}
            )
            await response(scope, receive, send)
            return

        await self.app(scope, receive, send)

    def _get_client_id(self, request: Request) -> str:
        """Get unique client identifier"""
        # Use API key if available
        api_key = request.headers.get("X-API-Key")
        if api_key:
            return f"api:{api_key}"

        # Fall back to IP
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            return f"ip:{forwarded.split(',')[0].strip()}"
        return f"ip:{request.client.host}"

Tiered rate limiting

class TieredRateLimiter:
    """Different limits for different tiers"""

    def __init__(self):
        self.tiers = {
            "free": RateLimiter(rate=1, capacity=10),       # 1 req/sec, burst 10
            "basic": RateLimiter(rate=10, capacity=50),     # 10 req/sec, burst 50
            "premium": RateLimiter(rate=100, capacity=200)  # 100 req/sec, burst 200
        }

    async def acquire(self, api_key: str) -> bool:
        tier = await self._get_tier(api_key)
        return await self.tiers[tier].acquire(api_key)

    async def _get_tier(self, api_key: str) -> str:
        # Look up tier from database
        return "basic"  # Default
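
Wiring the middleware into the app is one line; a minimal sketch (the limits are illustrative, and add_middleware passes the ASGI app into the constructor for you):

app = FastAPI()

# One shared limiter for the whole service; swap in TieredRateLimiter
# plus an API-key lookup for per-plan limits
app.add_middleware(RateLimitMiddleware, limiter=RateLimiter(rate=10, capacity=50))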

---

Scene 7: Performance Optimization (11:00-13:00)

[Visual]: Performance metrics improving
[Animation]: Bottlenecks being eliminated

[Audio/Script]:

"Once scaled, optimize for performance."

[Demo - Performance Tips]:

1. Connection Pooling

import asyncpg
import httpx

Database connection pool

db_pool = await asyncpg.create_pool(
    dsn="postgresql://...",
    min_size=5,
    max_size=20,
    command_timeout=60
)

HTTP connection pool for API calls

http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    timeout=httpx.Timeout(30.0)
)

2. Async everywhere

async def process_batch(tasks: list):
    """Process multiple tasks concurrently"""
    results = await asyncio.gather(
        *[process_single(task) for task in tasks],
        return_exceptions=True
    )
    return results

3. Streaming responses for large outputs

from fastapi.responses import StreamingResponse

async def stream_response(task: str):
    """Stream AI response as it's generated"""
    async def generate():
        async for chunk in agent.stream(task):
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

4. Background tasks for non-critical work

from fastapi import BackgroundTasks

@app.post("/api/v1/task")
async def execute_task(request: TaskRequest, background: BackgroundTasks):
    result = await agent.execute(request.task, request.context)

    # Non-critical: log analytics in background
    background.add_task(log_analytics, request, result)

    return result

5. Efficient serialization

import orjson # Faster than standard json

def serialize(data: dict) -> bytes:
    return orjson.dumps(data)

def deserialize(data: bytes) -> dict:
    return orjson.loads(data)
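
If the service runs on FastAPI, orjson can also become the default response serializer; a small sketch, assuming the orjson package is installed:

from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

# Every endpoint now serializes its return value with orjson
app = FastAPI(default_response_class=ORJSONResponse)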

---

Scene 8: Challenge Time (13:00-14:00)

[Visual]: Challenge specification
[Animation]: XP reward display

[Audio/Script]:

"Your challenge: Make your agent service scalable.
>
Requirements:
1. Stateless agent design with Redis for state
2. Nginx load balancer config for 3 replicas
3. Response caching system
4. Rate limiting middleware
5. Performance test showing horizontal scaling
>
Complete this for 850 XP and the 'Scale Master' badge.
>
Next: Monitoring and observability - seeing inside your production system."

---

Post-Video Challenge

Challenge ID: TRACK4_002_CHALLENGE
Type: Code + Infrastructure
Instructions:

Task 1: Implement stateless design

claude "Refactor the agent to be stateless:
1. Move conversation history to Redis
2. Move user preferences to PostgreSQL
3. No in-memory state between requests"

Task 2: Create load balancer config

claude "Create Nginx configuration:
1. Upstream with 3 agent servers
2. Health check endpoints
3. Appropriate timeouts for AI workloads
4. SSL termination"

Task 3: Add caching layer

claude "Implement response caching:
1. Cache key generation from task + context
2. TTL-based expiration
3. Cache bypass for time-sensitive queries
4. Cache warming for common queries"

Task 4: Add rate limiting

claude "Add rate limiting:
1. Token bucket algorithm
2. Per-client rate limits
3. 429 responses with Retry-After header
4. Tiered limits based on API key"

Task 5: Performance test

Test single instance

wrk -t4 -c100 -d30s http://localhost:8000/api/v1/task

Test with 3 instances behind load balancer

wrk -t4 -c300 -d30s http://localhost/api/v1/task

Compare throughput and latency

Rewards:

  • XP: 850 (450 base + 400 challenge)
  • Achievement: "Scale Master"
---

SEO Metadata

Alt-text: Scaling AI agents for production - horizontal scaling, load balancing, caching, rate limiting. Build high-performance AI services.

Tags: AI scaling, horizontal scaling, load balancing, caching, rate limiting, performance optimization, nginx

Keywords: scale ai agents, production ai performance, load balancing ai, ai caching strategies, rate limiting api, high performance ai
