LLM Deployment & Infrastructure Code Reference
Complete code implementations for Chapter 9: LLM Deployment & Infrastructure.
LLM Serving Stack Architecture
flowchart TB
subgraph Clients["CLIENT LAYER"]
direction LR
Web["Web App"]
Mobile["Mobile App"]
API["API Clients"]
end
subgraph Gateway["GATEWAY LAYER"]
direction TB
LB["Load Balancer<br/>(Nginx/HAProxy)"]
RateLimit["Rate Limiter"]
Auth["Auth/API Keys"]
LB --> RateLimit --> Auth
end
subgraph Inference["INFERENCE LAYER"]
direction TB
Router["Request Router"]
subgraph Pool["vLLM Pool"]
direction LR
GPU1["GPU 1<br/>Prefill"]
GPU2["GPU 2<br/>Decode"]
GPU3["GPU 3<br/>Decode"]
GPU4["GPU 4<br/>Decode"]
end
subgraph Memory["Memory Management"]
KVCache["Paged KV Cache"]
Prefixes["Prefix Cache"]
end
Router --> Pool
Pool --> Memory
end
subgraph Support["SUPPORT SYSTEMS"]
direction LR
Queue["Request Queue<br/>(Redis)"]
Metrics["Prometheus<br/>Metrics"]
Models["Model Store<br/>(S3/GCS)"]
end
Clients --> Gateway
Gateway --> Queue
Queue --> Inference
Inference --> Metrics
Models -.->|"Model Loading"| Pool
style Gateway fill:#e3f2fd
style Inference fill:#e8f5e9
style Support fill:#fff3e0
Request Flow Diagram
sequenceDiagram
participant C as Client
participant G as Gateway
participant Q as Queue
participant S as Scheduler
participant P as Prefill Worker
participant D as Decode Workers
participant K as KV Cache
C->>G: POST /v1/completions
G->>G: Rate limit + Auth
G->>Q: Enqueue request
Q->>S: Dequeue batch
S->>P: Schedule prefill
P->>K: Allocate KV blocks
P->>P: Process prompt tokens
P-->>D: Transfer to decode
loop Token Generation
D->>K: Read/Write KV
D->>D: Generate token
D-->>C: Stream token (SSE)
end
D->>K: Free KV blocks
D->>G: Complete response
Table of Contents
- Inference Fundamentals
- Batching Strategies
- Performance Profiling
- Memory Optimization
- Scaling and Load Balancing
- Production Monitoring
- Cost Engineering
Inference Fundamentals
Inference Metrics Tracker
Complete implementation for tracking inference performance metrics:
import numpy as np
class InferenceMetrics:
    """Track per-request inference latency and throughput metrics.

    Every call to ``record_request`` appends one dict to ``self.requests``;
    ``summary`` aggregates those dicts with NumPy.
    """

    def __init__(self):
        self.requests = []

    def record_request(
        self,
        prompt_tokens: int,
        generated_tokens: int,
        ttft_ms: float,
        total_time_ms: float
    ):
        """Record one completed request.

        Args:
            prompt_tokens: Number of tokens in the prompt.
            generated_tokens: Number of tokens produced.
            ttft_ms: Time to first token, in milliseconds.
            total_time_ms: End-to-end request time, in milliseconds.
        """
        # Inter-token latency: time after the first token spread over the
        # remaining tokens. max(..., 1) keeps the divisor non-zero when zero
        # or one token was generated.
        itl_ms = (total_time_ms - ttft_ms) / max(generated_tokens - 1, 1)
        # A non-positive duration would otherwise raise ZeroDivisionError.
        if total_time_ms > 0:
            tokens_per_second = generated_tokens / (total_time_ms / 1000)
        else:
            tokens_per_second = 0.0
        self.requests.append({
            "prompt_tokens": prompt_tokens,
            "generated_tokens": generated_tokens,
            "ttft_ms": ttft_ms,
            "itl_ms": itl_ms,
            "total_time_ms": total_time_ms,
            "tokens_per_second": tokens_per_second
        })

    def summary(self) -> dict:
        """Return aggregate statistics, or ``{}`` when nothing was recorded.

        ``throughput_tokens_per_sec`` is the mean of the per-request rates,
        not total tokens divided by total time.
        """
        if not self.requests:
            return {}
        ttft = [r["ttft_ms"] for r in self.requests]
        itl = [r["itl_ms"] for r in self.requests]
        rates = [r["tokens_per_second"] for r in self.requests]
        return {
            "request_count": len(self.requests),
            "avg_ttft_ms": np.mean(ttft),
            "p50_ttft_ms": np.percentile(ttft, 50),
            "p99_ttft_ms": np.percentile(ttft, 99),
            "avg_itl_ms": np.mean(itl),
            "throughput_tokens_per_sec": np.mean(rates)
        }

# Memory Estimation
Complete implementation for estimating LLM memory requirements:
def estimate_memory_gb(
    num_parameters: float,  # In billions
    precision: str,  # "fp32", "fp16", "bf16", "int8", "int4"
    num_layers: int,
    hidden_dim: int,
    num_heads: int,
    head_dim: int,
    batch_size: int,
    seq_length: int
) -> dict:
    """Estimate memory requirements for LLM inference.

    All figures are decimal gigabytes (bytes / 1e9).

    Returns:
        Dict with ``weights_gb``, ``kv_cache_gb``, ``activations_gb``,
        ``total_gb`` and ``per_request_gb`` (KV cache for one sequence).

    Raises:
        KeyError: If ``precision`` is not one of the supported names.
    """
    bytes_per_param = {
        "fp32": 4, "fp16": 2, "bf16": 2, "int8": 1, "int4": 0.5
    }[precision]

    # Model weights
    weight_gb = num_parameters * 1e9 * bytes_per_param / 1e9

    # KV cache for one sequence: K and V (leading 2) for every layer,
    # position and head, stored at 2 bytes per value (fp16 cache is assumed
    # even when the weights are quantized).
    kv_bytes_per_request = 2 * num_layers * seq_length * num_heads * head_dim * 2
    kv_cache_gb = kv_bytes_per_request * batch_size / 1e9

    # Activation memory (rough estimate, 4 bytes per value)
    activation_gb = batch_size * seq_length * hidden_dim * 4 / 1e9

    return {
        "weights_gb": weight_gb,
        "kv_cache_gb": kv_cache_gb,
        "activations_gb": activation_gb,
        "total_gb": weight_gb + kv_cache_gb + activation_gb,
        # Computed directly rather than as kv_cache_gb / batch_size so that
        # batch_size == 0 does not divide by zero.
        "per_request_gb": kv_bytes_per_request / 1e9
    }
# Example: LLaMA-7B with batch of 8, 4K context
estimate_memory_gb(
num_parameters=7,
precision="fp16",
num_layers=32,
hidden_dim=4096,
num_heads=32,
head_dim=128,
batch_size=8,
seq_length=4096
)
# Approximately: {'weights_gb': 14.0, 'kv_cache_gb': 17.18, 'activations_gb': 0.54, 'total_gb': 31.72, 'per_request_gb': 2.15}
Batching Strategies
Static Batcher
Complete implementation for collecting requests into fixed-size batches:
import asyncio
class StaticBatcher:
    """Collect requests into fixed-size batches.

    A batch is dispatched as soon as ``batch_size`` requests are pending.
    A partial batch is flushed at most ``timeout_ms`` after a request
    starts waiting, so callers are never left with an unresolved future.
    """

    def __init__(self, batch_size: int, timeout_ms: int = 100, model=None):
        """
        Args:
            batch_size: Number of requests per full batch.
            timeout_ms: Upper bound on how long a partial batch waits.
            model: Object exposing ``async generate_batch(requests)`` that
                returns one result per request, in order.
        """
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.model = model
        self.pending = []
        self.lock = asyncio.Lock()
        self._flush_task = None  # timer task for the current partial batch

    async def add_request(self, request: dict) -> asyncio.Future:
        """Add request to batch, return future for result."""
        future = asyncio.get_running_loop().create_future()
        async with self.lock:
            self.pending.append((request, future))
            if len(self.pending) >= self.batch_size:
                await self._process_batch()
            elif self._flush_task is None or self._flush_task.done():
                # First request of a partial batch: arm the flush timer.
                self._flush_task = asyncio.create_task(self._flush_after_timeout())
        return future

    async def _flush_after_timeout(self):
        """Dispatch whatever is pending once the timeout has elapsed."""
        await asyncio.sleep(self.timeout_ms / 1000)
        async with self.lock:
            await self._process_batch()

    async def _process_batch(self):
        """Process up to ``batch_size`` pending requests as one batch.

        Must be called with ``self.lock`` held. A failure in the model is
        delivered to every future of the batch instead of leaving them
        pending forever.
        """
        if not self.pending:
            return
        batch = self.pending[:self.batch_size]
        self.pending = self.pending[self.batch_size:]
        requests = [r for r, _ in batch]
        futures = [f for _, f in batch]
        try:
            results = await self.model.generate_batch(requests)
        except Exception as exc:
            for future in futures:
                if not future.done():
                    future.set_exception(exc)
            return
        # Distribute results; skip futures the caller already cancelled.
        for future, result in zip(futures, results):
            if not future.done():
                future.set_result(result)

# Continuous Batcher
Complete implementation for dynamic request batching:
import asyncio
EOS_TOKEN = 2  # End-of-sequence token ID


class ContinuousBatcher:
    """Run generation one token at a time over a rolling set of requests.

    Finished requests leave the batch after every step and waiting requests
    take their place, so the batch stays as full as the queue allows.
    ``_generate_step`` is not defined in this class and must be supplied
    elsewhere (e.g. by a subclass).
    """

    def __init__(self, max_batch_size: int):
        self.max_batch_size = max_batch_size
        self.active_requests = []  # Currently generating
        self.waiting_queue = asyncio.Queue()

    async def add_request(self, request: dict):
        """Queue a request; it joins the batch when a slot frees up."""
        await self.waiting_queue.put(request)

    async def run(self):
        """Serve forever: admit waiting requests, then advance by one token."""
        while True:
            # Admit queued requests until the batch is full or the queue is empty.
            while True:
                if len(self.active_requests) >= self.max_batch_size:
                    break
                if self.waiting_queue.empty():
                    break
                incoming = await self.waiting_queue.get()
                self.active_requests.append(
                    {"request": incoming, "tokens": [], "done": False}
                )

            if not self.active_requests:
                # Nothing to do; yield briefly instead of spinning.
                await asyncio.sleep(0.001)
                continue

            # One decoding step for every active request.
            step_tokens = await self._generate_step(self.active_requests)

            for entry, token in zip(self.active_requests, step_tokens):
                produced = entry["tokens"]
                produced.append(token)
                spec = entry["request"]
                if token == EOS_TOKEN or len(produced) >= spec["max_tokens"]:
                    entry["done"] = True
                    spec["callback"](produced)

            # Keep only the requests that still have tokens to generate.
            self.active_requests = [
                entry for entry in self.active_requests if not entry["done"]
            ]

# Iteration-Level Scheduler
Complete implementation for fine-grained token-by-token scheduling:
EOS_TOKEN = 2  # End-of-sequence token ID


class IterationScheduler:
    """Choose work per iteration: running decodes first, then prefills that fit.

    ``_estimate_blocks`` is not defined in this class and must be supplied
    elsewhere (e.g. by a subclass).
    """

    def __init__(self, model, memory_manager):
        self.model = model
        self.memory = memory_manager
        self.prefill_queue = []
        self.decode_set = set()

    async def schedule_iteration(self):
        """Return ``(prefill_requests, decode_requests)`` for this iteration."""
        block_budget = self.memory.get_free_blocks()

        # Requests already decoding always run; they are never preempted.
        decoding = list(self.decode_set)
        committed = sum(req.kv_blocks for req in decoding)
        # NOTE(review): decode blocks are charged against the *free* block
        # count; if get_free_blocks() already excludes them this counts them
        # twice - confirm against the memory manager.

        # Admit queued prefills, in queue order, while they still fit.
        admitted = []
        for candidate in self.prefill_queue:
            cost = self._estimate_blocks(candidate)
            if committed + cost > block_budget:
                continue
            admitted.append(candidate)
            committed += cost
        return admitted, decoding

    async def run_iteration(self):
        """Run one prefill pass and one decode step for the scheduled requests."""
        prefill, decode = await self.schedule_iteration()

        # Prefill (compute-bound): attach the KV cache and promote to decode.
        if prefill:
            caches = await self.model.prefill_batch(prefill)
            for req, cache in zip(prefill, caches):
                req.kv_cache = cache
                self.decode_set.add(req)
            self.prefill_queue = [q for q in self.prefill_queue if q not in prefill]

        # Decode (memory-bound): one token each; retire requests that hit EOS.
        if decode:
            tokens = await self.model.decode_batch(decode)
            for req, tok in zip(decode, tokens):
                req.generated_tokens.append(tok)
                if tok != EOS_TOKEN:
                    continue
                req.complete()
                self.decode_set.remove(req)
                self.memory.free(req.kv_cache)

# Optimal Batch Size Finder
Find the largest batch size that meets latency requirements:
import random
import time
import numpy as np
def find_optimal_batch_size(
    model,
    latency_sla_ms: float,
    test_prompts: list[str]
) -> int:
    """Find largest batch size meeting latency SLA.

    Batch sizes 1, 2, 4, ... 64 are each timed over 10 runs of
    ``model.generate_batch``. The first size whose p99 latency exceeds the
    SLA ends the search and the previous size (at least 1) is returned.

    Args:
        model: Object exposing ``generate_batch(list_of_prompts)``.
        latency_sla_ms: Latency budget in milliseconds.
        test_prompts: Pool of prompts to draw batches from.

    Returns:
        The largest tested batch size within the SLA; 1 is returned even if
        a single-request batch already violates it.
    """
    for batch_size in [1, 2, 4, 8, 16, 32, 64]:
        latencies = []
        for _ in range(10):
            if batch_size <= len(test_prompts):
                batch = random.sample(test_prompts, batch_size)
            elif test_prompts:
                # Too few distinct prompts: draw with replacement so the
                # batch really has batch_size entries.
                batch = random.choices(test_prompts, k=batch_size)
            else:
                batch = []
            # perf_counter is monotonic, unlike time.time().
            start = time.perf_counter()
            model.generate_batch(batch)
            latencies.append((time.perf_counter() - start) * 1000)
        p99 = np.percentile(latencies, 99)
        if p99 > latency_sla_ms:
            return max(1, batch_size // 2)
    return 64  # Max tested

# Performance Profiling
Inference Profiler
Complete implementation for profiling LLM inference:
import torch
import time
from contextlib import contextmanager
EOS_TOKEN = 2  # End-of-sequence token ID


class InferenceProfiler:
    """Profile LLM inference at multiple levels.

    Phase durations are kept in ``self.timings`` (name -> seconds) and
    memory readings in ``self.memory_snapshots``.
    """

    def __init__(self):
        self.timings = {}
        self.memory_snapshots = []

    @contextmanager
    def profile_phase(self, name: str):
        """Time the enclosed block and record GPU memory before/after.

        CUDA synchronisation and memory readings are skipped when CUDA is
        unavailable (memory is then reported as 0). Nothing is recorded if
        the enclosed block raises.
        """
        use_cuda = torch.cuda.is_available()
        if use_cuda:
            # Wait for queued kernels so they are not charged to this phase.
            torch.cuda.synchronize()
        start_time = time.perf_counter()
        start_memory = torch.cuda.memory_allocated() if use_cuda else 0
        yield
        if use_cuda:
            torch.cuda.synchronize()
        end_time = time.perf_counter()
        end_memory = torch.cuda.memory_allocated() if use_cuda else 0
        self.timings[name] = end_time - start_time
        self.memory_snapshots.append({
            "phase": name,
            "memory_delta_mb": (end_memory - start_memory) / 1e6,
            "total_memory_mb": end_memory / 1e6
        })

    def profile_generation(self, model, prompt_tokens, max_new_tokens):
        """Profile complete generation.

        Returns:
            ``(generated_tokens, report)`` where ``report`` comes from
            ``get_report``.
        """
        # Start clean: stale "decode_<i>" entries from a longer previous run
        # would otherwise be counted in this run's report.
        self.timings.clear()
        self.memory_snapshots.clear()

        with self.profile_phase("prefill"):
            kv_cache = model.prefill(prompt_tokens)
        generated = []
        for i in range(max_new_tokens):
            with self.profile_phase(f"decode_{i}"):
                token, kv_cache = model.decode_step(kv_cache)
            generated.append(token)
            if token == EOS_TOKEN:
                break
        return generated, self.get_report()

    def get_report(self) -> dict:
        """Generate profiling report from the recorded phases."""
        total_time = sum(self.timings.values())
        # Separate prefill and decode times
        prefill_time = self.timings.get("prefill", 0)
        decode_times = [v for k, v in self.timings.items() if k.startswith("decode_")]
        total_decode_time = sum(decode_times)
        avg_decode_time = total_decode_time / len(decode_times) if decode_times else 0
        return {
            "total_time_s": total_time,
            "prefill_time_s": prefill_time,
            "total_decode_time_s": total_decode_time,
            "avg_token_time_ms": avg_decode_time * 1000,
            "tokens_generated": len(decode_times),
            "tokens_per_second": len(decode_times) / total_decode_time if total_decode_time > 0 else 0,
            "time_breakdown": {
                "prefill_pct": prefill_time / total_time * 100 if total_time > 0 else 0,
                "decode_pct": total_decode_time / total_time * 100 if total_time > 0 else 0
            },
            "peak_memory_mb": max(s["total_memory_mb"] for s in self.memory_snapshots) if self.memory_snapshots else 0
        }

# Memory Analysis
Analyze how memory scales with context length:
import torch
def analyze_memory_usage(
    model,
    context_lengths: list[int],
    vocab_size: int = 32000,
    max_new_tokens: int = 100,
    device=None
):
    """Analyze how memory scales with context length.

    For each context length a random prompt of shape ``(1, ctx_len)`` is
    generated and passed to ``model.generate``; the peak CUDA allocation
    during that call is recorded.

    Args:
        model: Object exposing ``generate(prompt, max_new_tokens=...)``.
        context_lengths: Prompt lengths to test.
        vocab_size: Exclusive upper bound for the random token IDs.
        max_new_tokens: Tokens to generate per run.
        device: Device for the prompt tensor; ``None`` keeps torch's default.

    Returns:
        List of ``{"context_length", "peak_memory_gb"}`` dicts.
        ``peak_memory_gb`` is 0.0 when CUDA is unavailable.
    """
    use_cuda = torch.cuda.is_available()
    results = []
    for ctx_len in context_lengths:
        if use_cuda:
            # Reset so the peak reflects this run only.
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()
        # Generate with this context length
        prompt = torch.randint(0, vocab_size, (1, ctx_len), device=device)
        model.generate(prompt, max_new_tokens=max_new_tokens)
        peak_memory = torch.cuda.max_memory_allocated() / 1e9 if use_cuda else 0.0
        results.append({
            "context_length": ctx_len,
            "peak_memory_gb": peak_memory
        })
    return results
# Example output:
# context_length | peak_memory_gb
# 512 | 15.2
# 1024 | 16.1
# 2048 | 17.8
# 4096 | 21.3
# 8192 | 28.4
PyTorch GPU Profiler
Profile using PyTorch profiler with CUDA events:
import torch.profiler
def profile_with_pytorch(model, inputs, output_path="./profile"):
    """Run ``model.generate(inputs)`` five times under ``torch.profiler``.

    The schedule skips one step, warms up for one, then records three.
    Traces go to ``output_path`` through the TensorBoard trace handler, and
    a table of the 20 entries with the highest total CUDA time is printed.
    """
    activities = [
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ]
    step_schedule = torch.profiler.schedule(wait=1, warmup=1, active=3, repeat=1)
    trace_handler = torch.profiler.tensorboard_trace_handler(output_path)
    profiler = torch.profiler.profile(
        activities=activities,
        schedule=step_schedule,
        on_trace_ready=trace_handler,
        record_shapes=True,
        profile_memory=True,
        with_stack=True,
    )
    with profiler as prof:
        for _ in range(5):
            model.generate(inputs)
            # Advance the profiler schedule by one step.
            prof.step()

    # Print summary
    summary = prof.key_averages().table(sort_by="cuda_time_total", row_limit=20)
    print(summary)

# Benchmark Inference
Comprehensive inference benchmark:
import time
import numpy as np
import torch
def benchmark_inference(model, test_prompts: list[str], config: dict):
    """Comprehensive inference benchmark.

    Args:
        model: Object exposing ``generate(prompt, **kwargs)`` and
            ``tokenize(prompt)``; ``get_first_token_time()`` is optional.
        test_prompts: Prompts to benchmark, one ``generate`` call each.
        config: Keyword arguments forwarded to ``model.generate``.

    Returns:
        Dict with ``config``, per-prompt ``metrics`` and an aggregate
        ``summary``. For an empty prompt list ``metrics`` is ``[]`` and
        ``summary`` is ``{}``.
    """
    results = {
        "config": config,
        "metrics": []
    }
    if not test_prompts:
        # Nothing to warm up on or aggregate.
        results["summary"] = {}
        return results

    use_cuda = torch.cuda.is_available()
    # TTFT reporting is optional: not every model exposes it.
    get_first_token_time = getattr(model, "get_first_token_time", None)

    # Warmup
    for _ in range(3):
        model.generate(test_prompts[0], max_tokens=10)

    # Benchmark
    for prompt in test_prompts:
        if use_cuda:
            torch.cuda.synchronize()
        start = time.perf_counter()
        tokens = model.generate(prompt, **config)
        first_token_time = get_first_token_time() if callable(get_first_token_time) else None
        if use_cuda:
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start
        results["metrics"].append({
            "prompt_tokens": len(model.tokenize(prompt)),
            "generated_tokens": len(tokens),
            "total_time_s": elapsed,
            # "is not None" so a legitimate 0.0 is not reported as missing.
            "ttft_ms": first_token_time * 1000 if first_token_time is not None else None,
            "tokens_per_second": len(tokens) / elapsed if elapsed > 0 else 0.0
        })

    # Aggregate
    rates = [m["tokens_per_second"] for m in results["metrics"]]
    durations = [m["total_time_s"] for m in results["metrics"]]
    results["summary"] = {
        "avg_tokens_per_second": np.mean(rates),
        "p50_total_time_s": np.percentile(durations, 50),
        "p99_total_time_s": np.percentile(durations, 99),
    }
    return results

# Memory Optimization
KV Cache Manager
Complete implementation for paged KV cache management:
import torch
class KVCacheManager:
    """Manage KV cache with paging for efficiency.

    A fixed pool of ``max_blocks`` blocks is pre-allocated as one tensor of
    shape ``(max_blocks, 2, num_layers, block_size, num_heads, head_dim)``;
    requests borrow whole blocks and return them on ``free``.
    """

    def __init__(
        self,
        num_layers: int,
        num_heads: int,
        head_dim: int,
        block_size: int = 16,
        dtype=torch.float16,
        max_blocks: int = 1000,
        device: str = "cuda"
    ):
        """
        Args:
            num_layers: Number of transformer layers.
            num_heads: Attention heads per layer.
            head_dim: Dimension of each head.
            block_size: Token slots per block.
            dtype: Element type of the pool.
            max_blocks: Number of blocks in the pool.
            device: Device the pool is allocated on.
        """
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.block_size = block_size
        self.dtype = dtype
        # Pre-allocate block pool
        self.max_blocks = max_blocks
        self.block_pool = torch.zeros(
            self.max_blocks,
            2,  # K and V
            num_layers,
            block_size,
            num_heads,
            head_dim,
            dtype=dtype,
            device=device
        )
        self.free_blocks = set(range(self.max_blocks))
        self.allocated = {}  # request_id -> list of block indices

    def allocate(self, request_id: str, num_tokens: int) -> list[int]:
        """Allocate enough blocks to hold ``num_tokens`` more tokens.

        Calling this again for a request that already owns blocks extends
        its block list rather than overwriting (and leaking) it.

        Returns:
            The block indices allocated by this call.

        Raises:
            MemoryError: If the pool has too few free blocks.
        """
        # Ceiling division: a partially filled block still needs a block.
        num_blocks = (num_tokens + self.block_size - 1) // self.block_size
        if len(self.free_blocks) < num_blocks:
            raise MemoryError("Not enough KV cache blocks")
        blocks = [self.free_blocks.pop() for _ in range(num_blocks)]
        owned = self.allocated.get(request_id)
        if owned is None:
            self.allocated[request_id] = blocks
        else:
            owned.extend(blocks)
        return blocks

    def free(self, request_id: str):
        """Free blocks for a completed request; unknown IDs are ignored."""
        blocks = self.allocated.pop(request_id, None)
        if blocks is not None:
            self.free_blocks.update(blocks)

    def get_utilization(self) -> float:
        """Get KV cache utilization as the fraction of blocks in use."""
        used = self.max_blocks - len(self.free_blocks)
        return used / self.max_blocks

# Speculative Decoder
Complete implementation for speculative decoding:
EOS_TOKEN = 2  # End-of-sequence token ID


class SpeculativeDecoder:
    """Speculative decoding for faster generation.

    A small draft model proposes up to ``n_draft`` tokens; the target model
    scores them in one pass. Matching proposals are accepted, and the first
    mismatch is replaced by the target's own token.
    """

    def __init__(self, target_model, draft_model, n_draft: int = 4):
        """
        Raises:
            ValueError: If ``n_draft`` is less than 1 (no token could ever
                be proposed, so generation could not make progress).
        """
        if n_draft < 1:
            raise ValueError("n_draft must be at least 1")
        self.target = target_model
        self.draft = draft_model
        self.n_draft = n_draft

    async def generate(self, prompt_tokens: list[int], max_tokens: int) -> list[int]:
        """Generate with speculative decoding.

        Returns:
            At most ``max_tokens`` generated tokens, ending early on EOS.
        """
        generated = []
        kv_cache_target = None
        kv_cache_draft = None
        while len(generated) < max_tokens:
            # Never draft more than the remaining budget, otherwise the
            # output could overshoot max_tokens by up to n_draft - 1.
            budget = min(self.n_draft, max_tokens - len(generated))

            # Step 1: Draft model generates candidates
            draft_tokens = []
            for _ in range(budget):
                context = prompt_tokens + generated + draft_tokens
                next_token, kv_cache_draft = self.draft.decode_step(context, kv_cache_draft)
                draft_tokens.append(next_token)
                if next_token == EOS_TOKEN:
                    break

            # Step 2: Target model verifies all in parallel
            context = prompt_tokens + generated
            logits, kv_cache_target = self.target.forward_batch(
                context + draft_tokens,
                kv_cache_target
            )

            # Step 3: Accept matching tokens
            accepted = 0
            for i, draft_token in enumerate(draft_tokens):
                # NOTE(review): this assumes logits[p] scores the token AT
                # position p. If forward_batch returns next-token logits,
                # the index should be len(context) + i - 1 - confirm
                # against the target model's API.
                target_token = logits[len(context) + i].argmax()
                if hasattr(target_token, "item"):
                    # Store plain ints, not 0-d tensors, in the output.
                    target_token = target_token.item()
                if draft_token == target_token:
                    accepted += 1
                    generated.append(draft_token)
                else:
                    # Reject: use target's token instead
                    generated.append(target_token)
                    break

            # Check for EOS
            if generated[-1] == EOS_TOKEN:
                break

            # If we rejected early, roll back draft model's KV cache
            if accepted < len(draft_tokens):
                kv_cache_draft = self._truncate_cache(kv_cache_draft, len(context) + accepted)
        return generated

    def _truncate_cache(self, cache, length):
        """Truncate KV cache to given length along its third axis.

        ``None`` (no cache yet) is returned unchanged.
        """
        if cache is None:
            return None
        return cache[:, :, :length, :]

# Prefix Cache
Complete implementation for KV cache prefix sharing:
import hashlib
import time
import torch
class PrefixCache:
    """Cache KV states for common prompt prefixes.

    Entries are keyed by a hash of the token list and evicted
    least-recently-used once ``max_entries`` is reached.
    """

    def __init__(self, max_entries: int = 100):
        self.cache = {}  # hash -> (prefix_tokens, kv_cache)
        self.max_entries = max_entries
        self.access_times = {}  # hash -> last access timestamp

    def get(self, prompt_tokens: list[int]) -> tuple[int, torch.Tensor]:
        """Find longest cached prefix.

        Returns:
            ``(match_length, kv_cache)``; ``(0, None)`` when no cached entry
            is a prefix of ``prompt_tokens``.
        """
        best_key = None
        best_match_len = 0
        best_cache = None
        for hash_key, (prefix_tokens, kv_cache) in self.cache.items():
            if len(prefix_tokens) > best_match_len and self._is_prefix(prefix_tokens, prompt_tokens):
                best_key = hash_key
                best_match_len = len(prefix_tokens)
                best_cache = kv_cache
        # Only the entry actually returned counts as "used" for LRU.
        if best_key is not None:
            self.access_times[best_key] = time.time()
        return best_match_len, best_cache

    def put(self, prefix_tokens: list[int], kv_cache: torch.Tensor):
        """Store a clone of a prefix's KV cache."""
        hash_key = self._hash(prefix_tokens)
        # Evict LRU if full - but not when refreshing an existing key,
        # which does not grow the cache.
        if hash_key not in self.cache and len(self.cache) >= self.max_entries and self.access_times:
            lru_key = min(self.access_times, key=self.access_times.get)
            del self.cache[lru_key]
            del self.access_times[lru_key]
        # Copy the tokens so later mutation by the caller cannot corrupt
        # the stored prefix.
        self.cache[hash_key] = (list(prefix_tokens), kv_cache.clone())
        self.access_times[hash_key] = time.time()

    def _is_prefix(self, prefix: list[int], full: list[int]) -> bool:
        """Return True if ``prefix`` equals the start of ``full``."""
        return len(prefix) <= len(full) and prefix == full[:len(prefix)]

    def _hash(self, tokens: list[int]) -> str:
        """Return the cache key for a token list (MD5 of its ``str`` form)."""
        return hashlib.md5(str(tokens).encode()).hexdigest()
EOS_TOKEN = 2  # End-of-sequence token ID


class CachedInference:
    """Inference with prefix caching."""

    def __init__(self, model, prefix_cache: "PrefixCache"):
        """
        Args:
            model: Object exposing ``prefill(tokens[, kv_cache])`` and
                ``decode_step(kv_cache) -> (token, kv_cache)``.
            prefix_cache: Cache exposing ``get(tokens)`` and
                ``put(tokens, kv_cache)``.
        """
        self.model = model
        self.prefix_cache = prefix_cache

    async def generate(self, prompt_tokens: list[int], max_tokens: int) -> list[int]:
        """Generate up to ``max_tokens`` tokens, reusing a cached prefix if any.

        Returns:
            The generated tokens, ending early on EOS.
        """
        # Check for cached prefix
        match_len, cached_kv = self.prefix_cache.get(prompt_tokens)
        if cached_kv is not None:
            # Use cached KV, prefill only the tokens after the match
            new_tokens = prompt_tokens[match_len:]
            kv_cache = self.model.prefill(new_tokens, cached_kv)
        else:
            # Full prefill
            kv_cache = self.model.prefill(prompt_tokens)

        # Cache the prompt's KV state NOW, before decoding: after decoding
        # the cache also covers generated tokens and would no longer
        # correspond to prompt_tokens.
        if len(prompt_tokens) > 100:  # Only cache substantial prefixes
            self.prefix_cache.put(prompt_tokens, kv_cache)

        # Decode
        generated = []
        for _ in range(max_tokens):
            token, kv_cache = self.model.decode_step(kv_cache)
            generated.append(token)
            if token == EOS_TOKEN:
                break
        return generated

# Scaling and Load Balancing
LLM Load Balancer
Complete implementation for load balancing across inference servers:
class LLMLoadBalancer:
    """Load balance across multiple inference servers.

    ``_send_request`` and ``_ping`` are not defined in this class and must
    be supplied elsewhere (e.g. by a subclass).
    """

    def __init__(self, endpoints: list[str]):
        self.endpoints = endpoints
        self.request_counts = {ep: 0 for ep in endpoints}  # in-flight requests
        self.health_status = {ep: True for ep in endpoints}

    async def route_request(self, request: dict) -> dict:
        """Route request to least-loaded healthy endpoint.

        Raises:
            RuntimeError: If no endpoint is currently marked healthy.
        """
        healthy = [ep for ep in self.endpoints if self.health_status[ep]]
        if not healthy:
            # RuntimeError is still an Exception, so existing handlers work.
            raise RuntimeError("No healthy endpoints")

        # Least-connections routing
        endpoint = min(healthy, key=lambda ep: self.request_counts[ep])
        self.request_counts[endpoint] += 1
        try:
            return await self._send_request(endpoint, request)
        finally:
            # Release the slot whether the request succeeded or failed.
            self.request_counts[endpoint] -= 1

    async def health_check(self):
        """Ping every endpoint once and update ``health_status``.

        This is a single pass; the caller is responsible for invoking it
        periodically.
        """
        for endpoint in self.endpoints:
            try:
                await self._ping(endpoint)
            except Exception:
                # Any ping failure marks the endpoint unhealthy until the
                # next successful check.
                self.health_status[endpoint] = False
            else:
                self.health_status[endpoint] = True

# Kubernetes Deployment
Complete Kubernetes manifests for LLM inference:
# deployment.yaml
# Deployment: three vLLM replicas, one GPU each.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-inference
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-inference
  template:
    metadata:
      labels:
        app: llm-inference
    spec:
      containers:
        - name: vllm
          image: vllm/vllm-openai:latest
          args:
            - --model=meta-llama/Llama-2-7b-hf
            - --gpu-memory-utilization=0.9
          resources:
            limits:
              nvidia.com/gpu: 1
          ports:
            - containerPort: 8000
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 120  # Model loading time
            periodSeconds: 10
---
# Service: exposes the inference pods on port 8000.
apiVersion: v1
kind: Service
metadata:
  name: llm-inference
spec:
  selector:
    app: llm-inference
  ports:
    - port: 8000
      targetPort: 8000
  type: LoadBalancer
---
# Kubernetes HPA: autoscale the Deployment on the external queue_depth metric
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-inference
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: External
      external:
        metric:
          name: queue_depth
        target:
          type: AverageValue
          averageValue: "5"  # Scale up when queue > 5

# Production Monitoring
Docker Compose Stack
Complete Docker Compose for inference with monitoring:
# docker-compose.yml
# vLLM inference server plus Prometheus and Grafana for monitoring.
version: '3.8'
services:
  vllm:
    image: vllm/vllm-openai:latest
    command: >
      --model meta-llama/Llama-2-7b-hf
      --gpu-memory-utilization 0.9
      --max-model-len 4096
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    ports:
      - "8000:8000"
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"

# Prometheus Metrics Exporter
Complete implementation for custom LLM metrics:
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Define metrics
request_counter = Counter(
    'llm_requests_total', 'Total LLM requests', ['model', 'status']
)
latency_histogram = Histogram(
    'llm_latency_seconds', 'Request latency', ['model', 'phase']
)
tokens_generated = Counter(
    'llm_tokens_generated_total', 'Total tokens generated', ['model']
)
active_requests = Gauge(
    'llm_active_requests', 'Currently processing requests', ['model']
)
gpu_memory_used = Gauge('llm_gpu_memory_bytes', 'GPU memory used', ['gpu_id'])


class MetricsMiddleware:
    """Middleware that records request count, latency and concurrency.

    Every request raises the active-request gauge while it runs, increments
    the request counter with status ``success`` or ``error``, and observes
    its total duration in the latency histogram.
    """

    def __init__(self, model_name: str):
        self.model_name = model_name

    async def __call__(self, request, call_next):
        """Forward ``request`` to ``call_next`` and record its metrics.

        Exceptions from ``call_next`` are counted as errors and re-raised.
        """
        model = self.model_name
        in_flight = active_requests.labels(model=model)
        in_flight.inc()
        began = time.time()
        try:
            response = await call_next(request)
            request_counter.labels(model=model, status="success").inc()
            return response
        except Exception:
            request_counter.labels(model=model, status="error").inc()
            raise
        finally:
            # Runs on both success and failure so the gauge cannot drift.
            in_flight.dec()
            elapsed = time.time() - began
            latency_histogram.labels(model=model, phase="total").observe(elapsed)

# Load Shedder
Complete implementation for overload protection:
import random
import time
class LoadShedder:
    """Implement load shedding for overload protection.

    Load is the larger of queue fill (``current_queue_depth /
    max_queue_depth``) and latency pressure (recent average latency /
    ``max_latency_ms``). Below 0.7 everything is accepted, above 1.0
    everything is rejected, and in between requests are shed at random with
    a probability rising linearly to ``shed_probability_base``.
    """

    # Number of most recent latencies kept for the running average.
    _LATENCY_WINDOW = 100

    def __init__(
        self,
        max_queue_depth: int = 100,
        max_latency_ms: float = 5000,
        shed_probability_base: float = 0.1
    ):
        self.max_queue_depth = max_queue_depth
        self.max_latency_ms = max_latency_ms
        self.shed_probability_base = shed_probability_base
        self.current_queue_depth = 0
        self.recent_latencies = []

    def should_accept(self, request: dict) -> tuple[bool, str]:
        """Decide whether to accept or shed a request.

        Returns:
            ``(accepted, reason)`` where reason is one of ``"accepted"``,
            ``"accepted_under_load"``, ``"shed_probabilistic"`` or
            ``"queue_full"``.
        """
        # Calculate load factor
        queue_load = self.current_queue_depth / self.max_queue_depth
        latency_load = self._get_avg_latency() / self.max_latency_ms
        load_factor = max(queue_load, latency_load)
        if load_factor < 0.7:
            return True, "accepted"
        if load_factor > 1.0:
            return False, "queue_full"
        # Probabilistic shedding in the 0.7-1.0 range
        shed_probability = self.shed_probability_base * (load_factor - 0.7) / 0.3
        if random.random() < shed_probability:
            return False, "shed_probabilistic"
        return True, "accepted_under_load"

    def _get_avg_latency(self) -> float:
        """Return the mean of the most recent latencies (0 if none)."""
        if not self.recent_latencies:
            return 0
        window = self.recent_latencies[-self._LATENCY_WINDOW:]
        return sum(window) / len(window)

    async def process_request(self, request: dict, handler: callable) -> dict:
        """Process request with load shedding.

        Returns:
            The handler's result, or an error dict with ``error``,
            ``reason`` and ``retry_after_seconds`` when the request is shed.
        """
        accept, reason = self.should_accept(request)
        if not accept:
            return {
                "error": "Service overloaded",
                "reason": reason,
                "retry_after_seconds": 5
            }
        self.current_queue_depth += 1
        start_time = time.perf_counter()
        try:
            result = await handler(request)
            latency = (time.perf_counter() - start_time) * 1000
            self.recent_latencies.append(latency)
            # Only the last window is ever read; drop older samples so the
            # list does not grow for the lifetime of the process.
            if len(self.recent_latencies) > self._LATENCY_WINDOW:
                del self.recent_latencies[:-self._LATENCY_WINDOW]
            return result
        finally:
            self.current_queue_depth -= 1

# Graceful Server
Complete implementation for graceful shutdown handling:
import signal
import asyncio
class GracefulServer:
    """Server with graceful shutdown handling.

    On SIGTERM/SIGINT the server stops accepting requests, gives in-flight
    requests up to a timeout to finish, cancels whatever is still running
    and then closes. ``process_request`` is not defined in this class and
    must be supplied elsewhere (e.g. by a subclass).
    """

    def __init__(self, model, host: str = "0.0.0.0", port: int = 8000):
        self.model = model
        self.host = host
        self.port = port
        self.active_requests = set()  # tasks currently handling a request
        self.shutdown_event = asyncio.Event()
        self.accepting_requests = True
        self._shutdown_task = None  # strong reference to the shutdown task

    def _request_shutdown(self):
        """Signal callback: start ``shutdown`` once, ignoring repeat signals."""
        if self._shutdown_task is None:
            # Keep a reference; the loop only holds tasks weakly.
            self._shutdown_task = asyncio.create_task(self.shutdown())

    async def start(self):
        """Start server with signal handling and block until shut down."""
        # Set up signal handlers
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._request_shutdown)

        # Start serving
        server = await asyncio.start_server(
            self.handle_connection,
            self.host,
            self.port
        )
        print(f"Server started on {self.host}:{self.port}")

        # Wait for shutdown
        await self.shutdown_event.wait()

        # Stop accepting new connections
        server.close()

        # shutdown() has already waited for its timeout; anything still
        # running is cancelled so that shutdown cannot hang forever.
        stragglers = list(self.active_requests)
        if stragglers:
            print(f"Cancelling {len(stragglers)} active requests...")
            for task in stragglers:
                task.cancel()
            await asyncio.gather(*stragglers, return_exceptions=True)

        # Awaited last: it may wait for open connections to close.
        await server.wait_closed()
        print("Shutdown complete")

    async def shutdown(self, timeout: int = 30):
        """Initiate graceful shutdown.

        Stops accepting requests, waits up to ``timeout`` seconds for active
        requests to finish, then sets ``shutdown_event``.
        """
        print("Shutdown initiated...")
        self.accepting_requests = False

        # Give active requests time to complete
        pending = set(self.active_requests)
        # Never wait on ourselves if called from inside a request handler.
        pending.discard(asyncio.current_task())
        if pending:
            _, still_running = await asyncio.wait(pending, timeout=timeout)
            if still_running:
                print(f"Timeout: {len(still_running)} requests still active")
        self.shutdown_event.set()

    async def handle_connection(self, reader, writer):
        """Handle incoming connection; answers 503 once shutdown has begun."""
        if not self.accepting_requests:
            writer.write(b"HTTP/1.1 503 Service Unavailable\r\n\r\n")
            await writer.drain()
            writer.close()
            return
        request_task = asyncio.current_task()
        self.active_requests.add(request_task)
        try:
            await self.process_request(reader, writer)
        finally:
            self.active_requests.discard(request_task)
            # Release the connection even if the handler failed or was
            # cancelled.
            writer.close()

# Load Test
Complete implementation for load testing inference endpoints:
import asyncio
import aiohttp
import time
import random
import numpy as np
async def load_test(
    endpoint: str,
    prompts: list[str],
    concurrency: int,
    duration_seconds: int
) -> dict:
    """Load test the inference endpoint.

    ``concurrency`` workers each POST random prompts to
    ``{endpoint}/v1/completions`` until ``duration_seconds`` have elapsed.

    Returns:
        Dict with request counts, ``latencies_ms`` of successful requests,
        ``p50_latency_ms``/``p99_latency_ms`` (only when at least one
        request succeeded) and ``throughput_rps``.
    """
    results = {
        "total_requests": 0,
        "successful_requests": 0,
        "failed_requests": 0,
        "latencies_ms": []
    }
    url = f"{endpoint}/v1/completions"
    start_time = time.perf_counter()

    async def worker():
        async with aiohttp.ClientSession() as session:
            while time.perf_counter() - start_time < duration_seconds:
                prompt = random.choice(prompts)
                request_start = time.perf_counter()
                results["total_requests"] += 1
                try:
                    async with session.post(
                        url,
                        json={"prompt": prompt, "max_tokens": 50}
                    ) as response:
                        # Read the body: the status line arrives before
                        # generation finishes, so timing without this would
                        # understate latency.
                        await response.read()
                        latency = (time.perf_counter() - request_start) * 1000
                        if response.status == 200:
                            results["successful_requests"] += 1
                            results["latencies_ms"].append(latency)
                        else:
                            results["failed_requests"] += 1
                except (aiohttp.ClientError, asyncio.TimeoutError):
                    results["failed_requests"] += 1

    # Run concurrent workers
    await asyncio.gather(*[worker() for _ in range(concurrency)])
    # Requests in flight at the deadline run past it, so use the real
    # elapsed time rather than the nominal duration.
    elapsed = time.perf_counter() - start_time

    # Calculate statistics
    if results["latencies_ms"]:
        results["p50_latency_ms"] = np.percentile(results["latencies_ms"], 50)
        results["p99_latency_ms"] = np.percentile(results["latencies_ms"], 99)
    results["throughput_rps"] = results["successful_requests"] / elapsed if elapsed > 0 else 0.0
    return results
# Run test
# results = asyncio.run(load_test(
#     endpoint="http://localhost:8000",
#     prompts=["Tell me a story about " for _ in range(100)],
#     concurrency=32,
#     duration_seconds=60
# ))
Cost Engineering
Cost Estimator
Complete implementation for estimating and tracking inference costs:
class CostEstimator:
    """Estimate and track inference costs."""

    # GPU costs per hour (approximate, varies by provider)
    GPU_COSTS = {
        "A10": 1.50,
        "A100_40GB": 4.00,
        "A100_80GB": 6.00,
        "H100": 12.00,
        "RTX_4090": 0.80,  # Cloud gaming instances
    }

    def __init__(self, gpu_type: str, tokens_per_second: float):
        """
        Args:
            gpu_type: Key into ``GPU_COSTS``; unknown types cost 5.0/hour.
            tokens_per_second: Sustained decode throughput of one GPU.

        Raises:
            ValueError: If ``tokens_per_second`` is not positive (every
                cost formula divides by it).
        """
        if tokens_per_second <= 0:
            raise ValueError("tokens_per_second must be positive")
        self.gpu_cost_per_hour = self.GPU_COSTS.get(gpu_type, 5.0)
        self.tokens_per_second = tokens_per_second

    def cost_per_token(self) -> float:
        """Calculate cost per generated token."""
        tokens_per_hour = self.tokens_per_second * 3600
        return self.gpu_cost_per_hour / tokens_per_hour

    def cost_per_request(self, input_tokens: int, output_tokens: int) -> float:
        """Estimate cost for a request from its prefill and decode time."""
        # Prefill is faster than decode
        prefill_time = input_tokens / (self.tokens_per_second * 10)  # ~10x faster
        decode_time = output_tokens / self.tokens_per_second
        total_time_hours = (prefill_time + decode_time) / 3600
        return total_time_hours * self.gpu_cost_per_hour

    def break_even_vs_api(
        self,
        api_cost_per_1k_input: float,
        api_cost_per_1k_output: float,
        requests_per_month: int,
        avg_input_tokens: int,
        avg_output_tokens: int
    ) -> dict:
        """Calculate break-even for self-hosting vs API.

        Returns:
            Dict with monthly API and self-hosted cost, savings (absolute
            and percent), capacity figures and ``break_even_requests``.
            ``savings_pct`` is 0.0 when the API cost is zero;
            ``break_even_requests`` and ``max_requests_supported`` are
            ``inf`` when their divisor is zero.
        """
        # API cost
        api_cost_per_request = (
            (avg_input_tokens / 1000) * api_cost_per_1k_input +
            (avg_output_tokens / 1000) * api_cost_per_1k_output
        )
        api_cost_monthly = requests_per_month * api_cost_per_request

        # Self-hosted cost (assume 24/7 operation)
        self_hosted_cost_monthly = self.gpu_cost_per_hour * 24 * 30

        # Capacity: can we handle the load?
        seconds_per_month = 30 * 24 * 3600
        max_output_tokens_monthly = self.tokens_per_second * seconds_per_month
        if avg_output_tokens > 0:
            max_requests_monthly = max_output_tokens_monthly / avg_output_tokens * 0.7  # 70% utilization
        else:
            max_requests_monthly = float("inf")

        savings_monthly = api_cost_monthly - self_hosted_cost_monthly
        return {
            "api_cost_monthly": api_cost_monthly,
            "self_hosted_cost_monthly": self_hosted_cost_monthly,
            "savings_monthly": savings_monthly,
            "savings_pct": savings_monthly / api_cost_monthly * 100 if api_cost_monthly else 0.0,
            "max_requests_supported": max_requests_monthly,
            "can_handle_load": max_requests_monthly >= requests_per_month,
            "break_even_requests": (
                self_hosted_cost_monthly / api_cost_per_request
                if api_cost_per_request else float("inf")
            )
        }

# Cost Optimization
Recommend cost-optimized deployment configuration:
def optimize_for_cost(
    model_size: str,
    expected_requests_per_hour: int,
    latency_sla_ms: float
) -> dict:
    """Pick the cheapest candidate configuration that meets the requirements.

    Each candidate is scored with ``estimate_throughput`` and
    ``estimate_latency``. Candidates that exceed the latency SLA or fall
    short of the expected request rate are dropped.

    Returns:
        The viable configuration with the lowest ``cost_per_1k_requests``
        (its settings plus ``throughput`` and ``latency``), or ``None``
        when no candidate qualifies.
    """
    candidates = [
        {"gpu": "A10", "precision": "int4", "batch_size": 32},
        {"gpu": "A10", "precision": "fp16", "batch_size": 16},
        {"gpu": "A100_40GB", "precision": "int4", "batch_size": 64},
        {"gpu": "A100_40GB", "precision": "fp16", "batch_size": 32},
        {"gpu": "A100_80GB", "precision": "fp16", "batch_size": 64},
    ]

    viable = []
    for candidate in candidates:
        throughput = estimate_throughput(model_size, candidate)
        latency = estimate_latency(model_size, candidate)
        if latency > latency_sla_ms or throughput < expected_requests_per_hour:
            continue
        hourly_cost = CostEstimator.GPU_COSTS[candidate["gpu"]]
        per_request = hourly_cost / throughput
        scored = dict(candidate)
        scored["throughput"] = throughput
        scored["latency"] = latency
        scored["cost_per_1k_requests"] = per_request * 1000
        viable.append(scored)

    # min() returns the first of equally cheap candidates, matching a
    # stable sort followed by taking element 0.
    return min(viable, key=lambda entry: entry["cost_per_1k_requests"], default=None)