Performance Engineering Code Reference

Complete code implementations for Chapter 61: Performance Engineering.


Table of Contents


Profiling and Diagnostics

Inference Profiler

Complete implementation for profiling PyTorch inference workloads:

import torch
from torch.profiler import profile, record_function, ProfilerActivity

class InferenceProfiler:
    """Profile PyTorch inference workloads.

    The model is moved to ``device`` on construction.  ``profile_inference``
    runs warmup passes, records forward passes with ``torch.profiler`` and
    returns a summary dict.  The profiler object of the most recent run is
    kept on ``last_profile`` so it can be passed to ``export_trace``.
    """

    def __init__(self, model, device='cuda'):
        self.model = model.to(device)
        self.device = device
        # Profiler object from the latest profile_inference() call, if any.
        self.last_profile = None

    def _on_cuda(self) -> bool:
        """Return True when the configured device is a CUDA device."""
        # str() so that 'cuda', 'cuda:0' and torch.device('cuda') all match.
        return str(self.device).startswith('cuda')

    def profile_inference(
        self,
        input_batch,
        warmup_runs: int = 3,
        profile_runs: int = 5
    ) -> dict:
        """Profile inference with detailed breakdown.

        Args:
            input_batch: Input passed to the model.  If it exposes ``.to`` it
                is moved to the profiler's device first.
            warmup_runs: Untimed forward passes executed before profiling.
            profile_runs: Forward passes recorded by the profiler.

        Returns:
            The summary dict produced by ``_analyze_profile``.
        """
        on_cuda = self._on_cuda()

        # The model was moved to self.device in __init__; keep the batch on
        # the same device so the forward pass does not fail on a mismatch.
        if hasattr(input_batch, 'to'):
            input_batch = input_batch.to(self.device)

        # Warmup
        with torch.no_grad():
            for _ in range(warmup_runs):
                self.model(input_batch)

        if on_cuda:
            torch.cuda.synchronize()

        # Only request CUDA activity when the workload actually runs on CUDA.
        activities = [ProfilerActivity.CPU]
        if on_cuda:
            activities.append(ProfilerActivity.CUDA)

        # Profile
        with profile(
            activities=activities,
            record_shapes=True,
            profile_memory=True,
            with_stack=True,
        ) as prof:
            for _ in range(profile_runs):
                with record_function("model_inference"), torch.no_grad():
                    self.model(input_batch)

                # Wait for queued GPU work so it is attributed to this run.
                if on_cuda:
                    torch.cuda.synchronize()

        self.last_profile = prof

        # Analyze results
        return self._analyze_profile(prof)

    def _analyze_profile(self, prof) -> dict:
        """Extract key metrics from profile.

        Returns a dict with the summed CUDA/CPU time over all averaged events,
        the ten events with the highest CUDA time (with their share of the
        CUDA total) and up to ten events with non-zero CUDA memory usage.
        """
        events = prof.key_averages()

        # NOTE(review): per-event totals look inclusive of child events in
        # torch.profiler, so these sums may double count nested ops - confirm.
        cuda_time_total = sum(e.cuda_time_total for e in events)
        cpu_time_total = sum(e.cpu_time_total for e in events)

        def cuda_share(op) -> float:
            # No CUDA time recorded (e.g. CPU-only run): report 0% instead of
            # dividing by zero.
            if not cuda_time_total:
                return 0.0
            return op.cuda_time_total / cuda_time_total * 100

        top_cuda_ops = sorted(
            events,
            key=lambda e: e.cuda_time_total,
            reverse=True
        )[:10]

        top_memory_ops = sorted(
            events,
            key=lambda e: e.cuda_memory_usage or 0,
            reverse=True
        )[:10]

        return {
            'cuda_time_total_us': cuda_time_total,
            'cpu_time_total_us': cpu_time_total,
            'top_cuda_ops': [
                {
                    'name': op.key,
                    'cuda_time_us': op.cuda_time_total,
                    'percentage': cuda_share(op)
                }
                for op in top_cuda_ops
            ],
            'top_memory_ops': [
                {
                    'name': op.key,
                    'memory_bytes': op.cuda_memory_usage
                }
                for op in top_memory_ops if op.cuda_memory_usage
            ],
            'trace_path': None  # Can export to Chrome trace
        }

    def export_trace(self, prof, path: str):
        """Export trace for visualization.

        Args:
            prof: Profiler object to export.  Pass ``None`` to export the
                profile recorded by the latest ``profile_inference`` call.
            path: Destination file for the Chrome trace.

        Raises:
            ValueError: If ``prof`` is ``None`` and nothing was profiled yet.
        """
        if prof is None:
            prof = self.last_profile
        if prof is None:
            raise ValueError("No profile available. Call profile_inference() first.")
        prof.export_chrome_trace(path)

Bottleneck Diagnoser

Complete implementation for identifying performance bottlenecks:

from dataclasses import dataclass

@dataclass
class BottleneckPattern:
    """One catalogue entry describing a kind of performance bottleneck."""
    # Human-readable label of the pattern.
    pattern: str
    # Observable signs that point at this pattern.
    symptoms: list[str]
    # How to confirm the diagnosis.
    diagnosis_approach: str
    # Typical root causes.
    common_causes: list[str]
    # Remedies to try.
    solutions: list[str]

# Reference catalogue of bottleneck patterns.  Each entry lists the symptoms
# to look for, how to confirm the diagnosis, common causes and candidate fixes.
BOTTLENECK_PATTERNS = [
    # GPU is under-used while the CPU is busy.
    BottleneckPattern(
        pattern="CPU-bound",
        symptoms=[
            "GPU utilization is low (<70%)",
            "CPU utilization is high",
            "Adding GPUs doesn't improve throughput"
        ],
        diagnosis_approach="Profile CPU with py-spy; check for Python overhead",
        common_causes=[
            "Data preprocessing on CPU",
            "Too much Python in hot path",
            "Tokenization/detokenization",
            "Network serialization"
        ],
        solutions=[
            "Move preprocessing to GPU",
            "Use async data loading",
            "Batch preprocessing",
            "Use compiled/native tokenizers"
        ]
    ),
    # Memory throughput, not compute, is the limiting resource.
    BottleneckPattern(
        pattern="Memory-bandwidth bound",
        symptoms=[
            "GPU compute utilization is low",
            "Memory throughput near theoretical max",
            "Adding more FLOPS doesn't help"
        ],
        diagnosis_approach="Check memory throughput in Nsight Compute",
        common_causes=[
            "Large model that doesn't fit in cache",
            "Inefficient memory access patterns",
            "Too many memory-bound operations"
        ],
        solutions=[
            "Quantization to reduce memory traffic",
            "Kernel fusion to reduce memory reads/writes",
            "Optimize memory layout",
            "Use Flash Attention for attention layers"
        ]
    ),
    # Many small kernels with idle GPU time between them.
    BottleneckPattern(
        pattern="GPU kernel launch bound",
        symptoms=[
            "Many small kernels visible in trace",
            "High CPU time between GPU operations",
            "GPU idle between operations"
        ],
        diagnosis_approach="Look at timeline in Nsight Systems",
        common_causes=[
            "Too many small operations",
            "Eager execution overhead",
            "Lack of kernel fusion"
        ],
        solutions=[
            "Use torch.compile() or TensorRT",
            "CUDA Graphs",
            "Operator fusion",
            "Batch operations together"
        ]
    ),
    # Time is dominated by CPU<->GPU copies.
    BottleneckPattern(
        pattern="Host-device transfer bound",
        symptoms=[
            "High time spent in cudaMemcpy",
            "GPU idle waiting for data",
            "PCIe bandwidth saturated"
        ],
        diagnosis_approach="Check transfer times in Nsight Systems",
        common_causes=[
            "Moving data CPU<->GPU frequently",
            "Not using pinned memory",
            "Synchronous transfers blocking execution"
        ],
        solutions=[
            "Keep data on GPU",
            "Use pinned memory for transfers",
            "Overlap computation with transfer",
            "Batch transfers"
        ]
    ),
]

class BottleneckDiagnoser:
    """Diagnose performance bottlenecks from a profiler summary dict.

    The summary must contain ``'cpu_time_total_us'`` and
    ``'cuda_time_total_us'``; ``'top_cuda_ops'`` (a list of dicts with a
    ``'percentage'`` key) is optional.
    """

    def __init__(self, profiler_results: dict):
        self.results = profiler_results

    def diagnose(self) -> list[dict]:
        """Identify likely bottlenecks.

        Returns:
            A list of findings, each a dict with ``'pattern'``
            (``'cpu_bound'`` or ``'kernel_launch_bound'``), ``'confidence'``
            (0..1) and a human-readable ``'evidence'`` string.

        Raises:
            KeyError: If either total-time key is missing from the summary.
        """
        bottlenecks = []

        # Check CPU vs GPU time ratio; max(..., 1) avoids dividing by zero
        # when no GPU time was recorded.
        cpu_gpu_ratio = (
            self.results['cpu_time_total_us'] /
            max(self.results['cuda_time_total_us'], 1)
        )

        if cpu_gpu_ratio > 2.0:
            bottlenecks.append({
                'pattern': 'cpu_bound',
                # Confidence grows with the ratio and saturates at 5x.
                'confidence': min(cpu_gpu_ratio / 5.0, 1.0),
                'evidence': f'CPU time is {cpu_gpu_ratio:.1f}x GPU time'
            })

        # Check for many small operations: several ops reported, none of
        # which dominates the GPU time.
        top_ops = self.results.get('top_cuda_ops', [])
        if len(top_ops) > 5:
            # Use the true maximum rather than the first entry so the result
            # does not depend on the list being pre-sorted.
            largest_op_percentage = max(op['percentage'] for op in top_ops)

            if largest_op_percentage < 30:
                bottlenecks.append({
                    'pattern': 'kernel_launch_bound',
                    'confidence': 0.7,
                    'evidence': f'Largest op is only {largest_op_percentage:.1f}% of time'
                })

        return bottlenecks

GPU Optimization

Model Optimizer (torch.compile)

Complete implementation for optimizing models with torch.compile:

import torch
import time

class ModelOptimizer:
    """Optimize models with torch.compile and measure the benefit."""

    def __init__(self, model):
        self.original_model = model
        # Set by compile(); None until then.
        self.compiled_model = None

    def compile(
        self,
        mode: str = 'default',
        fullgraph: bool = False,
        dynamic: bool = True
    ):
        """Compile the model for optimized execution.

        Args:
            mode: torch.compile mode.
                - 'default': Good balance of compile time and speedup
                - 'reduce-overhead': Minimize framework overhead (CUDA graphs)
                - 'max-autotune': Try more options, slower compile
            fullgraph: Capture entire model as one graph.
            dynamic: Handle dynamic shapes.

        Returns:
            The compiled model (also stored on ``self.compiled_model``).
        """
        self.compiled_model = torch.compile(
            self.original_model,
            mode=mode,
            fullgraph=fullgraph,
            dynamic=dynamic,
        )
        return self.compiled_model

    @staticmethod
    def _sync():
        """Wait for queued GPU work; a no-op on hosts without CUDA."""
        if torch.cuda.is_available():
            torch.cuda.synchronize()

    @classmethod
    def _average_latency(cls, model, sample_input, warmup_runs: int, benchmark_runs: int) -> float:
        """Return the mean wall-clock seconds per forward pass of ``model``."""
        with torch.no_grad():
            for _ in range(warmup_runs):
                model(sample_input)
            cls._sync()

            total = 0.0
            for _ in range(benchmark_runs):
                start = time.perf_counter()
                model(sample_input)
                # Include GPU execution time, not just the kernel launches.
                cls._sync()
                total += time.perf_counter() - start

        return total / benchmark_runs

    def benchmark_compilation_benefit(
        self,
        sample_input,
        warmup_runs: int = 10,
        benchmark_runs: int = 100
    ) -> dict:
        """Benchmark original vs compiled performance.

        The model is compiled with default settings first if ``compile()``
        has not been called.  Compilation cost is absorbed by the warmup runs.

        Args:
            sample_input: Input passed to both models.
            warmup_runs: Untimed forward passes per model.
            benchmark_runs: Timed forward passes per model (must be >= 1).

        Returns:
            Dict with average latencies in milliseconds, the speedup factor
            and the time saved per inference in milliseconds.

        Raises:
            ValueError: If ``benchmark_runs`` is less than 1.
        """
        if benchmark_runs < 1:
            raise ValueError("benchmark_runs must be at least 1")

        original_avg = self._average_latency(
            self.original_model, sample_input, warmup_runs, benchmark_runs
        )

        if self.compiled_model is None:
            self.compile()

        compiled_avg = self._average_latency(
            self.compiled_model, sample_input, warmup_runs, benchmark_runs
        )

        return {
            'original_avg_ms': original_avg * 1000,
            'compiled_avg_ms': compiled_avg * 1000,
            # Guard against a zero reading from a coarse timer.
            'speedup': original_avg / compiled_avg if compiled_avg > 0 else float('inf'),
            'time_saved_per_inference_ms': (original_avg - compiled_avg) * 1000
        }

TensorRT Optimizer

Conceptual sketch of TensorRT model optimization (it assembles the build configuration only; no TensorRT engine is created):

from dataclasses import dataclass

@dataclass
class CompilerOption:
    """Description of one model compilation / serving option."""
    # Name of the tool or compiler.
    name: str
    # What the option is.
    description: str
    # Scenario in which it is the preferred choice.
    best_for: str
    # Caveats and notable properties to weigh.
    tradeoffs: list[str]

# Reference list of compilation / serving options with their trade-offs.
COMPILER_OPTIONS = [
    # Built into PyTorch; smallest code change.
    CompilerOption(
        name="torch.compile",
        description="PyTorch's built-in compiler using TorchDynamo + Inductor",
        best_for="Quick optimization with minimal code changes",
        tradeoffs=[
            "First run is slow (compilation)",
            "Dynamic shapes can cause recompilation",
            "Some ops may fall back to eager"
        ]
    ),
    # NVIDIA-specific inference optimizer.
    CompilerOption(
        name="TensorRT",
        description="NVIDIA's high-performance inference optimizer",
        best_for="Production NVIDIA GPU deployment",
        tradeoffs=[
            "Static shapes required (or multiple profiles)",
            "Longer compilation time",
            "NVIDIA-only",
            "Best speedups, especially for FP16/INT8"
        ]
    ),
    # Cross-platform runtime; requires model conversion.
    CompilerOption(
        name="ONNX Runtime",
        description="Cross-platform inference engine",
        best_for="Multi-platform deployment, CPU and GPU",
        tradeoffs=[
            "Conversion from PyTorch required",
            "Some ops may not be supported",
            "Good for standardization"
        ]
    ),
    # LLM-specific serving stacks.
    CompilerOption(
        name="vLLM / TGI",
        description="Specialized LLM serving with built-in optimizations",
        best_for="LLM inference with batching",
        tradeoffs=[
            "LLM-specific, not general purpose",
            "Includes continuous batching, PagedAttention",
            "May require model-specific support"
        ]
    ),
]

class TensorRTOptimizer:
    """Plan a TensorRT conversion (conceptual; no engine is built)."""

    # Rough, illustrative estimates keyed by precision.
    _SPEEDUP_ESTIMATES = {
        'fp32': '1.5-2x',
        'fp16': '2-4x',
        'int8': '3-5x'
    }
    _MEMORY_REDUCTION_ESTIMATES = {
        'fp32': '20-30%',
        'fp16': '50%',
        'int8': '75%'
    }

    def __init__(self, model_path: str):
        self.model_path = model_path

    def optimize(
        self,
        precision: str = 'fp16',  # fp32, fp16, int8
        batch_sizes: "list[int] | None" = None,
        max_sequence_length: int = 2048,
        workspace_gb: float = 4.0
    ) -> dict:
        """Build the configuration for a TensorRT conversion.

        This is a conceptual example; a real implementation would hand the
        configuration to the tensorrt library to build an engine.

        Args:
            precision: Target precision ('fp32', 'fp16' or 'int8').
            batch_sizes: Batch sizes to create optimization profiles for.
                Defaults to ``[1, 8, 32]``.
            max_sequence_length: Largest sequence length to support.
            workspace_gb: Builder workspace size in gigabytes.

        Returns:
            Dict with the ``'config'`` plus string estimates under
            ``'expected_speedup'`` and ``'memory_reduction'`` ('Unknown' for
            an unrecognised precision).
        """
        # None sentinel instead of a mutable list default shared across calls.
        if batch_sizes is None:
            batch_sizes = [1, 8, 32]

        # One optimization profile per batch size.  The optimal sequence
        # length is half the maximum, but never below the minimum shape.
        optimization_profiles = [
            {
                'min_shape': (1, 1),
                'opt_shape': (batch_size, max(1, max_sequence_length // 2)),
                'max_shape': (batch_size, max_sequence_length)
            }
            for batch_size in batch_sizes
        ]

        config = {
            'precision': precision,
            'optimization_profiles': optimization_profiles,
            'workspace_gb': workspace_gb
        }

        # Build engine (conceptual)
        # engine = self._build_engine(config)

        return {
            'config': config,
            'expected_speedup': self._estimate_speedup(precision),
            'memory_reduction': self._estimate_memory_reduction(precision)
        }

    def _estimate_speedup(self, precision: str) -> str:
        """Return the illustrative speedup range for ``precision``."""
        return self._SPEEDUP_ESTIMATES.get(precision, 'Unknown')

    def _estimate_memory_reduction(self, precision: str) -> str:
        """Return the illustrative memory reduction for ``precision``."""
        return self._MEMORY_REDUCTION_ESTIMATES.get(precision, 'Unknown')

CUDA Graph Optimizer

Complete implementation for CUDA Graph optimization:

import torch
import time

class CUDAGraphOptimizer:
    """Optimize inference with CUDA Graphs.

    A captured graph replays a fixed sequence of kernels on fixed memory
    addresses, so inputs are copied into a static buffer before each replay
    and must have the same shape as the sample input used for capture.
    """

    def __init__(self, model, sample_input):
        self.model = model
        self.sample_input = sample_input
        self.graph = None
        self.static_input = None
        self.static_output = None

    def capture_graph(self):
        """Capture CUDA graph for the model.

        Puts the model in eval mode, warms it up on a side stream and records
        a single forward pass into the graph, all without autograd tracking.
        """
        self.model.eval()

        # Static input buffer the captured kernels will read from.
        static_input = self.sample_input.clone()

        with torch.no_grad():
            # Warm up on a side stream before capture, as recommended by the
            # PyTorch CUDA Graphs documentation.
            side_stream = torch.cuda.Stream()
            side_stream.wait_stream(torch.cuda.current_stream())
            with torch.cuda.stream(side_stream):
                for _ in range(3):
                    self.model(static_input)
            torch.cuda.current_stream().wait_stream(side_stream)
            torch.cuda.synchronize()

            # Capture graph
            graph = torch.cuda.CUDAGraph()
            with torch.cuda.graph(graph):
                static_output = self.model(static_input)

        # Publish only after a successful capture so a failed capture never
        # leaves a half-initialised graph behind.
        self.static_input = static_input
        self.static_output = static_output
        self.graph = graph

    def inference(self, input_tensor):
        """Run inference using captured graph.

        Args:
            input_tensor: Input with the same shape as the sample input.

        Returns:
            A copy of the output produced by replaying the graph.

        Raises:
            RuntimeError: If no graph has been captured yet.
            ValueError: If the input shape differs from the captured shape.
        """
        if self.graph is None:
            raise RuntimeError("Graph not captured. Call capture_graph() first.")

        # copy_ would silently broadcast a smaller input into the buffer.
        if input_tensor.shape != self.static_input.shape:
            raise ValueError(
                f"Input shape {tuple(input_tensor.shape)} does not match "
                f"captured shape {tuple(self.static_input.shape)}"
            )

        # Copy input to static buffer
        self.static_input.copy_(input_tensor)

        # Replay graph
        self.graph.replay()

        # Return a copy: the static output buffer is overwritten on replay.
        return self.static_output.clone()

    def benchmark_benefit(
        self,
        num_iterations: int = 1000
    ) -> dict:
        """Benchmark CUDA Graph benefit.

        Captures the graph first if needed (which also warms up the model),
        then times ``num_iterations`` eager forward passes against the same
        number of graph replays.  Both run in eval mode without autograd.
        """
        # Ensure graph is captured
        if self.graph is None:
            self.capture_graph()

        # Benchmark without graph, under the same conditions as the capture.
        self.model.eval()
        with torch.no_grad():
            torch.cuda.synchronize()
            start = time.perf_counter()
            for _ in range(num_iterations):
                self.model(self.sample_input)
            torch.cuda.synchronize()
            baseline_time = time.perf_counter() - start

        # Benchmark with graph
        torch.cuda.synchronize()
        start = time.perf_counter()
        for _ in range(num_iterations):
            self.graph.replay()
        torch.cuda.synchronize()
        graph_time = time.perf_counter() - start

        return {
            'baseline_ms': baseline_time / num_iterations * 1000,
            'graph_ms': graph_time / num_iterations * 1000,
            'speedup': baseline_time / graph_time,
            'note': 'Speedup is higher for small models with many kernels'
        }

Tensor Parallel Inference

Analysis helpers for planning a tensor-parallel model split (the model itself is not sharded here):

from dataclasses import dataclass
from enum import Enum
import torch.nn as nn

class ParallelismType(Enum):
    """Ways of distributing a model or its workload across GPUs."""

    # Split tensors across GPUs
    TENSOR = "tensor"
    # Split layers across GPUs
    PIPELINE = "pipeline"
    # Replicate model, split data
    DATA = "data"
    # MoE expert parallelism
    EXPERT = "expert"

@dataclass
class ParallelismStrategy:
    """Summary of one model parallelism strategy."""
    # Which kind of parallelism this entry describes.
    type: ParallelismType
    # How the strategy distributes work.
    description: str
    # Workloads it suits best.
    best_for: str
    # Qualitative implementation effort.
    implementation_complexity: str
    # Qualitative efficiency notes.
    efficiency: str

# Reference list of parallelism strategies, one entry per ParallelismType.
PARALLELISM_STRATEGIES = [
    # Full model replica per GPU.
    ParallelismStrategy(
        type=ParallelismType.DATA,
        description="Each GPU has full model copy, processes different data",
        best_for="Training, batch inference when model fits on one GPU",
        implementation_complexity="Low",
        efficiency="High for training; limited for latency-sensitive inference"
    ),
    # Weight matrices sharded across GPUs.
    ParallelismStrategy(
        type=ParallelismType.TENSOR,
        description="Split weight matrices across GPUs, all process same input",
        best_for="Large models for low-latency inference",
        implementation_complexity="Medium-High",
        efficiency="High with fast interconnect (NVLink); reduces latency"
    ),
    # Layers assigned to different GPUs.
    ParallelismStrategy(
        type=ParallelismType.PIPELINE,
        description="Different layers on different GPUs, data flows through",
        best_for="Very deep models, training with micro-batching",
        implementation_complexity="Medium",
        efficiency="Requires careful scheduling to avoid bubbles"
    ),
    # Experts of a Mixture-of-Experts model placed on different GPUs.
    ParallelismStrategy(
        type=ParallelismType.EXPERT,
        description="Different experts on different GPUs for MoE models",
        best_for="Mixture of Experts models",
        implementation_complexity="High",
        efficiency="Depends on routing efficiency"
    ),
]

class TensorParallelInference:
    """Tensor parallel inference setup (analysis only; nothing is sharded)."""

    def __init__(self, model, num_gpus: int):
        self.model = model
        self.num_gpus = num_gpus

    def analyze_split_strategy(self) -> dict:
        """Analyze how to split the model.

        Returns:
            Dict with:
            - ``'total_params'``: number of parameters in the model.
            - ``'layers'``: one entry per module that has a weight, with its
              name, weight shape, whether it can be split and on which dim.
            - ``'recommended_split'``: the splittable layers whose split
              dimension divides evenly by ``num_gpus``, each as
              ``{'name', 'split_dim', 'num_shards'}``.
        """
        analysis = {
            'total_params': sum(p.numel() for p in self.model.parameters()),
            'layers': [],
            'recommended_split': []
        }

        # Analyze each layer
        for name, module in self.model.named_modules():
            # Some modules expose a `weight` attribute that is None (e.g.
            # normalization layers without affine parameters); skip those.
            weight = getattr(module, 'weight', None)
            if weight is None:
                continue

            can_split = self._can_split_layer(module)
            split_dim = self._get_split_dim(module) if can_split else None

            analysis['layers'].append({
                'name': name,
                'shape': weight.shape,
                'can_split': can_split,
                'split_dim': split_dim
            })

            # Only recommend splits that yield equally sized shards.
            if (
                can_split
                and self.num_gpus > 0
                and weight.shape[split_dim] % self.num_gpus == 0
            ):
                analysis['recommended_split'].append({
                    'name': name,
                    'split_dim': split_dim,
                    'num_shards': self.num_gpus
                })

        return analysis

    def _can_split_layer(self, module) -> bool:
        """Check if layer can be tensor-parallel split."""
        return isinstance(module, (nn.Linear, nn.Embedding))

    def _get_split_dim(self, module) -> int:
        """Get dimension to split on."""
        if isinstance(module, nn.Linear):
            # Split output dimension for column parallel
            # Split input dimension for row parallel
            return 0  # Simplified
        return 0

    def estimate_communication_overhead(self) -> dict:
        """Estimate inter-GPU communication overhead.

        The figures are fixed, illustrative values for a 70B model with
        8-way tensor parallelism; they do not depend on ``self.model`` or
        ``self.num_gpus``.
        """
        # Tensor parallelism requires all-reduce after each layer
        # Communication volume per layer = 2 * batch * seq * hidden * (n-1)/n

        # For a 70B model with 8-way TP:
        # ~80 layers, each needs all-reduce
        # With NVLink: ~100us per all-reduce
        # Total overhead: ~8ms per forward pass

        return {
            'all_reduces_per_forward': 80,  # Approximate for 70B
            'estimated_overhead_ms': 8,
            'recommendation': 'Use NVLink for tensor parallelism; PCIe is too slow'
        }

Memory Optimization

Optimized KV Cache

Complete implementation for memory-efficient KV cache:

from dataclasses import dataclass
import torch

@dataclass
class KVCacheConfig:
    """Dimensions and element type of a KV cache."""
    # Number of layers that get a cache slot.
    num_layers: int
    # Attention heads per layer.
    num_heads: int
    # Size of each head.
    head_dim: int
    # Largest sequence length the cache can hold.
    max_seq_length: int
    # Element type of the cached tensors.
    dtype: torch.dtype

class OptimizedKVCache:
    """Memory-efficient KV cache implementation.

    The whole cache is one pre-allocated tensor of shape
    ``[num_layers, 2 (K and V), batch, num_heads, max_seq_length, head_dim]``.
    """

    def __init__(self, config: "KVCacheConfig", batch_size: int, device='cuda'):
        """Allocate the cache.

        Args:
            config: Cache dimensions and dtype.
            batch_size: Number of sequences held by the cache.
            device: Device to allocate on (defaults to ``'cuda'``).
        """
        self.config = config
        self.batch_size = batch_size

        # Pre-allocate cache
        cache_shape = (
            config.num_layers,
            2,  # K and V
            batch_size,
            config.num_heads,
            config.max_seq_length,
            config.head_dim
        )

        # Use contiguous memory for efficient access
        self.cache = torch.zeros(
            cache_shape,
            dtype=config.dtype,
            device=device
        )

        # Per-sequence count of cached positions (highest position + 1).
        self.seq_lengths = torch.zeros(batch_size, dtype=torch.long, device=device)

    def update(
        self,
        layer_idx: int,
        keys: torch.Tensor,
        values: torch.Tensor,
        positions: torch.Tensor
    ):
        """Update cache at specific positions.

        Args:
            layer_idx: Layer whose cache is written.
            keys: ``[batch, heads, seq_len, head_dim]``.
            values: ``[batch, heads, seq_len, head_dim]``.
            positions: ``[batch, seq_len]`` cache positions for each token,
                on the same device as the cache.  Positions within one row
                are expected to be distinct.
        """
        if keys.shape[2] == 0:
            # Nothing to write; also avoids indexing an empty positions row.
            return

        # [batch, 1] so it broadcasts against positions [batch, seq_len].
        batch_indices = torch.arange(
            self.batch_size, device=self.cache.device
        ).unsqueeze(1)

        # The two index tensors are separated by a slice, so the indexed
        # result is laid out as [batch, seq_len, heads, head_dim]; transpose
        # the inputs to match and write all positions in one operation.
        self.cache[layer_idx, 0, batch_indices, :, positions, :] = keys.transpose(1, 2)
        self.cache[layer_idx, 1, batch_indices, :, positions, :] = values.transpose(1, 2)

        # Update sequence lengths from the highest position written, so the
        # result does not depend on positions being in ascending order.
        self.seq_lengths = torch.maximum(
            self.seq_lengths,
            positions.max(dim=1).values + 1
        )

    def get(self, layer_idx: int, batch_idx: int, up_to_pos: int):
        """Get cached K,V up to position.

        Returns:
            ``(k, v)`` views of shape ``[heads, up_to_pos, head_dim]``.
        """
        k = self.cache[layer_idx, 0, batch_idx, :, :up_to_pos, :]
        v = self.cache[layer_idx, 1, batch_idx, :, :up_to_pos, :]
        return k, v

    def memory_usage_bytes(self) -> int:
        """Calculate memory usage of the cache tensor in bytes."""
        element_size = self.cache.element_size()
        return self.cache.numel() * element_size

Paged KV Cache

Complete implementation for paged attention KV cache (vLLM-style):

import torch

class PagedKVCache:
    """Paged attention KV cache (vLLM-style).

    KV storage is a pool of fixed-size blocks; each sequence owns a list of
    block indices that are returned to the pool when the sequence finishes.
    """

    def __init__(
        self,
        num_layers: int,
        num_heads: int,
        head_dim: int,
        block_size: int,  # Tokens per block
        num_blocks: int,  # Total blocks in pool
        dtype: torch.dtype,
        device: str = 'cuda'
    ):
        """Allocate the block pool on ``device`` (defaults to ``'cuda'``)."""
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.block_size = block_size
        self.num_blocks = num_blocks
        self.dtype = dtype

        # Block pool: [num_blocks, num_layers, 2, block_size, num_heads, head_dim]
        self.block_pool = torch.zeros(
            (num_blocks, num_layers, 2, block_size, num_heads, head_dim),
            dtype=dtype,
            device=device
        )

        # Block allocation tracking
        self.free_blocks = list(range(num_blocks))
        self.sequence_blocks = {}  # seq_id -> list of block indices

    def allocate_sequence(self, seq_id: int, num_tokens: int) -> list[int]:
        """Allocate blocks for a sequence.

        Args:
            seq_id: Identifier of the new sequence.
            num_tokens: Number of tokens the sequence must hold.

        Returns:
            Indices of the blocks assigned to the sequence.

        Raises:
            ValueError: If ``seq_id`` already owns blocks.  Overwriting the
                entry would leak those blocks from the pool.
            MemoryError: If the pool has too few free blocks.
        """
        if seq_id in self.sequence_blocks:
            raise ValueError(f"Sequence {seq_id} already has allocated blocks")

        # Ceiling division; clamped so a negative token count allocates
        # nothing instead of producing a negative slice bound.
        num_blocks_needed = max(
            0, (num_tokens + self.block_size - 1) // self.block_size
        )

        if len(self.free_blocks) < num_blocks_needed:
            raise MemoryError("Insufficient KV cache blocks")

        # Take the blocks from the front of the free list in one step
        # rather than popping index 0 repeatedly.
        allocated = self.free_blocks[:num_blocks_needed]
        del self.free_blocks[:num_blocks_needed]

        self.sequence_blocks[seq_id] = allocated
        return allocated

    def free_sequence(self, seq_id: int):
        """Free blocks for a completed sequence (no-op for unknown ids)."""
        blocks = self.sequence_blocks.pop(seq_id, None)
        if blocks is not None:
            self.free_blocks.extend(blocks)

    def utilization(self) -> float:
        """Calculate cache utilization as the fraction of blocks in use."""
        if self.num_blocks == 0:
            return 0.0
        allocated = self.num_blocks - len(self.free_blocks)
        return allocated / self.num_blocks

Quantization Analyzer

Complete implementation for analyzing quantization tradeoffs:

from dataclasses import dataclass
from enum import Enum

class QuantizationType(Enum):
    """Supported quantization schemes."""

    INT8 = "int8"
    INT4 = "int4"
    FP8 = "fp8"
    # 4-bit NormalFloat
    NF4 = "nf4"
    GPTQ = "gptq"
    AWQ = "awq"

@dataclass
class QuantizationConfig:
    """Settings describing one quantization recipe."""
    # Quantization scheme.
    method: QuantizationType
    # Bits stored per weight.
    bits: int
    # Number of weights sharing a scale factor
    group_size: int
    # Human-readable description.
    desc: str

# Named quantization recipes. A group_size of -1 marks per-tensor scaling
# (no weight groups).
QUANTIZATION_METHODS = {
    # 8-bit, calibrated ahead of time.
    'int8_static': QuantizationConfig(
        method=QuantizationType.INT8,
        bits=8,
        group_size=-1,  # Per-tensor
        desc="Static INT8 quantization; calibrated on representative data"
    ),
    # 8-bit, scale computed at runtime.
    'int8_dynamic': QuantizationConfig(
        method=QuantizationType.INT8,
        bits=8,
        group_size=-1,
        desc="Dynamic INT8; scale computed at runtime"
    ),
    # 4-bit GPTQ with groups of 128 weights.
    'int4_gptq': QuantizationConfig(
        method=QuantizationType.GPTQ,
        bits=4,
        group_size=128,
        desc="GPTQ 4-bit; good quality with careful calibration"
    ),
    # 4-bit AWQ with groups of 128 weights.
    'int4_awq': QuantizationConfig(
        method=QuantizationType.AWQ,
        bits=4,
        group_size=128,
        desc="AWQ 4-bit; activation-aware, often better than GPTQ"
    ),
    # 4-bit NormalFloat with groups of 64 weights.
    'nf4_qlora': QuantizationConfig(
        method=QuantizationType.NF4,
        bits=4,
        group_size=64,
        desc="NF4 for QLoRA fine-tuning; information-theoretically optimal"
    ),
}

class QuantizationAnalyzer:
    """Estimate memory and speed effects of quantizing a model."""

    def __init__(self, model_size_params: int, dtype_bytes: int = 2):
        # Parameter count and the unquantized footprint it implies.
        self.model_size = model_size_params
        self.original_bytes = model_size_params * dtype_bytes

    def analyze_config(self, config: QuantizationConfig) -> dict:
        """Summarise size, savings and expected speedup for ``config``.

        Returns a dict with the effective bits per weight, the original and
        quantized sizes in GB, the memory reduction (percentage string), an
        estimated speedup string and a short quality note.
        """
        # Metadata overhead: 32 bits per group of weights, or 32 bits spread
        # over the whole model when there are no groups (negligible).
        # NOTE(review): the original comment calls this an "FP16 scale per
        # group"; 32 bits would match a 16-bit scale plus a 16-bit zero-point
        # - confirm which is intended.
        grouped = config.group_size > 0
        if grouped:
            overhead_bits = 32 / config.group_size
        else:
            overhead_bits = 32 / self.model_size
        effective_bits = config.bits + overhead_bits

        quantized_bytes = self.model_size * effective_bits / 8
        size_ratio = quantized_bytes / self.original_bytes
        memory_reduction = 1 - size_ratio

        # Memory-bound assumption: speedup tracks the reduction in bytes read.
        estimated_speedup = self.original_bytes / quantized_bytes

        summary = {}
        summary['config'] = config.desc
        summary['bits_per_weight'] = effective_bits
        summary['original_gb'] = self.original_bytes / 1e9
        summary['quantized_gb'] = quantized_bytes / 1e9
        summary['memory_reduction'] = f'{memory_reduction:.1%}'
        summary['estimated_speedup'] = f'{estimated_speedup:.1f}x'
        summary['quality_notes'] = self._quality_notes(config)
        return summary

    def _quality_notes(self, config: QuantizationConfig) -> str:
        """Return a qualitative quality note based on the bit width."""
        if config.bits >= 8:
            return "Minimal quality loss; suitable for most use cases"
        if config.bits >= 4:
            return "Some quality loss; test carefully on your use case"
        return "Significant quality loss; may not be suitable for complex tasks"

Gradient Checkpoint Analyzer

Complete implementation for gradient checkpointing analysis:

import torch.utils.checkpoint as checkpoint

class GradientCheckpointAnalyzer:
    """Analyze gradient checkpointing tradeoffs.

    NOTE(review): the estimate models a "checkpoint" as a layer whose
    activations are kept, so checkpointing every layer (n == 1) yields zero
    savings here, while ``_recommend`` describes n == 1 as maximum savings.
    The two should be reconciled - confirm the intended model.
    """

    def __init__(self, model, batch_size: int, seq_length: int):
        self.model = model
        self.batch_size = batch_size
        self.seq_length = seq_length

    def analyze_memory_savings(self, checkpoint_every_n_layers: int) -> dict:
        """Analyze memory savings from checkpointing.

        Args:
            checkpoint_every_n_layers: Spacing between checkpointed layers;
                must be at least 1.

        Returns:
            Dict with baseline and checkpointed activation memory in GB, the
            fractional memory savings, the share of layers that must be
            recomputed (as a percentage) and a recommendation string.

        Raises:
            ValueError: If ``checkpoint_every_n_layers`` is less than 1.
        """
        if checkpoint_every_n_layers < 1:
            raise ValueError("checkpoint_every_n_layers must be >= 1")

        num_layers = self._count_layers()

        # Without checkpointing: store all activations
        activation_memory_per_layer = self._estimate_activation_memory()
        baseline_memory = num_layers * activation_memory_per_layer

        # With checkpointing: only store checkpoint activations.  Ceiling
        # division: layers 0, n, 2n, ... are kept, so at least one checkpoint
        # always exists even when n exceeds the layer count.
        num_checkpoints = (
            num_layers + checkpoint_every_n_layers - 1
        ) // checkpoint_every_n_layers
        checkpointed_memory = num_checkpoints * activation_memory_per_layer

        # Recompute cost: every non-checkpointed layer is run again.
        recompute_layers = num_layers - num_checkpoints
        extra_forward_passes = recompute_layers

        return {
            'baseline_activation_gb': baseline_memory / 1e9,
            'checkpointed_activation_gb': checkpointed_memory / 1e9,
            'memory_savings': 1 - (checkpointed_memory / baseline_memory),
            'extra_compute_percentage': extra_forward_passes / num_layers * 100,
            'recommendation': self._recommend(checkpoint_every_n_layers)
        }

    def _count_layers(self) -> int:
        """Count modules whose qualified name contains 'layer' (at least 1)."""
        # NOTE(review): this is a substring match on the full module name, so
        # containers and nested sub-modules under a '...layers...' path are
        # counted as well - verify against the model's naming scheme.
        count = sum(
            1 for name, _ in self.model.named_modules()
            if 'layer' in name.lower()
        )
        return max(count, 1)

    def _estimate_activation_memory(self) -> float:
        """Estimate activation bytes per layer from fixed assumptions."""
        # Simplified: activation per layer = batch * seq * hidden * 4 bytes * factor
        hidden_size = 4096  # Assume
        bytes_per_element = 4  # FP32
        factor = 10  # Multiple tensors per layer

        return self.batch_size * self.seq_length * hidden_size * bytes_per_element * factor

    def _recommend(self, n: int) -> str:
        """Return a recommendation string for checkpoint spacing ``n``."""
        if n == 1:
            return "Maximum memory savings, ~33% training slowdown"
        elif n <= 4:
            return "Good balance of memory and compute"
        else:
            return "Minimal memory savings, consider reducing"

def enable_gradient_checkpointing(model):
    """Enable gradient checkpointing on a model.

    Models that provide ``gradient_checkpointing_enable()`` are asked to
    enable it themselves.  Otherwise every entry of ``model.layers`` has its
    ``forward`` wrapped so that each call goes through
    ``torch.utils.checkpoint.checkpoint``.  Layers already wrapped by this
    function are left alone, so calling it twice does not nest checkpoints.
    """
    # For transformer models, checkpoint each layer
    if hasattr(model, 'gradient_checkpointing_enable'):
        model.gradient_checkpointing_enable()
        return

    import functools

    # Manual implementation for custom models
    for layer in model.layers:
        if getattr(layer, '_gradient_checkpointing_wrapped', False):
            continue
        # Bind the original forward now and defer the checkpoint call until
        # the layer is actually invoked with its inputs.
        layer.forward = functools.partial(
            checkpoint.checkpoint,
            layer.forward,
            use_reentrant=False,
        )
        layer._gradient_checkpointing_wrapped = True

Attention Optimization

Attention Profiler

Complete implementation for comparing attention implementations:

import torch
import torch.nn.functional as F
import time

class AttentionProfiler:
    """Profile attention performance."""

    def compare_implementations(
        self,
        batch_size: int,
        seq_len: int,
        num_heads: int,
        head_dim: int,
        device: str = 'cuda'
    ) -> dict:
        """Compare attention implementations.

        Times a hand-written softmax attention against
        ``F.scaled_dot_product_attention`` on random inputs of shape
        ``[batch_size, num_heads, seq_len, head_dim]``.

        Args:
            batch_size, seq_len, num_heads, head_dim: Input dimensions.
            device: Device to run on (defaults to ``'cuda'``).

        Returns:
            ``{'standard': ..., 'flash': ...}`` where each value holds
            ``'time_ms'`` and ``'memory_mb'`` (peak additional CUDA memory,
            or ``None`` when not running on CUDA).  If the standard
            implementation raises ``RuntimeError`` its entry is
            ``{'error': 'OOM'}``.
        """
        on_cuda = torch.device(device).type == 'cuda'

        q = torch.randn(batch_size, num_heads, seq_len, head_dim, device=device)
        k = torch.randn(batch_size, num_heads, seq_len, head_dim, device=device)
        v = torch.randn(batch_size, num_heads, seq_len, head_dim, device=device)

        def standard_attention():
            # Materialises the full [seq_len, seq_len] attention matrix.
            attn_weights = torch.matmul(q, k.transpose(-2, -1)) / (head_dim ** 0.5)
            attn_weights = F.softmax(attn_weights, dim=-1)
            return torch.matmul(attn_weights, v)

        def flash_attention():
            return F.scaled_dot_product_attention(q, k, v)

        results = {}

        # Standard attention (if memory allows)
        try:
            results['standard'] = self._measure(standard_attention, on_cuda)
        except RuntimeError:
            results['standard'] = {'error': 'OOM'}

        # Flash Attention (SDPA)
        results['flash'] = self._measure(flash_attention, on_cuda)

        return results

    @staticmethod
    def _measure(attention_fn, on_cuda: bool) -> dict:
        """Time one call of ``attention_fn`` and record its peak memory."""
        # Untimed warmup so one-time initialisation is not measured.  Its
        # result is dropped immediately so it does not inflate the baseline.
        attention_fn()

        start_mem = 0
        if on_cuda:
            torch.cuda.synchronize()
            torch.cuda.empty_cache()
            # Reset so the peak reflects this measurement only.
            torch.cuda.reset_peak_memory_stats()
            start_mem = torch.cuda.memory_allocated()

        start = time.perf_counter()
        output = attention_fn()
        if on_cuda:
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start

        memory_mb = None
        if on_cuda:
            memory_mb = (torch.cuda.max_memory_allocated() - start_mem) / 1e6

        # Release the result before the next implementation is measured.
        del output

        return {
            'time_ms': elapsed * 1000,
            'memory_mb': memory_mb
        }

Speculative Decoder

Complete implementation for speculative decoding:

import torch

class SpeculativeDecoder:
    """Greedy speculative decoding with a small draft model and a large target.

    Both models are called as ``model(token_ids)`` and must return logits that
    can be indexed as ``[batch, position, vocab]``.

    The acceptance test in ``_verify`` reads batch row 0 only, so this
    simplified implementation is intended for batch size 1.
    """

    def __init__(
        self,
        target_model,  # Large, accurate model
        draft_model,   # Small, fast model
        num_speculative_tokens: int = 4
    ):
        """Store the two models and the number of tokens drafted per round."""
        self.target = target_model
        self.draft = draft_model
        self.gamma = num_speculative_tokens

    def generate(
        self,
        input_ids: torch.Tensor,
        max_new_tokens: int
    ) -> torch.Tensor:
        """Generate with speculative decoding.

        Args:
            input_ids: Prompt token ids, shape (batch, prompt_len).
            max_new_tokens: Number of tokens to append to the prompt.

        Returns:
            Tensor of shape (batch, prompt_len + max_new_tokens) holding the
            prompt followed by the generated tokens.
        """
        generated = input_ids.clone()

        while generated.shape[1] - input_ids.shape[1] < max_new_tokens:
            # Draft model generates gamma tokens quickly
            draft_tokens = self._draft_generate(generated, self.gamma)

            # Target model verifies all at once
            verified, num_accepted = self._verify(generated, draft_tokens)

            # Append verified tokens
            generated = torch.cat([generated, verified], dim=1)

            # If we rejected all draft tokens, just sample one from target
            # so every round makes progress.
            if num_accepted == 0:
                next_token = self._sample_from_target(generated)
                generated = torch.cat([generated, next_token], dim=1)

        # A round may overshoot the budget; trim to the requested length.
        return generated[:, :input_ids.shape[1] + max_new_tokens]

    def _draft_generate(
        self,
        context: torch.Tensor,
        num_tokens: int
    ) -> torch.Tensor:
        """Greedily decode ``num_tokens`` tokens with the draft model.

        Returns:
            Tensor of shape (batch, num_tokens) with the drafted token ids.
        """
        draft_tokens = []
        current = context

        for _ in range(num_tokens):
            logits = self.draft(current)[:, -1, :]
            next_token = torch.argmax(logits, dim=-1, keepdim=True)
            draft_tokens.append(next_token)
            current = torch.cat([current, next_token], dim=1)

        return torch.cat(draft_tokens, dim=1)

    def _verify(
        self,
        context: torch.Tensor,
        draft_tokens: torch.Tensor
    ) -> tuple[torch.Tensor, int]:
        """Score the drafted tokens with one target pass and keep a prefix.

        Args:
            context: Tokens accepted so far, shape (batch, context_len).
            draft_tokens: Candidate continuation, shape (batch, num_drafted).

        Returns:
            ``(accepted, num_accepted)`` where ``accepted`` is the leading
            ``num_accepted`` columns of ``draft_tokens``. When nothing is
            accepted it is an empty (batch, 0) tensor with the same dtype and
            device as ``draft_tokens``, so it can always be concatenated onto
            the context.
        """
        # Get target model probabilities for all positions
        full_seq = torch.cat([context, draft_tokens], dim=1)
        target_logits = self.target(full_seq)

        # For each draft token, check if target would have sampled it
        num_accepted = 0
        for i in range(draft_tokens.shape[1]):
            # Logits at position p predict the token at p + 1, hence the -1.
            pos = context.shape[1] + i - 1
            target_probs = torch.softmax(target_logits[:, pos, :], dim=-1)
            draft_token = draft_tokens[:, i]

            # Accept if target probability is high enough
            # (Simplified - real implementation uses rejection sampling)
            if target_probs[0, draft_token].item() > 0.1:
                num_accepted += 1
            else:
                break

        # Slicing keeps dtype/device even for the empty case; a fresh
        # torch.tensor([[]]) would be float32 on the CPU.
        return draft_tokens[:, :num_accepted], num_accepted

    def _sample_from_target(self, context: torch.Tensor) -> torch.Tensor:
        """Return the target model's greedy next token, shape (batch, 1)."""
        logits = self.target(context)[:, -1, :]
        return torch.argmax(logits, dim=-1, keepdim=True)

    def expected_speedup(self) -> dict:
        """Estimate the speedup over plain target-only decoding.

        Simplified model: the draft model is assumed to be 5x faster than the
        target and 70% of drafted tokens are assumed to be accepted.

        Returns:
            dict with 'estimated_speedup' (float, >1 means faster than the
            baseline) and 'assumptions' (str).
        """
        draft_cost = 1 / 5  # Relative to target
        # All gamma drafts are verified in a single target forward pass, so
        # verification costs one target call per round, not gamma of them.
        target_verify_cost = 1

        cost_per_accepted = (
            (self.gamma * draft_cost + target_verify_cost) /
            (self.gamma * 0.7)  # 70% accept rate
        )

        baseline_cost = 1  # One target model call per token

        return {
            'estimated_speedup': baseline_cost / cost_per_accepted,
            'assumptions': 'Draft 5x faster, 70% acceptance rate'
        }

Batching and Serving

Continuous Batcher

Complete implementation of a continuous batching scheduler:

import asyncio
from dataclasses import dataclass, field
from typing import Optional
import time

@dataclass
class InFlightRequest:
    """State of one generation request while the batcher is working on it."""
    # Identifier supplied by whoever creates the request.
    id: str
    # Prompt token ids.
    input_tokens: list[int]
    # Tokens produced so far; every request gets its own list.
    generated_tokens: list[int] = field(default_factory=list)
    # Generation budget: the request is complete once this many tokens exist.
    max_tokens: int = 100
    # Future that the batcher resolves with the generated tokens.
    future: asyncio.Future = field(default_factory=asyncio.Future)

    @property
    def is_complete(self) -> bool:
        """Whether the number of generated tokens has reached ``max_tokens``."""
        produced = len(self.generated_tokens)
        return produced >= self.max_tokens

    @property
    def current_position(self) -> int:
        """Total sequence length so far: prompt tokens plus generated tokens."""
        prompt_len = len(self.input_tokens)
        return prompt_len + len(self.generated_tokens)

class ContinuousBatcher:
    """Continuous batching scheduler.

    Requests wait in ``waiting_queue`` until a slot frees up in
    ``active_requests``; ``run_loop`` then advances every active request by
    one token per step and retires the ones that have finished, so new
    requests can join while others are still generating.

    Token sampling is a placeholder (``_sample_next_token`` always returns 0)
    and ``max_waiting_ms`` is stored but not consulted by this class.
    """

    def __init__(
        self,
        model,
        max_batch_size: int = 64,
        max_waiting_ms: float = 10
    ):
        """Store the configuration and start with empty queues.

        Args:
            model: Model handle; kept for the real forward pass.
            max_batch_size: Maximum number of simultaneously active requests.
            max_waiting_ms: Stored as-is; not used by the visible logic.
        """
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_waiting_ms = max_waiting_ms

        self.waiting_queue: list[InFlightRequest] = []
        self.active_requests: list[InFlightRequest] = []

        self.running = False

    async def submit(self, input_tokens: list[int], max_tokens: int) -> list[int]:
        """Queue a request and wait until it has been fully generated.

        Args:
            input_tokens: Prompt token ids.
            max_tokens: Number of tokens to generate.

        Returns:
            The list of generated token ids.
        """
        request = InFlightRequest(
            id=self._generate_id(),
            input_tokens=input_tokens,
            max_tokens=max_tokens,
            # Bind the future to the loop that is actually running this
            # coroutine instead of relying on the implicit default loop.
            future=asyncio.get_running_loop().create_future(),
        )
        self.waiting_queue.append(request)

        # Wait for completion
        return await request.future

    async def run_loop(self):
        """Main processing loop; runs until ``self.running`` is set to False."""
        self.running = True

        while self.running:
            # Add new requests to active batch
            self._fill_batch()

            if not self.active_requests:
                await asyncio.sleep(0.001)
                continue

            # Process one step for all active requests
            await self._step()

            # Remove completed requests
            self._retire_completed()

            # _step has no real suspension point, so without this the loop
            # would monopolize the event loop until the batch drains and no
            # submitter could enqueue (or be woken) in the meantime.
            await asyncio.sleep(0)

    def _fill_batch(self):
        """Move waiting requests into the active batch, oldest first."""
        free_slots = self.max_batch_size - len(self.active_requests)
        if free_slots <= 0 or not self.waiting_queue:
            return

        # Take the whole admissible prefix in one slice rather than calling
        # list.pop(0) once per request.
        admitted = self.waiting_queue[:free_slots]
        del self.waiting_queue[:free_slots]

        # Prefill: process all input tokens
        # This is typically done separately from decode
        self.active_requests.extend(admitted)

    async def _step(self):
        """Generate one token for each active request."""
        # Prepare batch input
        # In practice, this involves careful padding and position management

        # Run model forward pass for all active requests
        # next_tokens = self.model.forward_batch(self.active_requests)

        # For illustration:
        for request in self.active_requests:
            next_token = self._sample_next_token(request)
            request.generated_tokens.append(next_token)

    def _retire_completed(self):
        """Resolve finished requests and drop abandoned ones from the batch."""
        still_active = []

        for request in self.active_requests:
            if request.future.done():
                # The waiter went away (e.g. its task was cancelled). Calling
                # set_result here would raise InvalidStateError and take the
                # whole loop down, so just free the slot.
                continue
            if request.is_complete:
                # Return result via future
                request.future.set_result(request.generated_tokens)
                continue
            still_active.append(request)

        # Update in place so external references to the list stay valid.
        self.active_requests[:] = still_active

    def _sample_next_token(self, request: InFlightRequest) -> int:
        """Placeholder sampler; a real implementation would use model output."""
        # Simplified - would use model output
        return 0

    def _generate_id(self) -> str:
        """Return a request id derived from the current time in nanoseconds."""
        return str(time.time_ns())

Latency Throughput Optimizer

Complete implementation for latency vs throughput optimization:

from dataclasses import dataclass

@dataclass
class OptimizationGoal:
    """What a deployment should be tuned for.

    ``LatencyThroughputOptimizer.recommend_configuration`` inspects only
    ``goal``: a value containing "latency" (case-insensitive) selects the
    latency profile, anything else the throughput profile. The other fields
    are carried along unchanged and are not read by that class.
    """
    # Free-text statement of the objective, e.g. one mentioning "latency".
    goal: str
    # Name and desired value of the primary metric.
    target_metric: str
    target_value: float
    # Name and limit of a secondary metric that must still be respected.
    secondary_constraint: str
    constraint_value: float

class LatencyThroughputOptimizer:
    """Recommend serving configurations for a latency or throughput goal."""

    def __init__(self, model, goal: OptimizationGoal):
        """Keep the model handle and the goal used to pick a profile."""
        self.model = model
        self.goal = goal

    def recommend_configuration(self) -> dict:
        """Return the latency profile if the goal text mentions "latency".

        The match is a case-insensitive substring test on ``goal.goal``; every
        other goal gets the throughput profile.
        """
        if 'latency' in self.goal.goal.lower():
            return self._optimize_for_latency()
        else:
            return self._optimize_for_throughput()

    def _optimize_for_latency(self) -> dict:
        """Static recommendations for minimizing per-request latency."""
        return {
            'batch_size': 1,
            'quantization': 'int8 or fp16',
            'parallelism': 'tensor_parallel if model is large',
            'serving': 'dedicated instances, no batching delay',
            'techniques': [
                'Enable speculative decoding (2-3x speedup for generation)',
                'Use Flash Attention (lower latency, especially for long sequences)',
                'Consider smaller model if quality allows',
                'Tensor parallelism to reduce per-token latency',
                'CUDA Graphs to reduce launch overhead'
            ],
            'anti_patterns': [
                'Large batch sizes (add queuing delay)',
                'Continuous batching with long max wait',
                'Pipeline parallelism (adds latency)'
            ]
        }

    def _optimize_for_throughput(self) -> dict:
        """Static recommendations for maximizing aggregate throughput."""
        return {
            'batch_size': 'Maximum that fits in memory',
            'quantization': 'Aggressive (int4/int8)',
            'parallelism': 'Data parallel across GPUs',
            'serving': 'Continuous batching with dynamic batch sizes',
            'techniques': [
                'Maximize batch size',
                'Use continuous batching',
                'Aggressive quantization for more concurrent requests',
                'PagedAttention for memory efficiency',
                'Preemption for SLO management'
            ],
            'anti_patterns': [
                'Processing requests one at a time',
                'Waiting for specific batch size (starves GPU)',
                'Over-provisioning for low utilization'
            ]
        }

    def find_pareto_optimal(
        self,
        latency_targets: list[float],
        measure_func
    ) -> list[dict]:
        """Measure the candidate configurations and keep the Pareto frontier.

        Args:
            latency_targets: Accepted for interface compatibility; not used
                by the current implementation.
            measure_func: Callable taking a configuration dict and returning
                ``(throughput, latency)``. Higher throughput and lower
                latency are better.

        Returns:
            The non-dominated results, each a dict with 'config',
            'throughput' and 'latency_p99', sorted by ascending latency.
        """
        configurations = self._generate_configurations()
        results = []

        for config in configurations:
            throughput, latency = measure_func(config)
            results.append({
                'config': config,
                'throughput': throughput,
                'latency_p99': latency
            })

        def dominates(other: dict, candidate: dict) -> bool:
            # Standard Pareto dominance: no worse on both axes and strictly
            # better on at least one. Requiring strict improvement on both
            # would leave e.g. "same throughput, worse latency" on the
            # frontier.
            no_worse = (
                other['throughput'] >= candidate['throughput'] and
                other['latency_p99'] <= candidate['latency_p99']
            )
            strictly_better = (
                other['throughput'] > candidate['throughput'] or
                other['latency_p99'] < candidate['latency_p99']
            )
            return no_worse and strictly_better

        # Find Pareto frontier
        pareto = [
            r for r in results
            if not any(dominates(other, r) for other in results)
        ]

        return sorted(pareto, key=lambda x: x['latency_p99'])

    def _generate_configurations(self) -> list[dict]:
        """Return the fixed set of configurations to measure."""
        return [
            {'batch_size': 1, 'quantization': 'fp16'},
            {'batch_size': 4, 'quantization': 'fp16'},
            {'batch_size': 8, 'quantization': 'fp16'},
            {'batch_size': 16, 'quantization': 'int8'},
            {'batch_size': 32, 'quantization': 'int8'},
        ]

Streaming Optimizer

Complete implementation for streaming inference optimization:

from dataclasses import dataclass

@dataclass
class StreamingMetrics:
    """Metrics for streaming inference."""
    time_to_first_token_ms: float  # Prefill latency
    inter_token_latency_ms: float  # Time between tokens
    total_tokens: int
    total_time_ms: float

    @property
    def tokens_per_second(self) -> float:
        """Average generation rate over the whole request.

        Returns 0.0 when no time has elapsed, so a freshly created or empty
        measurement does not raise ``ZeroDivisionError``.
        """
        if self.total_time_ms <= 0:
            return 0.0
        return self.total_tokens / (self.total_time_ms / 1000)

    @property
    def perceived_speed(self) -> str:
        """Human perception of speed.

        Only the "Feels instant" tier looks at inter-token latency; the
        remaining tiers are decided by time-to-first-token alone.
        """
        if self.time_to_first_token_ms < 200 and self.inter_token_latency_ms < 50:
            return "Feels instant"
        elif self.time_to_first_token_ms < 500:
            return "Feels responsive"
        elif self.time_to_first_token_ms < 2000:
            return "Noticeable delay"
        else:
            return "Feels slow"

class StreamingOptimizer:
    """Optimize streaming inference."""

    def analyze_bottleneck(self, metrics: StreamingMetrics) -> dict:
        """Flag prefill and decode bottlenecks in a streaming measurement.

        Three independent checks are applied:
          * time-to-first-token above 500 ms -> 'Slow prefill'
          * inter-token latency above 100 ms -> 'Slow decode'
          * prefill taking more than half of the total time on a response
            of more than 10 tokens -> 'Prefill-dominated'

        Args:
            metrics: The measurement to analyze.

        Returns:
            dict with 'metrics' (a copy of the measurement's fields) and
            'issues' (list of dicts with 'issue', 'cause' and 'solutions').
        """
        issues = []

        if metrics.time_to_first_token_ms > 500:
            issues.append({
                'issue': 'Slow prefill',
                'cause': 'Long input processing',
                'solutions': [
                    'Enable Flash Attention for prefill',
                    'Optimize input preprocessing',
                    'Use prompt caching for repeated prefixes'
                ]
            })

        if metrics.inter_token_latency_ms > 100:
            issues.append({
                'issue': 'Slow decode',
                'cause': 'Per-token generation too slow',
                'solutions': [
                    'Speculative decoding',
                    'Smaller model or quantization',
                    'Reduce KV cache memory pressure'
                ]
            })

        # Check if prefill dominates. A zero total time carries no
        # information about the split, so treat it as "not dominated"
        # instead of dividing by zero.
        if metrics.total_time_ms > 0:
            prefill_fraction = metrics.time_to_first_token_ms / metrics.total_time_ms
        else:
            prefill_fraction = 0.0

        if prefill_fraction > 0.5 and metrics.total_tokens > 10:
            issues.append({
                'issue': 'Prefill-dominated',
                'cause': 'Input much longer than output',
                'solutions': [
                    'Prompt caching',
                    'Input summarization/compression',
                    'Speculative prefill'
                ]
            })

        return {
            # Copy so callers editing the report cannot mutate the metrics.
            'metrics': dict(metrics.__dict__),
            'issues': issues
        }

Cost and Performance Analysis

Cost Optimizer

Complete implementation for inference cost optimization:

from dataclasses import dataclass

@dataclass
class CostDriver:
    """One contributor to inference cost and the levers for reducing it."""
    # Short name of the cost source.
    driver: str
    # Share of total cost as free text (a range or a qualitative label).
    percentage_of_cost: str
    # Human-readable suggestions for reducing this cost.
    optimization_opportunities: list[str]

# Reference table of the main inference cost drivers, each paired with the
# optimization levers that address it.
INFERENCE_COST_DRIVERS = [
    CostDriver(**spec)
    for spec in (
        {
            "driver": "GPU compute hours",
            "percentage_of_cost": "60-80%",
            "optimization_opportunities": [
                "Increase GPU utilization (batching)",
                "Use right-sized GPUs",
                "Spot/preemptible instances for batch workloads",
            ],
        },
        {
            "driver": "GPU memory (affects GPU choice)",
            "percentage_of_cost": "Indirect",
            "optimization_opportunities": [
                "Quantization to fit larger batches",
                "PagedAttention for memory efficiency",
                "Model pruning/distillation",
            ],
        },
        {
            "driver": "Idle time",
            "percentage_of_cost": "10-40% typically",
            "optimization_opportunities": [
                "Auto-scaling based on demand",
                "Bin-packing multiple models",
                "Scale to zero for dev/staging",
            ],
        },
    )
]

class CostOptimizer:
    """Optimize inference costs."""

    def __init__(self, workload_profile: dict):
        """Store the workload profile.

        Args:
            workload_profile: Must contain 'latency_slo', which
                ``compare_configurations`` compares against each
                configuration's 'p99_latency'.
        """
        self.profile = workload_profile

    def calculate_cost_per_token(
        self,
        gpu_cost_per_hour: float,
        tokens_per_second: float,
        utilization: float
    ) -> float:
        """Calculate cost per token.

        Args:
            gpu_cost_per_hour: Hourly price of the GPU.
            tokens_per_second: Throughput while the GPU is busy.
            utilization: Fraction of each hour the GPU is busy.

        Returns:
            Cost of one token, or ``float('inf')`` when the configuration
            produces no tokens (zero or negative throughput or utilization).
        """
        effective_tokens_per_hour = tokens_per_second * 3600 * utilization
        if effective_tokens_per_hour <= 0:
            # A configuration that yields nothing is infinitely expensive per
            # token; returning inf lets comparisons rank it last instead of
            # aborting with ZeroDivisionError.
            return float('inf')
        return gpu_cost_per_hour / effective_tokens_per_hour

    def compare_configurations(
        self,
        configurations: list[dict]
    ) -> list[dict]:
        """Compare cost-performance of configurations.

        Args:
            configurations: Dicts with keys 'name', 'gpu_cost', 'throughput',
                'utilization' and 'p99_latency'.

        Returns:
            One summary dict per configuration, sorted by ascending cost per
            million tokens.
        """
        results = []

        for config in configurations:
            cost_per_token = self.calculate_cost_per_token(
                gpu_cost_per_hour=config['gpu_cost'],
                tokens_per_second=config['throughput'],
                utilization=config['utilization']
            )

            results.append({
                'config': config['name'],
                'cost_per_million_tokens': cost_per_token * 1_000_000,
                # 1M tokens per day over a 30-day month.
                'monthly_cost_at_1m_tokens_day': cost_per_token * 1_000_000 * 30,
                'meets_latency_slo': config['p99_latency'] < self.profile['latency_slo'],
                'cost_efficiency_score': config['throughput'] / config['gpu_cost']
            })

        return sorted(results, key=lambda x: x['cost_per_million_tokens'])

    def recommend_gpu(self, requirements: dict) -> dict:
        """Recommend GPU based on requirements.

        Args:
            requirements: Must contain 'model_memory_gb'. If the optional
                'min_latency' is truthy, the fastest GPU that fits is chosen;
                otherwise the one with the best FP16 TFLOPS per dollar-hour.

        Returns:
            The chosen GPU's spec dict from the built-in table, or
            ``{'error': ...}`` when no single GPU has enough memory.
        """
        gpus = [
            {'name': 'A10', 'memory_gb': 24, 'cost_hour': 1.0, 'tflops_fp16': 125},
            {'name': 'A100-40', 'memory_gb': 40, 'cost_hour': 2.5, 'tflops_fp16': 312},
            {'name': 'A100-80', 'memory_gb': 80, 'cost_hour': 3.5, 'tflops_fp16': 312},
            {'name': 'H100', 'memory_gb': 80, 'cost_hour': 5.0, 'tflops_fp16': 990},
        ]

        model_memory = requirements['model_memory_gb']
        min_latency = requirements.get('min_latency', False)

        viable = [g for g in gpus if g['memory_gb'] >= model_memory]

        if not viable:
            return {'error': 'No single GPU has enough memory; need tensor parallelism'}

        if min_latency:
            # Prefer fastest GPU
            return max(viable, key=lambda g: g['tflops_fp16'])

        # Prefer best cost/performance
        return max(viable, key=lambda g: g['tflops_fp16'] / g['cost_hour'])

Performance Investigator

Complete implementation for investigating performance regressions:

from dataclasses import dataclass

@dataclass
class PerformanceRegression:
    """A regression pattern: what is observed, why, and how to dig in."""
    # Observable symptom, as a short sentence.
    symptom: str
    # Candidate root causes for the symptom.
    possible_causes: list[str]
    # Checks to run when investigating the symptom.
    investigation_steps: list[str]

# Catalogue of frequently seen regressions, each with likely causes and a
# checklist for investigating them.
COMMON_REGRESSIONS = [
    PerformanceRegression(**entry)
    for entry in (
        {
            "symptom": "Latency increased after model update",
            "possible_causes": [
                "Larger model size",
                "Different architecture",
                "Lost quantization",
                "Shape changes causing recompilation",
            ],
            "investigation_steps": [
                "Compare model sizes (parameters, memory footprint)",
                "Check if quantization is still applied",
                "Profile both versions and compare top operations",
                "Check for torch.compile recompilation warnings",
            ],
        },
        {
            "symptom": "Throughput dropped after infrastructure change",
            "possible_causes": [
                "Different GPU instance type",
                "Network configuration change",
                "Resource contention from co-located workloads",
                "Driver/CUDA version change",
            ],
            "investigation_steps": [
                "Verify GPU specs match expectations",
                "Check GPU utilization and memory",
                "Look for other processes using GPU",
                "Compare CUDA/driver versions",
            ],
        },
        {
            "symptom": "Increasing latency over time",
            "possible_causes": [
                "Memory leak (GPU or CPU)",
                "Cache filling up",
                "Increasing queue depth",
                "Garbage collection pressure",
            ],
            "investigation_steps": [
                "Monitor GPU memory over time",
                "Check KV cache utilization",
                "Monitor request queue depth",
                "Profile memory allocations",
            ],
        },
    )
]

class PerformanceInvestigator:
    """Investigate performance issues systematically."""

    def __init__(self, metrics_client):
        """Store the metrics client.

        Args:
            metrics_client: Object exposing an awaitable
                ``get_metrics(period)`` that returns a dict of metrics.
        """
        self.metrics = metrics_client

    async def investigate_latency_increase(
        self,
        baseline_period: str,
        regression_period: str
    ) -> dict:
        """Compare two periods and list what changed alongside the latency.

        Both metric dicts must contain 'p99_latency'. 'avg_batch_size' and
        'gpu_utilization' are optional and default to 0 when absent.

        Args:
            baseline_period: Identifier of the healthy reference period.
            regression_period: Identifier of the period under investigation.

        Returns:
            dict with 'latency_change' (relative p99 change; ``inf`` if the
            baseline was zero and the current value is positive), 'findings'
            (list of dicts, each with a 'metric' key) and
            'suggested_next_steps' (list of strings).
        """
        # Get metrics for both periods
        baseline = await self.metrics.get_metrics(baseline_period)
        current = await self.metrics.get_metrics(regression_period)

        findings = []

        # Compare key metrics
        baseline_p99 = baseline['p99_latency']
        current_p99 = current['p99_latency']
        if baseline_p99:
            latency_increase = (current_p99 - baseline_p99) / baseline_p99
        else:
            # No meaningful ratio exists against a zero baseline.
            latency_increase = float('inf') if current_p99 > 0 else 0.0

        if latency_increase > 0.1:
            findings.append({
                'metric': 'p99_latency',
                'change': f'+{latency_increase:.1%}',
                'baseline': baseline_p99,
                'current': current_p99
            })

        # Check batch size changes. Reuse the defaulted values for the delta:
        # indexing the dicts directly would raise KeyError whenever only one
        # of the two periods reports a batch size.
        baseline_batch = baseline.get('avg_batch_size', 0)
        current_batch = current.get('avg_batch_size', 0)
        if current_batch != baseline_batch:
            findings.append({
                'metric': 'batch_size',
                'change': current_batch - baseline_batch,
                'impact': 'Higher batch size increases latency but improves throughput'
            })

        # Check GPU metrics: flag a drop of more than 10 points.
        if current.get('gpu_utilization', 0) < baseline.get('gpu_utilization', 0) - 10:
            findings.append({
                'metric': 'gpu_utilization',
                'change': 'Decreased',
                'indicates': 'Possible CPU bottleneck or launch overhead'
            })

        return {
            'latency_change': latency_increase,
            'findings': findings,
            'suggested_next_steps': self._suggest_investigation(findings)
        }

    def _suggest_investigation(self, findings: list) -> list[str]:
        """Turn findings into follow-up steps; profiling is always suggested."""
        steps = ["Profile with torch.profiler to identify slowest operations"]

        for finding in findings:
            if finding['metric'] == 'gpu_utilization':
                steps.append("Check for CPU preprocessing bottleneck")
                steps.append("Look for kernel launch overhead (many small operations)")

            if finding['metric'] == 'batch_size':
                steps.append("Verify batch size change was intentional")

        return steps

Benchmark Suite

Complete implementation for standardized benchmarking:

from dataclasses import dataclass
import torch
import gc

@dataclass
class BenchmarkConfig:
    """Settings that drive a ``BenchmarkSuite`` run."""
    # Label for this benchmark configuration.
    name: str
    # Untimed forward passes executed before measuring each input config.
    warmup_iterations: int
    # Timed forward passes per input config.
    benchmark_iterations: int
    # One dict per scenario. The suite reads 'name' and, when building the
    # input tensor, the optional 'batch_size', 'seq_length' and 'hidden_size'.
    input_configs: list[dict]
    # Names of the metrics of interest; not read by BenchmarkSuite itself.
    metrics_to_collect: list[str]

class BenchmarkSuite:
    """Standardized benchmark suite.

    Runs the model on synthetic CUDA inputs, timing each forward pass with
    CUDA events and recording peak allocated memory per iteration.
    """

    def __init__(self, model, config: BenchmarkConfig):
        """Store the model and configuration; ``results`` starts empty."""
        self.model = model
        self.config = config
        self.results = {}

    def run(self) -> dict:
        """Run complete benchmark suite.

        Returns:
            dict keyed by each input config's 'name'. Every value holds the
            'input_config', a 'latency_ms' summary (mean, std, min, max, p50,
            p99) and a 'memory_mb' summary (peak, mean; units of 1e6 bytes).
            The same dict is also stored on ``self.results``.

        Raises:
            ValueError: If ``config.benchmark_iterations`` is less than 1,
                since no statistics can be computed from zero samples.
        """
        if self.config.benchmark_iterations < 1:
            # Fail before any GPU work instead of hitting an opaque
            # ZeroDivisionError in the statistics below.
            raise ValueError("benchmark_iterations must be at least 1")

        all_results = {}

        for input_config in self.config.input_configs:
            # Clear state
            gc.collect()
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()

            # Create input
            test_input = self._create_input(input_config)

            # Warmup
            for _ in range(self.config.warmup_iterations):
                with torch.no_grad():
                    _ = self.model(test_input)
            torch.cuda.synchronize()

            # Benchmark
            times = []
            memory_peaks = []

            for _ in range(self.config.benchmark_iterations):
                torch.cuda.reset_peak_memory_stats()

                start = torch.cuda.Event(enable_timing=True)
                end = torch.cuda.Event(enable_timing=True)

                start.record()
                with torch.no_grad():
                    _ = self.model(test_input)
                end.record()

                # elapsed_time is only valid once both events have completed.
                torch.cuda.synchronize()
                times.append(start.elapsed_time(end))
                memory_peaks.append(torch.cuda.max_memory_allocated())

            # Calculate statistics (sort once for both percentiles)
            ordered = sorted(times)
            count = len(ordered)

            all_results[input_config['name']] = {
                'input_config': input_config,
                'latency_ms': {
                    'mean': sum(times) / count,
                    'std': self._std(times),
                    'min': ordered[0],
                    'max': ordered[-1],
                    'p50': ordered[count // 2],
                    'p99': ordered[int(count * 0.99)]
                },
                'memory_mb': {
                    'peak': max(memory_peaks) / 1e6,
                    'mean': sum(memory_peaks) / len(memory_peaks) / 1e6
                }
            }

        # Keep the latest run available on the instance as well.
        self.results = all_results
        return all_results

    def _create_input(self, config: dict):
        """Build a random CUDA tensor of shape (batch, seq_length, hidden).

        Missing keys default to batch_size=1, seq_length=512,
        hidden_size=4096.
        """
        return torch.randn(
            config.get('batch_size', 1),
            config.get('seq_length', 512),
            config.get('hidden_size', 4096),
            device='cuda'
        )

    def _std(self, values: list) -> float:
        """Return the population standard deviation of ``values``."""
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return variance ** 0.5

    def generate_report(self, results: dict) -> str:
        """Generate markdown benchmark report.

        Args:
            results: A dict shaped like the return value of ``run``.

        Returns:
            Markdown text with a header and one table row per input config.
        """
        lines = [
            "# Benchmark Results",
            f"\nModel: {self.model.__class__.__name__}",
            f"Iterations: {self.config.benchmark_iterations}",
            "\n## Latency Results\n",
            "| Config | Mean (ms) | P50 (ms) | P99 (ms) | Memory (MB) |",
            "|--------|-----------|----------|----------|-------------|"
        ]

        for name, data in results.items():
            lat = data['latency_ms']
            mem = data['memory_mb']
            lines.append(
                f"| {name} | {lat['mean']:.2f} | {lat['p50']:.2f} | "
                f"{lat['p99']:.2f} | {mem['peak']:.0f} |"
            )

        return "\n".join(lines)

Production Utilities

Compilation Manager

Complete implementation for managing model compilation with caching:

import torch
import hashlib
import os

class CompilationManager:
    """Manage model compilation for production."""

    def __init__(self, model, cache_dir: str = "/tmp/compile_cache"):
        """Store the model and cache directory; nothing is compiled yet."""
        self.model = model
        self.cache_dir = cache_dir
        self.compiled_model = None

    def compile_with_caching(self, sample_input):
        """Return a compiled model, loading it from ``cache_dir`` if present.

        The cache key combines ``_hash_model()`` with the sample input's
        shape. This method never writes a cache file itself, so the load
        branch only triggers for files placed in ``cache_dir`` by other means.

        Args:
            sample_input: Tensor used to derive the cache key and to run the
                three warm-up calls that trigger compilation.

        Returns:
            The compiled (or loaded) model, also kept on
            ``self.compiled_model``.
        """
        # Create cache key from model and input shape
        model_hash = self._hash_model()
        input_shape = tuple(sample_input.shape)
        cache_key = f"{model_hash}_{input_shape}"
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.pt")

        # Check cache
        if os.path.exists(cache_path):
            print("Loading compiled model from cache")
            # SECURITY NOTE(review): torch.load unpickles the file, which can
            # execute arbitrary code. The default cache_dir lives under /tmp,
            # which other local users may be able to write to - only point
            # this at a directory whose contents are trusted.
            self.compiled_model = torch.load(cache_path)
        else:
            print("Compiling model (this may take several minutes)...")
            self.compiled_model = torch.compile(
                self.model,
                mode='max-autotune',
                fullgraph=True
            )

            # Warmup to trigger compilation
            for _ in range(3):
                _ = self.compiled_model(sample_input)

            # Note: Actual caching of compiled artifacts is complex
            # torch.compile uses its own caching mechanism
            # This is a simplified illustration

        return self.compiled_model

    def _hash_model(self) -> str:
        """Create hash of model for cache key.

        Only the state-dict key names are hashed, so two models with the same
        parameter names but different weights or shapes get the same key.
        MD5 is used purely as a fingerprint here, not for security.

        Returns:
            The first 16 hex characters of the digest.
        """
        state_bytes = str(self.model.state_dict().keys()).encode()
        return hashlib.md5(state_bytes).hexdigest()[:16]

Memory Fragmentation Monitor

Complete implementation for monitoring GPU memory fragmentation:

import torch
import gc

class MemoryFragmentationMonitor:
    """Monitor GPU memory fragmentation."""

    def check_fragmentation(self) -> dict:
        """Report allocator usage and a fragmentation estimate.

        The estimate is the share of memory reserved by PyTorch's caching
        allocator that is not currently allocated to tensors:
        ``(reserved - allocated) / reserved``. All figures refer to the
        current CUDA device.

        Returns:
            dict with 'allocated_gb', 'reserved_gb', 'total_gb' (units of
            1e9 bytes), 'fragmentation_ratio', 'is_fragmented' (ratio above
            0.2) and a textual 'recommendation'.
        """
        # Query every figure for the same device. Reading total memory from
        # device 0 while the allocator stats come from the current device
        # would mix two GPUs on multi-GPU hosts.
        device = torch.cuda.current_device()

        # Get memory stats
        allocated = torch.cuda.memory_allocated(device)
        reserved = torch.cuda.memory_reserved(device)
        total = torch.cuda.get_device_properties(device).total_memory

        fragmentation = (reserved - allocated) / reserved if reserved > 0 else 0

        return {
            'allocated_gb': allocated / 1e9,
            'reserved_gb': reserved / 1e9,
            'total_gb': total / 1e9,
            'fragmentation_ratio': fragmentation,
            'is_fragmented': fragmentation > 0.2,
            'recommendation': self._recommend(fragmentation)
        }

    def _recommend(self, fragmentation: float) -> str:
        """Map a fragmentation ratio to advice (thresholds at 0.1 and 0.2)."""
        if fragmentation < 0.1:
            return "Memory well utilized"
        elif fragmentation < 0.2:
            return "Some fragmentation; consider defragmentation during low-load periods"
        else:
            return "High fragmentation; consider torch.cuda.empty_cache() or restart"

    def defragment(self):
        """Attempt to defragment GPU memory.

        Collects Python garbage so dead tensors are released, returns cached
        blocks to the driver, then waits for outstanding GPU work.
        """
        gc.collect()
        torch.cuda.empty_cache()
        torch.cuda.synchronize()

Optimization Journey Planner

Complete implementation for planning optimization journeys:

from dataclasses import dataclass

@dataclass
class OptimizationJourney:
    """One step of an end-to-end optimization case study."""
    # Name of the phase this step belongs to.
    stage: str
    # What was done in this step.
    action: str
    # Free-text measurements taken before and after the step.
    before_metric: str
    after_metric: str
    # Tool, setting or approach that was applied.
    technique_used: str

# Worked example of an optimization effort, listed in the order the steps
# were taken, from the baseline measurement to the final comparison.
OPTIMIZATION_JOURNEY = [
    OptimizationJourney(**step)
    for step in (
        {
            "stage": "Baseline",
            "action": "Measure initial performance",
            "before_metric": "N/A",
            "after_metric": "250ms p99, 40 req/s throughput",
            "technique_used": "PyTorch profiler",
        },
        {
            "stage": "Low-hanging fruit",
            "action": "Enable Flash Attention",
            "before_metric": "250ms p99",
            "after_metric": "180ms p99",
            "technique_used": "attn_implementation='flash_attention_2'",
        },
        {
            "stage": "Compilation",
            "action": "Apply torch.compile",
            "before_metric": "180ms p99",
            "after_metric": "140ms p99",
            "technique_used": "torch.compile(model, mode='reduce-overhead')",
        },
        {
            "stage": "Quantization",
            "action": "Apply INT8 weight-only quantization",
            "before_metric": "140ms, 40 req/s",
            "after_metric": "120ms, 60 req/s (larger batches fit)",
            "technique_used": "bitsandbytes INT8",
        },
        {
            "stage": "Batching",
            "action": "Implement continuous batching",
            "before_metric": "60 req/s",
            "after_metric": "180 req/s",
            "technique_used": "vLLM-style continuous batching",
        },
        {
            "stage": "Speculative decoding",
            "action": "Add draft model for speculation",
            "before_metric": "120ms time-to-first-token",
            "after_metric": "120ms TTFT, 2.5x faster generation",
            "technique_used": "68M draft model, 4 speculative tokens",
        },
        {
            "stage": "Final result",
            "action": "Compare to baseline",
            "before_metric": "250ms p99, 40 req/s",
            "after_metric": "100ms p99, 180 req/s, 4.5x cost reduction",
            "technique_used": "Combined optimization stack",
        },
    )
]

def plan_optimization_journey(
    current_performance: dict,
    targets: dict
) -> list[dict]:
    """Pick an ordered list of optimizations that close the gap to the targets.

    The plan is built in two passes over a fixed, built-in catalog:

    1. If current latency exceeds the target, the three catalog entries with
       the highest ``latency_improvement`` are added, best first.
    2. If current throughput is below the target, the remaining entries are
       appended in descending ``throughput_improvement`` order until the plan
       holds five entries.

    Gaps are detected by direct comparison rather than by a ratio, so a
    current throughput of 0 (or a target latency of 0) is treated as a gap
    instead of raising ``ZeroDivisionError``.

    Args:
        current_performance: Mapping with ``'latency'`` and ``'throughput'``
            entries describing the measured state. Lower latency and higher
            throughput are considered better.
        targets: Mapping with the same two keys, in the same units as
            ``current_performance``.

    Returns:
        A list of catalog entries (dicts with the keys ``'name'``,
        ``'latency_improvement'``, ``'throughput_improvement'``, ``'effort'``
        and ``'prerequisite'``) in the order they should be attempted. The
        list is empty when both targets are already met. The dicts are
        created fresh on every call, so callers may mutate them.

    Raises:
        KeyError: If either mapping lacks ``'latency'`` or ``'throughput'``.
    """
    # Candidate catalog. The improvement figures are only used for ranking;
    # they look like expected fractional gains, but nothing here depends on
    # their scale. 'effort' and 'prerequisite' are informational and do not
    # influence the ordering.
    optimizations = [
        {
            'name': 'Flash Attention',
            'latency_improvement': 0.3,
            'throughput_improvement': 0.2,
            'effort': 'Low',
            'prerequisite': 'PyTorch 2.0+, supported model'
        },
        {
            'name': 'torch.compile',
            'latency_improvement': 0.2,
            'throughput_improvement': 0.15,
            'effort': 'Low',
            'prerequisite': 'PyTorch 2.0+'
        },
        {
            'name': 'Quantization (INT8)',
            'latency_improvement': 0.15,
            'throughput_improvement': 0.5,
            'effort': 'Medium',
            'prerequisite': 'Quality testing required'
        },
        {
            'name': 'Continuous Batching',
            'latency_improvement': -0.1,  # May slightly increase latency
            'throughput_improvement': 2.0,
            'effort': 'High',
            'prerequisite': 'Serving infrastructure change'
        },
        {
            'name': 'Speculative Decoding',
            'latency_improvement': 0.4,
            'throughput_improvement': 0.1,
            'effort': 'Medium',
            'prerequisite': 'Draft model needed'
        },
    ]

    # Compare directly instead of computing current/target ratios: the result
    # is the same for positive inputs, and zero values no longer divide by 0.
    behind_on_latency = current_performance['latency'] > targets['latency']
    behind_on_throughput = (
        targets['throughput'] > current_performance['throughput']
    )

    plan: list[dict] = []

    if behind_on_latency:
        by_latency = sorted(
            optimizations,
            key=lambda opt: opt['latency_improvement'],
            reverse=True
        )
        # Only the three strongest latency wins are taken in this pass.
        plan.extend(by_latency[:3])

    if behind_on_throughput:
        by_throughput = sorted(
            optimizations,
            key=lambda opt: opt['throughput_improvement'],
            reverse=True
        )
        for candidate in by_throughput:
            if candidate in plan:
                # Already chosen for latency; keep its earlier position.
                continue
            plan.append(candidate)
            # Cap the overall plan at five steps.
            if len(plan) >= 5:
                break

    return plan