LLM Deployment & Infrastructure Code Reference

Complete code implementations for Chapter 9: LLM Deployment & Infrastructure.


LLM Serving Stack Architecture

flowchart TB
    subgraph Clients["CLIENT LAYER"]
        direction LR
        Web["Web App"]
        Mobile["Mobile App"]
        API["API Clients"]
    end

    subgraph Gateway["GATEWAY LAYER"]
        direction TB
        LB["Load Balancer<br/>(Nginx/HAProxy)"]
        RateLimit["Rate Limiter"]
        Auth["Auth/API Keys"]
        LB --> RateLimit --> Auth
    end

    subgraph Inference["INFERENCE LAYER"]
        direction TB
        Router["Request Router"]

        subgraph Pool["vLLM Pool"]
            direction LR
            GPU1["GPU 1<br/>Prefill"]
            GPU2["GPU 2<br/>Decode"]
            GPU3["GPU 3<br/>Decode"]
            GPU4["GPU 4<br/>Decode"]
        end

        subgraph Memory["Memory Management"]
            KVCache["Paged KV Cache"]
            Prefixes["Prefix Cache"]
        end

        Router --> Pool
        Pool --> Memory
    end

    subgraph Support["SUPPORT SYSTEMS"]
        direction LR
        Queue["Request Queue<br/>(Redis)"]
        Metrics["Prometheus<br/>Metrics"]
        Models["Model Store<br/>(S3/GCS)"]
    end

    Clients --> Gateway
    Gateway --> Queue
    Queue --> Inference
    Inference --> Metrics
    Models -.->|"Model Loading"| Pool

    style Gateway fill:#e3f2fd
    style Inference fill:#e8f5e9
    style Support fill:#fff3e0

Request Flow Diagram

sequenceDiagram
    participant C as Client
    participant G as Gateway
    participant Q as Queue
    participant S as Scheduler
    participant P as Prefill Worker
    participant D as Decode Workers
    participant K as KV Cache

    C->>G: POST /v1/completions
    G->>G: Rate limit + Auth
    G->>Q: Enqueue request
    Q->>S: Dequeue batch

    S->>P: Schedule prefill
    P->>K: Allocate KV blocks
    P->>P: Process prompt tokens
    P-->>D: Transfer to decode

    loop Token Generation
        D->>K: Read/Write KV
        D->>D: Generate token
        D-->>C: Stream token (SSE)
    end

    D->>K: Free KV blocks
    D->>G: Complete response

Table of Contents


Inference Fundamentals

Inference Metrics Tracker

Complete implementation for tracking inference performance metrics:

import numpy as np

class InferenceMetrics:
    """Track per-request inference performance metrics and aggregate them.

    Each recorded request is stored as a dict in ``self.requests``;
    ``summary()`` reduces those dicts to averages and percentiles.
    """

    def __init__(self):
        # One dict per recorded request, in insertion order.
        self.requests = []

    def record_request(
        self,
        prompt_tokens: int,
        generated_tokens: int,
        ttft_ms: float,
        total_time_ms: float
    ):
        """Record one completed request.

        Args:
            prompt_tokens: Number of tokens in the prompt.
            generated_tokens: Number of tokens produced.
            ttft_ms: Time to first token, in milliseconds.
            total_time_ms: End-to-end request time, in milliseconds.
        """
        # Inter-token latency: time after the first token spread over the
        # remaining tokens. The divisor is clamped to 1 so requests that
        # produced zero or one token do not divide by zero.
        decode_intervals = max(generated_tokens - 1, 1)
        itl_ms = (total_time_ms - ttft_ms) / decode_intervals

        # A zero (or negative) duration would otherwise raise
        # ZeroDivisionError; report a throughput of 0.0 for it instead.
        total_time_s = total_time_ms / 1000
        tokens_per_second = (
            generated_tokens / total_time_s if total_time_s > 0 else 0.0
        )

        self.requests.append({
            "prompt_tokens": prompt_tokens,
            "generated_tokens": generated_tokens,
            "ttft_ms": ttft_ms,
            "itl_ms": itl_ms,
            "total_time_ms": total_time_ms,
            "tokens_per_second": tokens_per_second
        })

    def summary(self) -> dict:
        """Return aggregate statistics, or ``{}`` when nothing was recorded."""
        if not self.requests:
            return {}

        # Build each column once instead of once per statistic.
        ttft = [r["ttft_ms"] for r in self.requests]
        itl = [r["itl_ms"] for r in self.requests]
        throughput = [r["tokens_per_second"] for r in self.requests]

        return {
            "request_count": len(self.requests),
            "avg_ttft_ms": np.mean(ttft),
            "p50_ttft_ms": np.percentile(ttft, 50),
            "p99_ttft_ms": np.percentile(ttft, 99),
            "avg_itl_ms": np.mean(itl),
            # Mean of the per-request throughputs, not total tokens / total time.
            "throughput_tokens_per_sec": np.mean(throughput)
        }

Memory Estimation

Complete implementation for estimating LLM memory requirements:

def estimate_memory_gb(
    num_parameters: float,  # In billions
    precision: str,  # "fp32", "fp16", "bf16", "int8", "int4"
    num_layers: int,
    hidden_dim: int,
    num_heads: int,
    head_dim: int,
    batch_size: int,
    seq_length: int
) -> dict:
    """Estimate memory requirements for LLM inference.

    All results are in decimal gigabytes (bytes / 1e9).

    Args:
        num_parameters: Parameter count, in billions.
        precision: Weight precision; selects the bytes-per-parameter value.
        num_layers: Number of transformer layers.
        hidden_dim: Hidden dimension, used for the activation estimate.
        num_heads: Number of attention heads used for the KV cache.
        head_dim: Dimension of each attention head.
        batch_size: Number of concurrent sequences.
        seq_length: Tokens per sequence.

    Returns:
        Dict with ``weights_gb``, ``kv_cache_gb``, ``activations_gb``,
        ``total_gb`` and ``per_request_gb`` (KV cache for one sequence).

    Raises:
        KeyError: If ``precision`` is not one of the supported names.
    """

    bytes_per_param = {
        "fp32": 4, "fp16": 2, "bf16": 2, "int8": 1, "int4": 0.5
    }[precision]

    # Model weights
    weight_gb = num_parameters * 1e9 * bytes_per_param / 1e9

    # KV cache (assume fp16 for cache even with int8 weights):
    # 2 tensors (K and V) * layers * tokens * heads * head_dim * 2 bytes.
    kv_bytes_per_request = 2 * num_layers * seq_length * num_heads * head_dim * 2
    kv_cache_gb = kv_bytes_per_request * batch_size / 1e9

    # Activation memory (rough estimate)
    activation_gb = batch_size * seq_length * hidden_dim * 4 / 1e9

    return {
        "weights_gb": weight_gb,
        "kv_cache_gb": kv_cache_gb,
        "activations_gb": activation_gb,
        "total_gb": weight_gb + kv_cache_gb + activation_gb,
        # Computed from the per-sequence byte count rather than by dividing
        # by batch_size, so batch_size == 0 no longer raises.
        "per_request_gb": kv_bytes_per_request / 1e9
    }

# Example: LLaMA-7B with batch of 8, 4K context
estimate_memory_gb(
    num_parameters=7,
    precision="fp16",
    num_layers=32,
    hidden_dim=4096,
    num_heads=32,
    head_dim=128,
    batch_size=8,
    seq_length=4096
)
# Returns approximately (decimal GB, i.e. bytes / 1e9):
# {'weights_gb': 14.0, 'kv_cache_gb': 17.18, 'activations_gb': 0.54,
#  'total_gb': 31.72, 'per_request_gb': 2.15}

Batching Strategies

Static Batcher

Complete implementation for collecting requests into fixed-size batches:

import asyncio

class StaticBatcher:
    """Collect requests into fixed-size batches.

    A batch is run as soon as ``batch_size`` requests are pending, or
    ``timeout_ms`` after the first request of a partial batch arrived,
    whichever comes first. The model call happens while ``self.lock`` is
    held, so batches are sent to the model one at a time.

    Args:
        batch_size: Number of requests that triggers an immediate batch.
        timeout_ms: Maximum time a partial batch waits before being flushed.
        model: Object exposing ``async generate_batch(requests)`` returning
            one result per request. It may also be assigned later through
            the ``model`` attribute.
    """

    def __init__(self, batch_size: int, timeout_ms: int = 100, model=None):
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.model = model
        self.pending = []
        self.lock = asyncio.Lock()
        # Timer task that flushes a partial batch; None when no timer is armed.
        self._flush_task = None

    async def add_request(self, request: dict) -> asyncio.Future:
        """Add request to batch, return future for result.

        If this request completes a batch, the batch is run before the call
        returns. Model failures are delivered through the returned future.
        """
        future = asyncio.get_running_loop().create_future()

        async with self.lock:
            self.pending.append((request, future))

            if len(self.pending) >= self.batch_size:
                # The batch is full: the pending timer is no longer needed.
                if self._flush_task is not None:
                    self._flush_task.cancel()
                    self._flush_task = None
                await self._process_batch()
            elif self._flush_task is None:
                # First request of a partial batch: arm the timeout flush.
                self._flush_task = asyncio.create_task(self._flush_after_timeout())

        return future

    async def _flush_after_timeout(self):
        """Flush whatever is pending once ``timeout_ms`` has elapsed."""
        await asyncio.sleep(self.timeout_ms / 1000)
        async with self.lock:
            self._flush_task = None
            await self._process_batch()

    async def _process_batch(self):
        """Process up to ``batch_size`` pending requests as one batch.

        Callers are expected to hold ``self.lock``.
        """
        if not self.pending:
            return

        batch = self.pending[:self.batch_size]
        self.pending = self.pending[self.batch_size:]

        requests = [r for r, _ in batch]
        futures = [f for _, f in batch]

        try:
            results = await self.model.generate_batch(requests)
        except Exception as exc:
            # Fail every waiter instead of leaving their futures pending forever.
            for future in futures:
                if not future.done():
                    future.set_exception(exc)
            return

        # Distribute results; skip futures the caller already cancelled.
        for future, result in zip(futures, results):
            if not future.done():
                future.set_result(result)

Continuous Batcher

Complete implementation for dynamic request batching:

import asyncio

EOS_TOKEN = 2  # End-of-sequence token ID


class ContinuousBatcher:
    """Process requests with continuous batching.

    Requests join the running batch as soon as a slot is free and leave it
    as soon as they finish, one decode step at a time.

    This class calls ``self._generate_step(active_requests)``, which is not
    defined here: it must be supplied (for example by a subclass) as an
    awaitable returning one token per active request, in order.

    Each request dict must provide ``"max_tokens"`` and a ``"callback"``
    that is called with the list of generated tokens on completion.
    """

    def __init__(self, max_batch_size: int):
        if max_batch_size < 1:
            # With no slots the loop could never admit a request.
            raise ValueError("max_batch_size must be at least 1")
        self.max_batch_size = max_batch_size
        self.active_requests = []  # Currently generating
        self.waiting_queue = asyncio.Queue()

    async def add_request(self, request: dict):
        """Add request to processing queue."""
        await self.waiting_queue.put(request)

    def _admit(self, request: dict):
        """Start tracking generation state for a newly admitted request."""
        self.active_requests.append({
            "request": request,
            "tokens": [],
            "done": False
        })

    async def run(self):
        """Main processing loop with continuous batching. Runs until cancelled."""
        while True:
            if not self.active_requests:
                # Idle: block until a request arrives instead of polling.
                self._admit(await self.waiting_queue.get())

            # Fill the remaining slots with whatever is already waiting.
            while (len(self.active_requests) < self.max_batch_size
                   and not self.waiting_queue.empty()):
                self._admit(self.waiting_queue.get_nowait())

            # Generate one token for all active requests
            next_tokens = await self._generate_step(self.active_requests)

            # Update states, notify completed requests
            for req, token in zip(self.active_requests, next_tokens):
                req["tokens"].append(token)
                if token == EOS_TOKEN or len(req["tokens"]) >= req["request"]["max_tokens"]:
                    req["done"] = True
                    req["request"]["callback"](req["tokens"])

            self.active_requests = [r for r in self.active_requests if not r["done"]]

Iteration-Level Scheduler

Complete implementation for fine-grained token-by-token scheduling:

EOS_TOKEN = 2  # End-of-sequence token ID

class IterationScheduler:
    """Pick, once per token step, which requests to prefill and which to decode.

    The class calls ``self._estimate_blocks(request)``, which is not defined
    here and must be provided elsewhere (for example by a subclass).
    """

    def __init__(self, model, memory_manager):
        self.memory = memory_manager
        self.model = model
        # Requests currently producing tokens.
        self.decode_set = set()
        # Requests still waiting for their prompt to be processed.
        self.prefill_queue = []

    async def schedule_iteration(self):
        """Return ``(prefill_requests, decode_requests)`` for this step.

        Every request already decoding is kept. Waiting requests are then
        admitted in queue order while the running block total stays within
        the number of free blocks reported by the memory manager; a request
        that does not fit is skipped and later ones are still considered.
        """
        block_budget = self.memory.get_free_blocks()

        decoding = list(self.decode_set)
        # NOTE(review): blocks already held by decoding requests are counted
        # against the *free* block budget - confirm this is intended.
        committed = sum(req.kv_blocks for req in decoding)

        admitted = []
        for candidate in self.prefill_queue:
            cost = self._estimate_blocks(candidate)
            if committed + cost > block_budget:
                continue
            admitted.append(candidate)
            committed += cost

        return admitted, decoding

    async def run_iteration(self):
        """Run one scheduling step: prefill admitted requests, then decode."""
        to_prefill, to_decode = await self.schedule_iteration()

        if to_prefill:
            # Prefilled requests join the decode set and leave the queue;
            # they receive their first decode step on a later iteration.
            caches = await self.model.prefill_batch(to_prefill)
            for req, cache in zip(to_prefill, caches):
                req.kv_cache = cache
                self.decode_set.add(req)
            self.prefill_queue = [
                req for req in self.prefill_queue if req not in to_prefill
            ]

        if to_decode:
            tokens = await self.model.decode_batch(to_decode)
            for req, tok in zip(to_decode, tokens):
                req.generated_tokens.append(tok)
                if tok != EOS_TOKEN:
                    continue
                # End of sequence: finish the request and release its cache.
                req.complete()
                self.decode_set.remove(req)
                self.memory.free(req.kv_cache)

Optimal Batch Size Finder

Find the largest batch size that meets latency requirements:

import random
import time
import numpy as np

def find_optimal_batch_size(
    model,
    latency_sla_ms: float,
    test_prompts: list[str],
    batch_sizes=(1, 2, 4, 8, 16, 32, 64),
    num_trials: int = 10
) -> int:
    """Find largest batch size meeting latency SLA.

    Candidate sizes are tried in the given order. The search stops at the
    first size whose p99 latency exceeds the SLA and returns the last size
    that passed.

    Args:
        model: Object exposing ``generate_batch(prompts)``.
        latency_sla_ms: Maximum acceptable p99 batch latency, in milliseconds.
        test_prompts: Pool of prompts to sample batches from. A batch is
            capped at ``len(test_prompts)`` prompts.
        batch_sizes: Candidate batch sizes, in increasing order.
        num_trials: Timed runs per candidate size.

    Returns:
        The largest passing batch size. If even the first candidate misses
        the SLA, 1 is returned as a floor.

    Raises:
        ValueError: If ``test_prompts`` is empty or ``num_trials`` < 1.
    """
    if not test_prompts:
        raise ValueError("test_prompts must not be empty")
    if num_trials < 1:
        raise ValueError("num_trials must be at least 1")

    largest_passing = 1
    for batch_size in batch_sizes:
        sample_size = min(batch_size, len(test_prompts))
        latencies = []
        for _ in range(num_trials):
            batch = random.sample(test_prompts, sample_size)
            # perf_counter is monotonic and high-resolution; wall-clock time
            # can jump if the system clock is adjusted mid-measurement.
            start = time.perf_counter()
            model.generate_batch(batch)
            latencies.append((time.perf_counter() - start) * 1000)

        if np.percentile(latencies, 99) > latency_sla_ms:
            return largest_passing
        largest_passing = batch_size

    return largest_passing  # Every candidate met the SLA

Performance Profiling

Inference Profiler

Complete implementation for profiling LLM inference:

import torch
import time
from contextlib import contextmanager

EOS_TOKEN = 2  # End-of-sequence token ID


class InferenceProfiler:
    """Profile LLM inference at multiple levels.

    ``timings`` maps phase name to elapsed seconds; ``memory_snapshots``
    holds one dict per profiled phase. When CUDA is unavailable the
    profiler still records timings and reports memory as 0.
    """

    def __init__(self):
        self.timings = {}
        self.memory_snapshots = []

    @staticmethod
    def _is_generation_phase(name: str) -> bool:
        """Return True for the phase names written by ``profile_generation``."""
        return name == "prefill" or name.startswith("decode_")

    @contextmanager
    def profile_phase(self, name: str):
        """Profile a named phase of inference.

        Records wall time and the change in allocated CUDA memory for the
        ``with`` body. Nothing is recorded if the body raises.
        """
        use_cuda = torch.cuda.is_available()

        # Synchronize so pending GPU work is not attributed to this phase.
        if use_cuda:
            torch.cuda.synchronize()
        start_time = time.perf_counter()
        start_memory = torch.cuda.memory_allocated() if use_cuda else 0

        yield

        # Wait for the phase's own GPU work before stopping the clock.
        if use_cuda:
            torch.cuda.synchronize()
        end_time = time.perf_counter()
        end_memory = torch.cuda.memory_allocated() if use_cuda else 0

        self.timings[name] = end_time - start_time
        self.memory_snapshots.append({
            "phase": name,
            "memory_delta_mb": (end_memory - start_memory) / 1e6,
            "total_memory_mb": end_memory / 1e6
        })

    def profile_generation(self, model, prompt_tokens, max_new_tokens):
        """Profile complete generation.

        Runs ``model.prefill`` once, then ``model.decode_step`` until EOS or
        ``max_new_tokens``. Returns ``(generated_tokens, report)``.
        """
        # Drop prefill/decode entries left by an earlier generation: a longer
        # previous run would otherwise inflate this run's token count and
        # decode time. Phases recorded under other names are kept.
        for stale in [k for k in self.timings if self._is_generation_phase(k)]:
            del self.timings[stale]
        self.memory_snapshots[:] = [
            s for s in self.memory_snapshots
            if not self._is_generation_phase(s["phase"])
        ]

        with self.profile_phase("prefill"):
            kv_cache = model.prefill(prompt_tokens)

        generated = []
        for i in range(max_new_tokens):
            with self.profile_phase(f"decode_{i}"):
                token, kv_cache = model.decode_step(kv_cache)
                generated.append(token)
                if token == EOS_TOKEN:
                    break

        return generated, self.get_report()

    def get_report(self) -> dict:
        """Generate profiling report from the recorded phases."""
        total_time = sum(self.timings.values())

        # Separate prefill and decode times
        prefill_time = self.timings.get("prefill", 0)
        decode_times = [v for k, v in self.timings.items() if k.startswith("decode_")]
        total_decode_time = sum(decode_times)
        avg_decode_time = total_decode_time / len(decode_times) if decode_times else 0

        return {
            "total_time_s": total_time,
            "prefill_time_s": prefill_time,
            "total_decode_time_s": total_decode_time,
            "avg_token_time_ms": avg_decode_time * 1000,
            "tokens_generated": len(decode_times),
            "tokens_per_second": len(decode_times) / total_decode_time if total_decode_time > 0 else 0,
            "time_breakdown": {
                "prefill_pct": prefill_time / total_time * 100 if total_time > 0 else 0,
                "decode_pct": total_decode_time / total_time * 100 if total_time > 0 else 0
            },
            "peak_memory_mb": max(s["total_memory_mb"] for s in self.memory_snapshots) if self.memory_snapshots else 0
        }

Memory Analysis

Analyze how memory scales with context length:

import torch

def analyze_memory_usage(
    model,
    context_lengths: list[int],
    vocab_size: int = 32000,
    max_new_tokens: int = 100,
    device=None
):
    """Analyze how memory scales with context length.

    For each context length, generates from a random prompt of that length
    and records the peak CUDA memory allocated during the run.

    Args:
        model: Object exposing ``generate(prompt, max_new_tokens=...)``.
        context_lengths: Prompt lengths (in tokens) to measure.
        vocab_size: Exclusive upper bound for the random token IDs.
        max_new_tokens: Tokens to generate per measurement.
        device: Device for the prompt tensor. ``None`` keeps torch's default
            device; pass the model's device if the model does not move its
            inputs itself.

    Returns:
        List of ``{"context_length", "peak_memory_gb"}`` dicts, one per
        entry of ``context_lengths``, in order.

    Raises:
        RuntimeError: If there is something to measure but CUDA is unavailable.
    """
    if not context_lengths:
        return []
    if not torch.cuda.is_available():
        # The measurements below read CUDA allocator statistics.
        raise RuntimeError("analyze_memory_usage requires a CUDA device")

    results = []

    for ctx_len in context_lengths:
        # Start each measurement from a clean allocator state so peaks from
        # earlier (possibly longer) contexts do not leak into this one.
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()

        # Generate with this context length
        prompt = torch.randint(0, vocab_size, (1, ctx_len), device=device)
        model.generate(prompt, max_new_tokens=max_new_tokens)

        peak_memory = torch.cuda.max_memory_allocated() / 1e9

        results.append({
            "context_length": ctx_len,
            "peak_memory_gb": peak_memory
        })

    return results

# Example output:
# context_length | peak_memory_gb
# 512            | 15.2
# 1024           | 16.1
# 2048           | 17.8
# 4096           | 21.3
# 8192           | 28.4

PyTorch GPU Profiler

Profile using PyTorch profiler with CUDA events:

import torch.profiler

def profile_with_pytorch(model, inputs, output_path="./profile"):
    """Trace ``model.generate(inputs)`` with the PyTorch profiler.

    Runs five profiled steps (1 wait, 1 warmup, 3 active), hands the trace
    to the TensorBoard handler for ``output_path``, and prints the top 20
    operators sorted by total CUDA time.
    """
    tracked_activities = [
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ]
    step_schedule = torch.profiler.schedule(wait=1, warmup=1, active=3, repeat=1)
    trace_writer = torch.profiler.tensorboard_trace_handler(output_path)

    profiler = torch.profiler.profile(
        activities=tracked_activities,
        schedule=step_schedule,
        on_trace_ready=trace_writer,
        record_shapes=True,
        profile_memory=True,
        with_stack=True
    )

    with profiler as prof:
        # Five steps cover the whole wait + warmup + active schedule.
        for _ in range(5):
            model.generate(inputs)
            prof.step()

    # Print summary
    summary_table = prof.key_averages().table(sort_by="cuda_time_total", row_limit=20)
    print(summary_table)

Benchmark Inference

Comprehensive inference benchmark:

import time
import numpy as np
import torch

def benchmark_inference(model, test_prompts: list[str], config: dict):
    """Comprehensive inference benchmark.

    Runs three warmup generations on the first prompt, then times one
    generation per prompt and aggregates the results.

    Args:
        model: Object exposing ``generate``, ``get_first_token_time`` and
            ``tokenize``.
        test_prompts: Prompts to benchmark; must not be empty.
        config: Keyword arguments forwarded to ``model.generate``.

    Returns:
        Dict with the ``config``, per-prompt ``metrics`` and a ``summary``.

    Raises:
        ValueError: If ``test_prompts`` is empty.
    """
    if not test_prompts:
        raise ValueError("test_prompts must not be empty")

    use_cuda = torch.cuda.is_available()

    def _sync():
        # Wait for queued GPU work so the timer covers the whole generation;
        # there is nothing to wait for when CUDA is unavailable.
        if use_cuda:
            torch.cuda.synchronize()

    results = {
        "config": config,
        "metrics": []
    }

    # Warmup
    for _ in range(3):
        model.generate(test_prompts[0], max_tokens=10)

    # Benchmark
    for prompt in test_prompts:
        _sync()
        start = time.perf_counter()

        tokens = model.generate(prompt, **config)
        first_token_time = model.get_first_token_time()  # If available

        _sync()
        elapsed = time.perf_counter() - start

        results["metrics"].append({
            "prompt_tokens": len(model.tokenize(prompt)),
            "generated_tokens": len(tokens),
            "total_time_s": elapsed,
            # Compare against None so a legitimate 0.0 is reported, not dropped.
            "ttft_ms": first_token_time * 1000 if first_token_time is not None else None,
            "tokens_per_second": len(tokens) / elapsed if elapsed > 0 else 0.0
        })

    # Aggregate
    throughputs = [m["tokens_per_second"] for m in results["metrics"]]
    total_times = [m["total_time_s"] for m in results["metrics"]]
    results["summary"] = {
        "avg_tokens_per_second": np.mean(throughputs),
        "p50_total_time_s": np.percentile(total_times, 50),
        "p99_total_time_s": np.percentile(total_times, 99),
    }

    return results

Memory Optimization

KV Cache Manager

Complete implementation for paged KV cache management:

import torch

class KVCacheManager:
    """Manage KV cache with paging for efficiency.

    A fixed pool of equally sized blocks is allocated up front; requests
    borrow whole blocks and return them when finished.

    Args:
        num_layers: Number of transformer layers stored per block.
        num_heads: Number of attention heads stored per block.
        head_dim: Dimension of each attention head.
        block_size: Tokens held by one block.
        dtype: Element type of the pool tensor.
        max_blocks: Number of blocks in the pool.
        device: Device the pool tensor is allocated on.

    Raises:
        ValueError: If ``block_size`` or ``max_blocks`` is less than 1.
    """

    def __init__(
        self,
        num_layers: int,
        num_heads: int,
        head_dim: int,
        block_size: int = 16,
        dtype=torch.float16,
        max_blocks: int = 1000,
        device="cuda"
    ):
        if block_size < 1:
            raise ValueError("block_size must be at least 1")
        if max_blocks < 1:
            raise ValueError("max_blocks must be at least 1")

        self.num_layers = num_layers
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.block_size = block_size
        self.dtype = dtype

        # Pre-allocate block pool
        self.max_blocks = max_blocks
        self.block_pool = torch.zeros(
            self.max_blocks,
            2,  # K and V
            num_layers,
            block_size,
            num_heads,
            head_dim,
            dtype=dtype,
            device=device
        )
        self.free_blocks = set(range(self.max_blocks))
        self.allocated = {}  # request_id -> list of block indices

    def allocate(self, request_id: str, num_tokens: int) -> list[int]:
        """Allocate blocks for a request.

        Returns the indices of the blocks now owned by ``request_id``.

        Raises:
            ValueError: If ``request_id`` already holds blocks. Overwriting
                the entry would make its old blocks unreachable (leaked),
                so the caller must ``free`` the request first.
            MemoryError: If the pool has too few free blocks.
        """
        if request_id in self.allocated:
            raise ValueError(
                f"Request {request_id!r} already has allocated blocks; free it first"
            )

        # Ceiling division: a partially filled block still occupies a block.
        num_blocks = (num_tokens + self.block_size - 1) // self.block_size

        if len(self.free_blocks) < num_blocks:
            raise MemoryError("Not enough KV cache blocks")

        blocks = [self.free_blocks.pop() for _ in range(num_blocks)]

        self.allocated[request_id] = blocks
        return blocks

    def free(self, request_id: str):
        """Free blocks for a completed request. Unknown IDs are ignored."""
        blocks = self.allocated.pop(request_id, None)
        if blocks:
            self.free_blocks.update(blocks)

    def get_utilization(self) -> float:
        """Get KV cache memory utilization as a fraction of the pool in use."""
        used = self.max_blocks - len(self.free_blocks)
        return used / self.max_blocks

Speculative Decoder

Complete implementation for speculative decoding:

EOS_TOKEN = 2  # End-of-sequence token ID


class SpeculativeDecoder:
    """Speculative decoding for faster generation.

    A draft model proposes up to ``n_draft`` tokens per round; the target
    model scores them in a single call, and the proposals are accepted up to
    the first disagreement, where the target's own token is used instead.

    Args:
        target_model: Object exposing ``forward_batch(tokens, kv_cache)``
            returning ``(logits, kv_cache)``.
        draft_model: Object exposing ``decode_step(context, kv_cache)``
            returning ``(next_token, kv_cache)``.
        n_draft: Maximum draft tokens proposed per round.

    Raises:
        ValueError: If ``n_draft`` is less than 1.
    """

    def __init__(self, target_model, draft_model, n_draft: int = 4):
        if n_draft < 1:
            # With no proposals a round can never make progress.
            raise ValueError("n_draft must be at least 1")
        self.target = target_model
        self.draft = draft_model
        self.n_draft = n_draft

    async def generate(self, prompt_tokens: list[int], max_tokens: int) -> list[int]:
        """Generate with speculative decoding.

        Returns at most ``max_tokens`` tokens, ending early once EOS is
        produced.
        """
        generated = []
        kv_cache_target = None
        kv_cache_draft = None

        while len(generated) < max_tokens:
            # Never draft more than the remaining budget: each round appends
            # at most one token per draft, so this caps output at max_tokens.
            round_budget = min(self.n_draft, max_tokens - len(generated))

            # Step 1: Draft model generates candidates
            draft_tokens = []
            for _ in range(round_budget):
                context = prompt_tokens + generated + draft_tokens
                next_token, kv_cache_draft = self.draft.decode_step(context, kv_cache_draft)
                draft_tokens.append(next_token)

                if next_token == EOS_TOKEN:
                    break

            # Step 2: Target model verifies all in parallel
            context = prompt_tokens + generated
            logits, kv_cache_target = self.target.forward_batch(
                context + draft_tokens,
                kv_cache_target
            )

            # Step 3: Accept matching tokens
            accepted = 0
            for i, draft_token in enumerate(draft_tokens):
                # NOTE(review): assumes row j of `logits` scores the token AT
                # position j; many LM APIs return next-token logits (row j
                # predicts position j + 1) - confirm against forward_batch.
                # int() turns the array/tensor scalar into a plain token ID.
                target_token = int(logits[len(context) + i].argmax())
                if draft_token == target_token:
                    accepted += 1
                    generated.append(draft_token)
                else:
                    # Reject: use target's token instead
                    generated.append(target_token)
                    break

            # Check for EOS
            if generated[-1] == EOS_TOKEN:
                break

            # If we rejected early, roll back draft model's KV cache
            if accepted < len(draft_tokens):
                kv_cache_draft = self._truncate_cache(kv_cache_draft, len(context) + accepted)

        return generated

    def _truncate_cache(self, cache, length):
        """Truncate KV cache to given length along its third axis."""
        return cache[:, :, :length, :]

Prefix Cache

Complete implementation for KV cache prefix sharing:

import hashlib
import time
import torch

class PrefixCache:
    """Cache KV states for common prompt prefixes.

    Entries are keyed by a hash of the prefix tokens and evicted in
    least-recently-used order once ``max_entries`` is reached.
    """

    def __init__(self, max_entries: int = 100):
        self.cache = {}  # hash -> (prefix_tokens, kv_cache)
        self.max_entries = max_entries
        self.access_times = {}  # hash -> time of last put/hit

    def get(self, prompt_tokens: list[int]) -> tuple[int, torch.Tensor]:
        """Find longest cached prefix.

        Returns ``(match_length, kv_cache)``, or ``(0, None)`` when no cached
        prefix matches the start of ``prompt_tokens``.
        """
        best_key = None
        best_match_len = 0
        best_cache = None

        for hash_key, (prefix_tokens, kv_cache) in self.cache.items():
            # Check the length first: it is cheaper than comparing tokens.
            if (len(prefix_tokens) > best_match_len
                    and self._is_prefix(prefix_tokens, prompt_tokens)):
                best_key = hash_key
                best_match_len = len(prefix_tokens)
                best_cache = kv_cache

        # Only the entry actually returned counts as "used"; shorter matches
        # seen along the way must not have their LRU age refreshed.
        if best_key is not None:
            self.access_times[best_key] = time.time()

        return best_match_len, best_cache

    def put(self, prefix_tokens: list[int], kv_cache: torch.Tensor):
        """Store a clone of a prefix's KV cache, evicting LRU entries if full."""
        if self.max_entries <= 0:
            return  # A zero-capacity cache stores nothing.

        hash_key = self._hash(prefix_tokens)

        # Overwriting an existing entry does not grow the cache, so it must
        # not evict an unrelated entry.
        if hash_key not in self.cache:
            while len(self.cache) >= self.max_entries:
                lru_key = min(self.access_times, key=self.access_times.get)
                del self.cache[lru_key]
                del self.access_times[lru_key]

        self.cache[hash_key] = (prefix_tokens, kv_cache.clone())
        self.access_times[hash_key] = time.time()

    def _is_prefix(self, prefix: list[int], full: list[int]) -> bool:
        """Return True when ``full`` starts with ``prefix``."""
        return len(prefix) <= len(full) and prefix == full[:len(prefix)]

    def _hash(self, tokens: list[int]) -> str:
        """Return the cache key for a token list (MD5 of its string form)."""
        return hashlib.md5(str(tokens).encode()).hexdigest()


EOS_TOKEN = 2  # End-of-sequence token ID


class CachedInference:
    """Inference with prefix caching.

    Args:
        model: Object exposing ``prefill(tokens[, kv_cache])`` and
            ``decode_step(kv_cache)``.
        prefix_cache: Cache consulted before prefill and updated after it.
    """

    def __init__(self, model, prefix_cache: PrefixCache):
        self.model = model
        self.prefix_cache = prefix_cache

    async def generate(self, prompt_tokens: list[int], max_tokens: int) -> list[int]:
        """Generate up to ``max_tokens`` tokens, reusing a cached prefix if any."""
        # Check for cached prefix
        match_len, cached_kv = self.prefix_cache.get(prompt_tokens)

        if cached_kv is not None:
            # Use cached KV, prefill only the tokens after the cached prefix.
            # NOTE(review): if prefill updates the given KV tensor in place,
            # the cached entry is modified too - confirm it returns a new one.
            kv_cache = self.model.prefill(prompt_tokens[match_len:], cached_kv)
        else:
            # Full prefill
            kv_cache = self.model.prefill(prompt_tokens)

        # Cache the prompt's KV state BEFORE decoding: after decoding it also
        # contains the generated tokens and no longer matches the prompt key.
        # Only substantial prefixes are cached, and an exact hit is already
        # stored so it is not written again.
        if len(prompt_tokens) > 100 and match_len < len(prompt_tokens):
            self.prefix_cache.put(prompt_tokens, kv_cache)

        # Decode
        generated = []
        for _ in range(max_tokens):
            token, kv_cache = self.model.decode_step(kv_cache)
            generated.append(token)
            if token == EOS_TOKEN:
                break

        return generated

Scaling and Load Balancing

LLM Load Balancer

Complete implementation for load balancing across inference servers:

class LLMLoadBalancer:
    """Spread requests over several inference servers.

    The class calls ``self._send_request(endpoint, request)`` and
    ``self._ping(endpoint)``, which are not defined here and must be
    provided elsewhere (for example by a subclass).
    """

    def __init__(self, endpoints: list[str]):
        self.endpoints = endpoints
        # In-flight request count per endpoint.
        self.request_counts = dict.fromkeys(endpoints, 0)
        # Every endpoint is assumed healthy until a health check says otherwise.
        self.health_status = dict.fromkeys(endpoints, True)

    async def route_request(self, request: dict) -> dict:
        """Send the request to the healthy endpoint with the fewest in-flight requests.

        Raises ``Exception`` when no endpoint is currently marked healthy.
        """
        candidates = [ep for ep in self.endpoints if self.health_status[ep]]
        if len(candidates) == 0:
            raise Exception("No healthy endpoints")

        # Least-connections routing; ties go to the earliest endpoint.
        target = min(candidates, key=self.request_counts.__getitem__)

        self.request_counts[target] += 1
        try:
            return await self._send_request(target, request)
        finally:
            # Release the slot whether the request succeeded or failed.
            self.request_counts[target] -= 1

    async def health_check(self):
        """Ping each endpoint once, in order, and record whether it responded."""
        for ep in self.endpoints:
            try:
                await self._ping(ep)
            except Exception:
                self.health_status[ep] = False
            else:
                self.health_status[ep] = True

Kubernetes Deployment

Complete Kubernetes manifests for LLM inference:

# deployment.yaml
# Deployment: three replicas of the vLLM OpenAI-compatible server, one GPU each.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-inference
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-inference
  template:
    metadata:
      labels:
        app: llm-inference
    spec:
      containers:
      - name: vllm
        # NOTE(review): ":latest" is a moving tag; consider pinning a version.
        image: vllm/vllm-openai:latest
        args:
          - --model=meta-llama/Llama-2-7b-hf
          - --gpu-memory-utilization=0.9
        resources:
          limits:
            nvidia.com/gpu: 1
        ports:
        - containerPort: 8000
        # A pod only receives traffic once /health responds on port 8000.
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 120  # Model loading time
          periodSeconds: 10
---
# Service: exposes the pods labelled app=llm-inference on port 8000.
apiVersion: v1
kind: Service
metadata:
  name: llm-inference
spec:
  selector:
    app: llm-inference
  ports:
  - port: 8000
    targetPort: 8000
  type: LoadBalancer
---
# Kubernetes HPA: scales the llm-inference Deployment between 1 and 10
# replicas on the external "queue_depth" metric.
# NOTE(review): assumes an external metrics adapter serves "queue_depth" - confirm.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-inference
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: External
    external:
      metric:
        name: queue_depth
      target:
        type: AverageValue
        averageValue: "5"  # Scale up when queue > 5

Production Monitoring

Docker Compose Stack

Complete Docker Compose for inference with monitoring:

# docker-compose.yml
# Single-host stack: a vLLM server plus Prometheus and Grafana for monitoring.
version: '3.8'

services:
  # Inference server, published on host port 8000.
  vllm:
    image: vllm/vllm-openai:latest
    command: >
      --model meta-llama/Llama-2-7b-hf
      --gpu-memory-utilization 0.9
      --max-model-len 4096
    # Reserve one NVIDIA GPU for this container.
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    ports:
      - "8000:8000"

  # Metrics collection; expects a prometheus.yml next to this file.
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  # Dashboards, published on host port 3000.
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"

Prometheus Metrics Exporter

Complete implementation for custom LLM metrics:

import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
# Request count, labelled by model and outcome status.
request_counter = Counter('llm_requests_total', 'Total LLM requests', ['model', 'status'])
# Request latency in seconds, labelled by model and request phase.
latency_histogram = Histogram('llm_latency_seconds', 'Request latency', ['model', 'phase'])
# Running total of generated tokens per model.
tokens_generated = Counter('llm_tokens_generated_total', 'Total tokens generated', ['model'])
# Number of requests currently in flight per model.
active_requests = Gauge('llm_active_requests', 'Currently processing requests', ['model'])
# GPU memory in use, in bytes, per GPU.
gpu_memory_used = Gauge('llm_gpu_memory_bytes', 'GPU memory used', ['gpu_id'])

class MetricsMiddleware:
    """Middleware that records request count, in-flight gauge and latency.

    Args:
        model_name: Value used for the ``model`` label on every metric.
    """

    def __init__(self, model_name: str):
        self.model_name = model_name

    async def __call__(self, request, call_next):
        """Run ``call_next(request)`` and record its outcome and duration.

        Exceptions from the handler are counted as errors and re-raised.
        """
        active_requests.labels(model=self.model_name).inc()
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        start_time = time.perf_counter()

        try:
            response = await call_next(request)
        except Exception:
            request_counter.labels(model=self.model_name, status="error").inc()
            raise
        else:
            request_counter.labels(model=self.model_name, status="success").inc()
            return response
        finally:
            # Runs on success, failure and cancellation alike, so the gauge
            # always returns to its previous value.
            active_requests.labels(model=self.model_name).dec()
            latency_histogram.labels(model=self.model_name, phase="total").observe(
                time.perf_counter() - start_time
            )

Load Shedder

Complete implementation for overload protection:

import random
import time

class LoadShedder:
    """Implement load shedding for overload protection.

    Load is the larger of queue fullness and average recent latency relative
    to their limits. Below 0.7 everything is accepted, above 1.0 everything
    is rejected, and in between requests are shed with a probability that
    rises linearly up to ``shed_probability_base``.

    Args:
        max_queue_depth: In-flight request count treated as 100% queue load.
        max_latency_ms: Average latency treated as 100% latency load.
        shed_probability_base: Shed probability reached at a load of 1.0.
        latency_window: Number of most recent latencies kept for averaging.

    Raises:
        ValueError: If a limit is not positive or ``latency_window`` < 1.
    """

    def __init__(
        self,
        max_queue_depth: int = 100,
        max_latency_ms: float = 5000,
        shed_probability_base: float = 0.1,
        latency_window: int = 100
    ):
        # Both limits are used as divisors when computing the load factor.
        if max_queue_depth <= 0 or max_latency_ms <= 0:
            raise ValueError("max_queue_depth and max_latency_ms must be positive")
        if latency_window < 1:
            raise ValueError("latency_window must be at least 1")

        self.max_queue_depth = max_queue_depth
        self.max_latency_ms = max_latency_ms
        self.shed_probability_base = shed_probability_base
        self.latency_window = latency_window
        self.current_queue_depth = 0
        self.recent_latencies = []

    def should_accept(self, request: dict) -> tuple[bool, str]:
        """Decide whether to accept or shed a request.

        Returns ``(accepted, reason)``.
        """
        # Calculate load factor
        queue_load = self.current_queue_depth / self.max_queue_depth
        latency_load = self._get_avg_latency() / self.max_latency_ms

        load_factor = max(queue_load, latency_load)

        if load_factor < 0.7:
            return True, "accepted"

        if load_factor > 1.0:
            return False, "queue_full"

        # Probabilistic shedding in the 0.7-1.0 range
        shed_probability = self.shed_probability_base * (load_factor - 0.7) / 0.3
        if random.random() < shed_probability:
            return False, "shed_probabilistic"

        return True, "accepted_under_load"

    def _get_avg_latency(self) -> float:
        """Return the mean of the most recent latencies, or 0 if there are none."""
        window = self.recent_latencies[-self.latency_window:]
        if not window:
            return 0
        return sum(window) / len(window)

    async def process_request(self, request: dict, handler: callable) -> dict:
        """Process request with load shedding.

        Returns the handler's result, or an error dict when the request is
        shed. Only successful requests contribute a latency sample.
        """
        accept, reason = self.should_accept(request)

        if not accept:
            return {
                "error": "Service overloaded",
                "reason": reason,
                "retry_after_seconds": 5
            }

        self.current_queue_depth += 1
        start_time = time.perf_counter()

        try:
            result = await handler(request)
            latency = (time.perf_counter() - start_time) * 1000
            self.recent_latencies.append(latency)
            # Keep only the averaging window so the list cannot grow without
            # bound on a long-running server.
            del self.recent_latencies[:-self.latency_window]
            return result
        finally:
            self.current_queue_depth -= 1

Graceful Server

Complete implementation for graceful shutdown handling:

import signal
import asyncio

class GracefulServer:
    """Server with graceful shutdown handling."""

    def __init__(self, model, host: str = "0.0.0.0", port: int = 8000):
        self.model = model
        self.host = host
        self.port = port
        self.active_requests = set()
        self.shutdown_event = asyncio.Event()
        self.accepting_requests = True

    async def start(self):
        """Start server with signal handling."""
        # Set up signal handlers
        loop = asyncio.get_event_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig,
                lambda: asyncio.create_task(self.shutdown())
            )

        # Start serving
        server = await asyncio.start_server(
            self.handle_connection,
            self.host,
            self.port
        )

        print(f"Server started on {self.host}:{self.port}")

        # Wait for shutdown
        await self.shutdown_event.wait()

        # Stop accepting new connections
        server.close()
        await server.wait_closed()

        # Wait for active requests to complete
        if self.active_requests:
            print(f"Waiting for {len(self.active_requests)} active requests...")
            await asyncio.gather(*self.active_requests, return_exceptions=True)

        print("Shutdown complete")

    async def shutdown(self, timeout: int = 30):
        """Initiate graceful shutdown."""
        print("Shutdown initiated...")
        self.accepting_requests = False

        # Give active requests time to complete
        try:
            async with asyncio.timeout(timeout):
                while self.active_requests:
                    await asyncio.sleep(0.1)
        except asyncio.TimeoutError:
            print(f"Timeout: {len(self.active_requests)} requests still active")

        self.shutdown_event.set()

    async def handle_connection(self, reader, writer):
        """Handle incoming connection."""
        if not self.accepting_requests:
            writer.write(b"HTTP/1.1 503 Service Unavailable\r\n\r\n")
            await writer.drain()
            writer.close()
            return

        request_task = asyncio.current_task()
        self.active_requests.add(request_task)

        try:
            await self.process_request(reader, writer)
        finally:
            self.active_requests.discard(request_task)

Load Test

Complete implementation for load testing inference endpoints:

import asyncio
import aiohttp
import time
import random
import numpy as np

async def load_test(
    endpoint: str,
    prompts: list[str],
    concurrency: int,
    duration_seconds: int
) -> dict:
    """Hammer ``{endpoint}/v1/completions`` with concurrent workers.

    Each of the ``concurrency`` workers repeatedly POSTs a randomly chosen
    prompt (``max_tokens`` fixed at 50) until ``duration_seconds`` have
    elapsed since the test began.

    Returns:
        A dict with ``total_requests``, ``successful_requests``,
        ``failed_requests`` and ``latencies_ms`` (successful requests only).
        When at least one request succeeded it also carries
        ``p50_latency_ms``, ``p99_latency_ms`` and ``throughput_rps``
        (successes divided by ``duration_seconds``).
    """
    stats = {
        "total_requests": 0,
        "successful_requests": 0,
        "failed_requests": 0,
        "latencies_ms": []
    }
    url = f"{endpoint}/v1/completions"
    began = time.time()

    def time_left() -> bool:
        return time.time() - began < duration_seconds

    async def worker():
        async with aiohttp.ClientSession() as session:
            while time_left():
                payload = {"prompt": random.choice(prompts), "max_tokens": 50}
                sent_at = time.time()

                try:
                    async with session.post(url, json=payload) as response:
                        stats["total_requests"] += 1
                        if response.status != 200:
                            stats["failed_requests"] += 1
                            continue
                        stats["successful_requests"] += 1
                        elapsed_ms = (time.time() - sent_at) * 1000
                        stats["latencies_ms"].append(elapsed_ms)
                except Exception:
                    # Any transport-level failure counts as a failed request.
                    stats["failed_requests"] += 1
                    stats["total_requests"] += 1

    # All workers share `stats`; they run on one event loop.
    await asyncio.gather(*(worker() for _ in range(concurrency)))

    # Summary statistics only make sense with at least one sample.
    samples = stats["latencies_ms"]
    if samples:
        stats["p50_latency_ms"] = np.percentile(samples, 50)
        stats["p99_latency_ms"] = np.percentile(samples, 99)
        stats["throughput_rps"] = stats["successful_requests"] / duration_seconds

    return stats

# Run test
# results = asyncio.run(load_test(
#     endpoint="http://localhost:8000",
#     prompts=["Tell me a story about " for _ in range(100)],
#     concurrency=32,
#     duration_seconds=60
# ))

Cost Engineering

Cost Estimator

Complete implementation for estimating and tracking inference costs:

class CostEstimator:
    """Estimate inference cost for a single GPU at a given decode throughput.

    Attributes:
        gpu_cost_per_hour: Hourly price looked up from ``GPU_COSTS``
            (``DEFAULT_GPU_COST_PER_HOUR`` for unlisted GPU types).
        tokens_per_second: Decode throughput used for all estimates.
    """

    # GPU costs per hour (approximate, varies by provider)
    GPU_COSTS = {
        "A10": 1.50,
        "A100_40GB": 4.00,
        "A100_80GB": 6.00,
        "H100": 12.00,
        "RTX_4090": 0.80,  # Cloud gaming instances
    }

    # Hourly rate assumed when the GPU type is not listed above.
    DEFAULT_GPU_COST_PER_HOUR = 5.0
    # Prefill is modelled as this many times faster than decode.
    PREFILL_SPEEDUP = 10
    # Fraction of theoretical capacity assumed to be usable.
    TARGET_UTILIZATION = 0.7

    def __init__(self, gpu_type: str, tokens_per_second: float):
        """Look up the hourly GPU price and remember the throughput."""
        self.gpu_cost_per_hour = self.GPU_COSTS.get(
            gpu_type, self.DEFAULT_GPU_COST_PER_HOUR
        )
        self.tokens_per_second = tokens_per_second

    def cost_per_token(self) -> float:
        """Return the GPU cost of one generated token.

        Raises:
            ZeroDivisionError: If ``tokens_per_second`` is 0.
        """
        tokens_per_hour = self.tokens_per_second * 3600
        return self.gpu_cost_per_hour / tokens_per_hour

    def cost_per_request(self, input_tokens: int, output_tokens: int) -> float:
        """Return the GPU cost of one request (prefill time plus decode time).

        Raises:
            ZeroDivisionError: If ``tokens_per_second`` is 0.
        """
        prefill_time = input_tokens / (self.tokens_per_second * self.PREFILL_SPEEDUP)
        decode_time = output_tokens / self.tokens_per_second

        total_time_hours = (prefill_time + decode_time) / 3600
        return total_time_hours * self.gpu_cost_per_hour

    def break_even_vs_api(
        self,
        api_cost_per_1k_input: float,
        api_cost_per_1k_output: float,
        requests_per_month: int,
        avg_input_tokens: int,
        avg_output_tokens: int
    ) -> dict:
        """Compare one always-on GPU against a pay-per-token API.

        Returns:
            A dict with these keys:

            * ``api_cost_monthly`` / ``self_hosted_cost_monthly``
            * ``savings_monthly``: API cost minus self-hosted cost
            * ``savings_pct``: savings as a percentage of the API cost;
              ``0.0`` when the API cost is zero (percentage undefined)
            * ``max_requests_supported``: monthly capacity at
              ``TARGET_UTILIZATION``, counting decode tokens only;
              ``inf`` when ``avg_output_tokens`` is 0
            * ``can_handle_load``: whether capacity covers the demand
            * ``break_even_requests``: monthly requests at which API spend
              equals the GPU bill; ``inf`` when a request costs nothing
              via the API
        """
        # What a single average request costs through the API.
        api_cost_per_request = (
            (avg_input_tokens / 1000) * api_cost_per_1k_input +
            (avg_output_tokens / 1000) * api_cost_per_1k_output
        )
        api_cost_monthly = requests_per_month * api_cost_per_request

        # Self-hosted cost (assume 24/7 operation, 30-day month)
        self_hosted_cost_monthly = self.gpu_cost_per_hour * 24 * 30
        savings_monthly = api_cost_monthly - self_hosted_cost_monthly

        # Capacity: can we handle the load?
        seconds_per_month = 30 * 24 * 3600
        max_output_tokens_monthly = self.tokens_per_second * seconds_per_month
        if avg_output_tokens:
            max_requests_monthly = (
                max_output_tokens_monthly / avg_output_tokens * self.TARGET_UTILIZATION
            )
        else:
            # No decode work per request: decode capacity is not a limit.
            max_requests_monthly = float("inf")

        # Guard the ratios so zero traffic or a free API does not crash.
        if api_cost_monthly:
            savings_pct = savings_monthly / api_cost_monthly * 100
        else:
            savings_pct = 0.0

        if api_cost_per_request:
            break_even_requests = self_hosted_cost_monthly / api_cost_per_request
        else:
            break_even_requests = float("inf")

        return {
            "api_cost_monthly": api_cost_monthly,
            "self_hosted_cost_monthly": self_hosted_cost_monthly,
            "savings_monthly": savings_monthly,
            "savings_pct": savings_pct,
            "max_requests_supported": max_requests_monthly,
            "can_handle_load": max_requests_monthly >= requests_per_month,
            "break_even_requests": break_even_requests,
        }

Cost Optimization

Recommend cost-optimized deployment configuration:

def optimize_for_cost(
    model_size: str,
    expected_requests_per_hour: int,
    latency_sla_ms: float
) -> dict | None:
    """Recommend the cheapest deployment configuration that meets the targets.

    Each candidate GPU/precision/batch-size combination is scored with
    ``estimate_throughput`` and ``estimate_latency``. Candidates that miss
    the latency SLA or cannot sustain the expected request rate are dropped.

    NOTE(review): throughput is treated as requests per hour, since it is
    compared with ``expected_requests_per_hour`` and divides an hourly GPU
    price -- confirm against ``estimate_throughput``.

    Returns:
        The viable configuration with the lowest ``cost_per_1k_requests``
        (the config's own keys plus ``throughput``, ``latency`` and
        ``cost_per_1k_requests``), or ``None`` when nothing qualifies.
    """
    configurations = [
        {"gpu": "A10", "precision": "int4", "batch_size": 32},
        {"gpu": "A10", "precision": "fp16", "batch_size": 16},
        {"gpu": "A100_40GB", "precision": "int4", "batch_size": 64},
        {"gpu": "A100_40GB", "precision": "fp16", "batch_size": 32},
        {"gpu": "A100_80GB", "precision": "fp16", "batch_size": 64},
    ]

    viable = []
    for config in configurations:
        throughput = estimate_throughput(model_size, config)
        latency = estimate_latency(model_size, config)

        meets_sla = latency <= latency_sla_ms
        meets_demand = throughput >= expected_requests_per_hour
        # A configuration with no throughput cannot serve anything and
        # would divide by zero in the cost calculation below.
        if throughput <= 0 or not (meets_sla and meets_demand):
            continue

        hourly_cost = CostEstimator.GPU_COSTS[config["gpu"]]
        cost_per_request = hourly_cost / throughput
        viable.append({
            **config,
            "throughput": throughput,
            "latency": latency,
            "cost_per_1k_requests": cost_per_request * 1000
        })

    # min() returns the first of equally cheap candidates, matching a
    # stable sort followed by taking element 0.
    return min(viable, key=lambda x: x["cost_per_1k_requests"], default=None)