Performance Engineering Code Reference
Complete code implementations for Chapter 61: Performance Engineering.
Table of Contents
- Profiling and Diagnostics
- GPU Optimization
- Memory Optimization
- Attention Optimization
- Batching and Serving
- Cost and Performance Analysis
- Production Utilities
Profiling and Diagnostics
Inference Profiler
Complete implementation for profiling PyTorch inference workloads:
import torch
from torch.profiler import profile, record_function, ProfilerActivity
class InferenceProfiler:
    """Profile PyTorch inference workloads with ``torch.profiler``.

    ``device`` may be any string accepted by ``torch.device`` (for example
    ``'cuda'``, ``'cuda:1'`` or ``'cpu'``); the model is moved there on
    construction.
    """

    def __init__(self, model, device='cuda'):
        self.model = model.to(device)
        self.device = device

    def _is_cuda(self) -> bool:
        """Return True if ``self.device`` names a CUDA device (any index)."""
        return torch.device(self.device).type == 'cuda'

    def profile_inference(
        self,
        input_batch,
        warmup_runs: int = 3,
        profile_runs: int = 5
    ) -> dict:
        """Profile inference and summarize where time and memory go.

        Args:
            input_batch: Passed directly to ``self.model``.
            warmup_runs: Unprofiled forward passes run first so one-time costs
                are kept out of the profile.
            profile_runs: Forward passes recorded by the profiler.

        Returns:
            The summary dict built by ``_analyze_profile``.
        """
        is_cuda = self._is_cuda()
        # Only ask the profiler for CUDA activity when running on a GPU.
        activities = [ProfilerActivity.CPU]
        if is_cuda:
            activities.append(ProfilerActivity.CUDA)

        with torch.no_grad():
            for _ in range(warmup_runs):
                self.model(input_batch)
            if is_cuda:
                torch.cuda.synchronize()

            with profile(
                activities=activities,
                record_shapes=True,
                profile_memory=True,
                with_stack=True,
            ) as prof:
                for _ in range(profile_runs):
                    with record_function("model_inference"):
                        self.model(input_batch)
                    # Wait for queued kernels so they are attributed to this run.
                    if is_cuda:
                        torch.cuda.synchronize()

        return self._analyze_profile(prof)

    @staticmethod
    def _event_metric(event, *names):
        """Return the first attribute in ``names`` that exists on ``event`` and is not None, else 0."""
        for name in names:
            value = getattr(event, name, None)
            if value is not None:
                return value
        return 0

    def _analyze_profile(self, prof) -> dict:
        """Summarize a finished profiler run.

        Totals and rankings use *self* time (time inside an op excluding its
        children). ``key_averages()`` lists parent ops (e.g. the
        ``model_inference`` range or ``aten::linear``) next to their children,
        so summing inclusive times would count the same work several times
        and distort the per-op percentages.

        Newer PyTorch releases expose ``self_device_time_total`` and
        ``device_memory_usage``; older ones use the ``cuda`` spellings. Both
        are accepted.

        Returns:
            dict with ``cuda_time_total_us``, ``cpu_time_total_us``,
            ``top_cuda_ops`` (name, self device time, share of total),
            ``top_memory_ops`` (name, device memory bytes) and ``trace_path``.
        """
        events = list(prof.key_averages())

        def device_time(event):
            return self._event_metric(event, 'self_device_time_total', 'self_cuda_time_total')

        def device_memory(event):
            return self._event_metric(event, 'device_memory_usage', 'cuda_memory_usage')

        cuda_time_total = sum(device_time(e) for e in events)
        cpu_time_total = sum(self._event_metric(e, 'self_cpu_time_total') for e in events)

        def share(us):
            # A CPU-only run records no device time; avoid dividing by zero.
            return us / cuda_time_total * 100 if cuda_time_total else 0.0

        top_cuda_ops = sorted(events, key=device_time, reverse=True)[:10]
        top_memory_ops = sorted(events, key=device_memory, reverse=True)[:10]

        return {
            'cuda_time_total_us': cuda_time_total,
            'cpu_time_total_us': cpu_time_total,
            'top_cuda_ops': [
                {
                    'name': op.key,
                    'cuda_time_us': device_time(op),
                    'percentage': share(device_time(op)),
                }
                for op in top_cuda_ops
            ],
            'top_memory_ops': [
                {
                    'name': op.key,
                    'memory_bytes': device_memory(op),
                }
                for op in top_memory_ops if device_memory(op)
            ],
            'trace_path': None,  # use export_trace() to write a Chrome trace
        }

    def export_trace(self, prof, path: str):
        """Write ``prof`` to ``path`` as a Chrome trace for visualization."""
        prof.export_chrome_trace(path)


# Bottleneck Diagnoser
Complete implementation for identifying performance bottlenecks:
from dataclasses import dataclass
@dataclass
class BottleneckPattern:
    """One recognizable performance bottleneck: its signs, how to confirm it, and fixes."""

    # Short name of the pattern, e.g. "CPU-bound".
    pattern: str
    # Observable signs that the bottleneck is present.
    symptoms: list[str]
    # Tool or technique for confirming the diagnosis.
    diagnosis_approach: str
    # Typical root causes.
    common_causes: list[str]
    # Candidate remedies.
    solutions: list[str]
# Catalogue of common inference bottlenecks, in a fixed order: how each one
# shows up, how to confirm it, why it usually happens, and what fixes it.
BOTTLENECK_PATTERNS = [
    # Host-side work cannot keep the GPU fed.
    BottleneckPattern(
        pattern="CPU-bound",
        diagnosis_approach="Profile CPU with py-spy; check for Python overhead",
        symptoms=[
            "GPU utilization is low (<70%)",
            "CPU utilization is high",
            "Adding GPUs doesn't improve throughput",
        ],
        common_causes=[
            "Data preprocessing on CPU",
            "Too much Python in hot path",
            "Tokenization/detokenization",
            "Network serialization",
        ],
        solutions=[
            "Move preprocessing to GPU",
            "Use async data loading",
            "Batch preprocessing",
            "Use compiled/native tokenizers",
        ],
    ),
    # Kernels wait on memory rather than arithmetic.
    BottleneckPattern(
        pattern="Memory-bandwidth bound",
        diagnosis_approach="Check memory throughput in Nsight Compute",
        symptoms=[
            "GPU compute utilization is low",
            "Memory throughput near theoretical max",
            "Adding more FLOPS doesn't help",
        ],
        common_causes=[
            "Large model that doesn't fit in cache",
            "Inefficient memory access patterns",
            "Too many memory-bound operations",
        ],
        solutions=[
            "Quantization to reduce memory traffic",
            "Kernel fusion to reduce memory reads/writes",
            "Optimize memory layout",
            "Use Flash Attention for attention layers",
        ],
    ),
    # Per-kernel launch cost dominates useful GPU work.
    BottleneckPattern(
        pattern="GPU kernel launch bound",
        diagnosis_approach="Look at timeline in Nsight Systems",
        symptoms=[
            "Many small kernels visible in trace",
            "High CPU time between GPU operations",
            "GPU idle between operations",
        ],
        common_causes=[
            "Too many small operations",
            "Eager execution overhead",
            "Lack of kernel fusion",
        ],
        solutions=[
            "Use torch.compile() or TensorRT",
            "CUDA Graphs",
            "Operator fusion",
            "Batch operations together",
        ],
    ),
    # Time goes into moving data between host and device.
    BottleneckPattern(
        pattern="Host-device transfer bound",
        diagnosis_approach="Check transfer times in Nsight Systems",
        symptoms=[
            "High time spent in cudaMemcpy",
            "GPU idle waiting for data",
            "PCIe bandwidth saturated",
        ],
        common_causes=[
            "Moving data CPU<->GPU frequently",
            "Not using pinned memory",
            "Synchronous transfers blocking execution",
        ],
        solutions=[
            "Keep data on GPU",
            "Use pinned memory for transfers",
            "Overlap computation with transfer",
            "Batch transfers",
        ],
    ),
]
class BottleneckDiagnoser:
    """Apply simple heuristics to inference-profiler results.

    ``profiler_results`` is read for ``cpu_time_total_us``,
    ``cuda_time_total_us`` and ``top_cuda_ops`` (a list of dicts each holding
    a ``percentage``). Missing keys are treated as zero / empty.
    """

    # CPU time more than this multiple of GPU time suggests a CPU-bound workload.
    CPU_GPU_RATIO_THRESHOLD = 2.0
    # Only judge the op-time distribution when more than this many ops are listed.
    MIN_OPS_EXCLUSIVE = 5
    # If no single op reaches this share of GPU time, work is spread over many small kernels.
    DOMINANT_OP_PERCENTAGE = 30

    def __init__(self, profiler_results: dict):
        self.results = profiler_results

    def diagnose(self) -> list[dict]:
        """Return likely bottlenecks as dicts with ``pattern``, ``confidence`` and ``evidence``."""
        bottlenecks = []

        cpu_us = self.results.get('cpu_time_total_us', 0)
        gpu_us = self.results.get('cuda_time_total_us', 0)
        # max(..., 1) avoids dividing by zero when no GPU time was recorded.
        cpu_gpu_ratio = cpu_us / max(gpu_us, 1)
        if cpu_gpu_ratio > self.CPU_GPU_RATIO_THRESHOLD:
            bottlenecks.append({
                'pattern': 'cpu_bound',
                'confidence': min(cpu_gpu_ratio / 5.0, 1.0),
                'evidence': f'CPU time is {cpu_gpu_ratio:.1f}x GPU time'
            })

        top_ops = self.results.get('top_cuda_ops') or []
        if len(top_ops) > self.MIN_OPS_EXCLUSIVE:
            # max() instead of top_ops[0] so unsorted input is handled correctly too.
            largest_op_percentage = max(op['percentage'] for op in top_ops)
            if largest_op_percentage < self.DOMINANT_OP_PERCENTAGE:
                bottlenecks.append({
                    'pattern': 'kernel_launch_bound',
                    'confidence': 0.7,
                    'evidence': f'Largest op is only {largest_op_percentage:.1f}% of time'
                })

        return bottlenecks


# GPU Optimization
Model Optimizer (torch.compile)
Complete implementation for optimizing models with torch.compile:
import torch
import time
class ModelOptimizer:
    """Compile a model with ``torch.compile`` and measure the speed-up."""

    def __init__(self, model):
        self.original_model = model
        self.compiled_model = None

    def compile(
        self,
        mode: str = 'default',
        fullgraph: bool = False,
        dynamic: bool = True
    ):
        """Compile ``original_model``, store it as ``compiled_model`` and return it.

        Args:
            mode: 'default' (balance of compile time and speed-up),
                'reduce-overhead' (minimize framework overhead with CUDA graphs)
                or 'max-autotune' (try more options; slower compile).
            fullgraph: Capture the entire model as one graph.
            dynamic: Handle dynamic shapes.
        """
        self.compiled_model = torch.compile(
            self.original_model,
            mode=mode,
            fullgraph=fullgraph,
            dynamic=dynamic,
        )
        return self.compiled_model

    @staticmethod
    def _sync():
        """Wait for queued CUDA work so wall-clock timings are accurate; no-op without CUDA."""
        if torch.cuda.is_available():
            torch.cuda.synchronize()

    def _time_calls(self, model, sample_input, warmup_runs: int, benchmark_runs: int) -> list:
        """Return per-call wall-clock seconds for ``benchmark_runs`` calls made after warmup."""
        times = []
        with torch.no_grad():
            for _ in range(warmup_runs):
                model(sample_input)
            self._sync()
            for _ in range(benchmark_runs):
                start = time.perf_counter()
                model(sample_input)
                self._sync()
                times.append(time.perf_counter() - start)
        return times

    def benchmark_compilation_benefit(
        self,
        sample_input,
        warmup_runs: int = 10,
        benchmark_runs: int = 100
    ) -> dict:
        """Benchmark the original model against the compiled one.

        The model is compiled on demand. ``torch.compile`` compiles lazily on
        the first call, which happens during the warmup runs, so compile time
        is excluded from the timings unless ``warmup_runs`` is 0. Works on CPU
        as well as CUDA.

        Raises:
            ValueError: if ``benchmark_runs`` < 1.
        """
        if benchmark_runs < 1:
            raise ValueError("benchmark_runs must be >= 1")

        original_times = self._time_calls(
            self.original_model, sample_input, warmup_runs, benchmark_runs
        )

        if self.compiled_model is None:
            self.compile()
        compiled_times = self._time_calls(
            self.compiled_model, sample_input, warmup_runs, benchmark_runs
        )

        original_avg = sum(original_times) / len(original_times)
        compiled_avg = sum(compiled_times) / len(compiled_times)
        return {
            'original_avg_ms': original_avg * 1000,
            'compiled_avg_ms': compiled_avg * 1000,
            'speedup': original_avg / compiled_avg,
            'time_saved_per_inference_ms': (original_avg - compiled_avg) * 1000
        }


# TensorRT Optimizer
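Conceptual implementation for planning TensorRT optimization (a comparison of compiler options plus optimization-profile configuration; no engine is actually built):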
from dataclasses import dataclass
@dataclass
class CompilerOption:
    """A model compiler / inference engine and when to reach for it."""

    # Product or API name, e.g. "TensorRT".
    name: str
    # One-line summary of what it is.
    description: str
    # Deployment scenario it suits best.
    best_for: str
    # Notable caveats (and some benefits) to weigh.
    tradeoffs: list[str]
# Catalogue of compilation / serving options, ordered from least to most
# specialized. Some "tradeoffs" entries are really benefits (e.g. TensorRT's
# "Best speedups..."); they are kept as written.
COMPILER_OPTIONS = [
    CompilerOption(
        name="torch.compile",
        best_for="Quick optimization with minimal code changes",
        description="PyTorch's built-in compiler using TorchDynamo + Inductor",
        tradeoffs=[
            "First run is slow (compilation)",
            "Dynamic shapes can cause recompilation",
            "Some ops may fall back to eager",
        ],
    ),
    CompilerOption(
        name="TensorRT",
        best_for="Production NVIDIA GPU deployment",
        description="NVIDIA's high-performance inference optimizer",
        tradeoffs=[
            "Static shapes required (or multiple profiles)",
            "Longer compilation time",
            "NVIDIA-only",
            "Best speedups, especially for FP16/INT8",
        ],
    ),
    CompilerOption(
        name="ONNX Runtime",
        best_for="Multi-platform deployment, CPU and GPU",
        description="Cross-platform inference engine",
        tradeoffs=[
            "Conversion from PyTorch required",
            "Some ops may not be supported",
            "Good for standardization",
        ],
    ),
    CompilerOption(
        name="vLLM / TGI",
        best_for="LLM inference with batching",
        description="Specialized LLM serving with built-in optimizations",
        tradeoffs=[
            "LLM-specific, not general purpose",
            "Includes continuous batching, PagedAttention",
            "May require model-specific support",
        ],
    ),
]
class TensorRTOptimizer:
    """Plan a TensorRT conversion (conceptual: builds a config, not an engine)."""

    def __init__(self, model_path: str):
        self.model_path = model_path

    def optimize(
        self,
        precision: str = 'fp16',  # fp32, fp16, int8
        batch_sizes: "list[int] | None" = None,
        max_sequence_length: int = 2048,
        workspace_gb: float = 4.0
    ) -> dict:
        """Build a TensorRT-style configuration plus rough expectations.

        Args:
            precision: 'fp32', 'fp16' or 'int8'; other values yield 'Unknown' estimates.
            batch_sizes: One optimization profile is created per batch size.
                Defaults to ``[1, 8, 32]``.
            max_sequence_length: Largest sequence length each profile accepts.
            workspace_gb: Builder workspace size recorded in the config.

        Returns:
            dict with ``config``, ``expected_speedup`` and ``memory_reduction``.

        Raises:
            ValueError: if ``max_sequence_length`` < 1.
        """
        # None sentinel: a list default would be a single object shared by all calls.
        if batch_sizes is None:
            batch_sizes = [1, 8, 32]
        if max_sequence_length < 1:
            raise ValueError("max_sequence_length must be >= 1")

        # TensorRT profiles need min <= opt <= max in every dimension, so the
        # "optimize for half the max length" choice is clamped to at least 1.
        opt_seq_len = max(1, max_sequence_length // 2)
        config = {
            'precision': precision,
            'optimization_profiles': [
                {
                    'min_shape': (1, 1),
                    'opt_shape': (batch_size, opt_seq_len),
                    'max_shape': (batch_size, max_sequence_length),
                }
                for batch_size in batch_sizes
            ],
            'workspace_gb': workspace_gb,
        }

        # A real implementation would build the engine here with the tensorrt library.
        return {
            'config': config,
            'expected_speedup': self._estimate_speedup(precision),
            'memory_reduction': self._estimate_memory_reduction(precision)
        }

    def _estimate_speedup(self, precision: str) -> str:
        """Rough speed-up range for ``precision``, or 'Unknown'."""
        estimates = {
            'fp32': '1.5-2x',
            'fp16': '2-4x',
            'int8': '3-5x'
        }
        return estimates.get(precision, 'Unknown')

    def _estimate_memory_reduction(self, precision: str) -> str:
        """Rough memory reduction for ``precision``, or 'Unknown'."""
        estimates = {
            'fp32': '20-30%',
            'fp16': '50%',
            'int8': '75%'
        }
        return estimates.get(precision, 'Unknown')


# CUDA Graph Optimizer
Complete implementation for CUDA Graph optimization:
import torch
import time
class CUDAGraphOptimizer:
    """Capture a model's forward pass as a CUDA Graph and replay it.

    A captured graph always reads ``static_input`` and writes
    ``static_output``, so every tensor passed to ``inference`` must have the
    same shape as ``sample_input``. Requires a CUDA device.
    """

    def __init__(self, model, sample_input):
        self.model = model
        self.sample_input = sample_input
        self.graph = None
        self.static_input = None
        self.static_output = None

    def capture_graph(self, warmup_runs: int = 3):
        """Warm the model up on a side stream, then capture one forward pass.

        This follows the capture recipe in the PyTorch CUDA Graphs docs:
        warmup iterations run on a non-default stream before capture. Everything
        runs under ``torch.no_grad()`` so no autograd state is recorded.
        """
        self.model.eval()
        static_input = self.sample_input.clone()
        graph = torch.cuda.CUDAGraph()

        with torch.no_grad():
            side_stream = torch.cuda.Stream()
            side_stream.wait_stream(torch.cuda.current_stream())
            with torch.cuda.stream(side_stream):
                for _ in range(warmup_runs):
                    self.model(static_input)
            torch.cuda.current_stream().wait_stream(side_stream)

            with torch.cuda.graph(graph):
                static_output = self.model(static_input)
        torch.cuda.synchronize()

        # Publish state only after a successful capture.
        self.static_input = static_input
        self.static_output = static_output
        self.graph = graph

    def inference(self, input_tensor):
        """Replay the captured graph on ``input_tensor`` and return a copy of the output.

        Raises:
            RuntimeError: if ``capture_graph()`` has not been called.
            ValueError: if ``input_tensor``'s shape differs from the captured
                input (``copy_`` would otherwise broadcast silently).
        """
        if self.graph is None:
            raise RuntimeError("Graph not captured. Call capture_graph() first.")
        if input_tensor.shape != self.static_input.shape:
            raise ValueError(
                f"Expected input of shape {tuple(self.static_input.shape)}, "
                f"got {tuple(input_tensor.shape)}"
            )
        self.static_input.copy_(input_tensor)
        self.graph.replay()
        # Clone: the next replay overwrites static_output in place.
        return self.static_output.clone()

    def benchmark_benefit(
        self,
        num_iterations: int = 1000
    ) -> dict:
        """Compare eager forward passes with graph replays (average ms per call).

        Raises:
            ValueError: if ``num_iterations`` < 1.
        """
        if num_iterations < 1:
            raise ValueError("num_iterations must be >= 1")

        # Capture first: it also warms the model up, so the eager baseline
        # does not pay one-time start-up costs.
        if self.graph is None:
            self.capture_graph()

        with torch.no_grad():
            torch.cuda.synchronize()
            start = time.perf_counter()
            for _ in range(num_iterations):
                self.model(self.sample_input)
            torch.cuda.synchronize()
            baseline_time = time.perf_counter() - start

        start = time.perf_counter()
        for _ in range(num_iterations):
            self.graph.replay()
        torch.cuda.synchronize()
        graph_time = time.perf_counter() - start

        return {
            'baseline_ms': baseline_time / num_iterations * 1000,
            'graph_ms': graph_time / num_iterations * 1000,
            'speedup': baseline_time / graph_time,
            'note': 'Speedup is higher for small models with many kernels'
        }


# Tensor Parallel Inference
Complete implementation for analyzing how to split a model for tensor-parallel inference:
from dataclasses import dataclass
from enum import Enum
import torch.nn as nn
class ParallelismType(Enum):
    """Ways of spreading a model's work over several GPUs."""

    TENSOR = "tensor"      # shard individual tensors across GPUs
    PIPELINE = "pipeline"  # place different layers on different GPUs
    DATA = "data"          # replicate the model; each copy gets different data
    EXPERT = "expert"      # Mixture-of-Experts: experts spread across GPUs
@dataclass
class ParallelismStrategy:
    """A parallelism strategy together with guidance on when to use it."""

    # Which strategy this entry describes.
    type: ParallelismType
    # How the work is divided between GPUs.
    description: str
    # Workloads the strategy suits.
    best_for: str
    # Qualitative effort to implement, e.g. "Low" or "High".
    implementation_complexity: str
    # Qualitative notes on efficiency.
    efficiency: str
# Guidance table for the four parallelism strategies, simplest first.
PARALLELISM_STRATEGIES = [
    ParallelismStrategy(
        type=ParallelismType.DATA,
        implementation_complexity="Low",
        description="Each GPU has full model copy, processes different data",
        best_for="Training, batch inference when model fits on one GPU",
        efficiency="High for training; limited for latency-sensitive inference",
    ),
    ParallelismStrategy(
        type=ParallelismType.TENSOR,
        implementation_complexity="Medium-High",
        description="Split weight matrices across GPUs, all process same input",
        best_for="Large models for low-latency inference",
        efficiency="High with fast interconnect (NVLink); reduces latency",
    ),
    ParallelismStrategy(
        type=ParallelismType.PIPELINE,
        implementation_complexity="Medium",
        description="Different layers on different GPUs, data flows through",
        best_for="Very deep models, training with micro-batching",
        efficiency="Requires careful scheduling to avoid bubbles",
    ),
    ParallelismStrategy(
        type=ParallelismType.EXPERT,
        implementation_complexity="High",
        description="Different experts on different GPUs for MoE models",
        best_for="Mixture of Experts models",
        efficiency="Depends on routing efficiency",
    ),
]
class TensorParallelInference:
    """Inspect a model to plan tensor-parallel (sharded-weight) inference."""

    def __init__(self, model, num_gpus: int):
        self.model = model
        self.num_gpus = num_gpus

    def analyze_split_strategy(self) -> dict:
        """List every module that holds a weight and whether it can be sharded.

        Returns:
            dict with ``total_params``, ``layers`` (name, weight shape,
            ``can_split``, ``split_dim``) and ``recommended_split`` (not
            populated by this method; always an empty list).
        """
        analysis = {
            'total_params': sum(p.numel() for p in self.model.parameters()),
            'layers': [],
            'recommended_split': []
        }

        for name, module in self.model.named_modules():
            # Some modules register ``weight`` as None (e.g. LayerNorm with
            # elementwise_affine=False); check the value, not just the name.
            weight = getattr(module, 'weight', None)
            if weight is None:
                continue
            can_split = self._can_split_layer(module)
            analysis['layers'].append({
                'name': name,
                'shape': weight.shape,
                'can_split': can_split,
                'split_dim': self._get_split_dim(module) if can_split else None
            })

        return analysis

    def _can_split_layer(self, module) -> bool:
        """Only Linear and Embedding layers are treated as shardable."""
        return isinstance(module, (nn.Linear, nn.Embedding))

    def _get_split_dim(self, module) -> int:
        """Dimension to shard on (simplified: always 0).

        For ``nn.Linear`` a full implementation would split the output
        dimension for column-parallel layers and the input dimension for
        row-parallel ones.
        """
        return 0

    def estimate_communication_overhead(
        self,
        num_layers: int = 80,
        all_reduces_per_layer: int = 1,
        all_reduce_us: float = 100.0
    ) -> dict:
        """Back-of-envelope inter-GPU communication cost per forward pass.

        Communication volume per all-reduce is about
        ``2 * batch * seq * hidden * (n - 1) / n`` elements. The defaults
        reproduce the estimate for a ~70B model with 8-way TP over NVLink:
        80 layers x 1 all-reduce x ~100 us = ~8 ms. Megatron-style transformer
        layers perform two all-reduces per forward pass (after attention and
        after the MLP); pass ``all_reduces_per_layer=2`` for that estimate.
        With a single GPU there is no communication.
        """
        all_reduces = num_layers * all_reduces_per_layer if self.num_gpus > 1 else 0
        return {
            'all_reduces_per_forward': all_reduces,
            'estimated_overhead_ms': all_reduces * all_reduce_us / 1000,
            'recommendation': 'Use NVLink for tensor parallelism; PCIe is too slow'
        }


# Memory Optimization
Optimized KV Cache
Complete implementation for memory-efficient KV cache:
from dataclasses import dataclass
import torch
@dataclass
class KVCacheConfig:
    """Sizes and element type used to pre-allocate a KV cache."""

    num_layers: int       # number of layers that each get K and V storage
    num_heads: int        # attention heads cached per layer
    head_dim: int         # length of each head's key/value vector
    max_seq_length: int   # positions reserved per sequence
    dtype: torch.dtype    # element type of the cache tensor
class OptimizedKVCache:
    """Pre-allocated KV cache held in one contiguous tensor.

    ``cache`` has shape
    ``[num_layers, 2 (K, V), batch_size, num_heads, max_seq_length, head_dim]``.
    """

    def __init__(self, config: "KVCacheConfig", batch_size: int, device='cuda'):
        """
        Args:
            config: Cache dimensions and dtype.
            batch_size: Number of sequences cached side by side.
            device: Where the cache lives (default ``'cuda'``).
        """
        self.config = config
        self.batch_size = batch_size

        cache_shape = (
            config.num_layers,
            2,  # K and V
            batch_size,
            config.num_heads,
            config.max_seq_length,
            config.head_dim
        )
        # One contiguous allocation up front; updates write into it in place.
        self.cache = torch.zeros(cache_shape, dtype=config.dtype, device=device)
        # Per-sequence fill level: highest position written so far + 1.
        self.seq_lengths = torch.zeros(batch_size, dtype=torch.long, device=device)

    def update(
        self,
        layer_idx: int,
        keys: torch.Tensor,
        values: torch.Tensor,
        positions: torch.Tensor
    ):
        """Write keys/values for ``layer_idx`` at per-sequence cache positions.

        Args:
            keys, values: ``[batch, heads, seq_len, head_dim]``.
            positions: ``[batch, seq_len]`` integer cache slot of each token.
                Slots within one sequence are expected to be distinct.
        """
        if keys.shape[2] == 0:
            return  # nothing to write

        device = self.cache.device
        positions = positions.to(device)
        # [batch, 1] broadcasts against positions [batch, seq_len], selecting
        # every (b, positions[b, s]) pair in a single indexing operation.
        batch_idx = torch.arange(self.batch_size, device=device).unsqueeze(1)
        # The two index tensors are separated by a slice, so the indexed
        # region has shape [batch, seq_len, heads, head_dim]; permute to match.
        self.cache[layer_idx, 0][batch_idx, :, positions, :] = keys.permute(0, 2, 1, 3)
        self.cache[layer_idx, 1][batch_idx, :, positions, :] = values.permute(0, 2, 1, 3)

        self.seq_lengths = torch.maximum(
            self.seq_lengths,
            positions.max(dim=1).values + 1
        )

    def get(self, layer_idx: int, batch_idx: int, up_to_pos: int):
        """Return cached (K, V) for one sequence, each ``[heads, up_to_pos, head_dim]``."""
        k = self.cache[layer_idx, 0, batch_idx, :, :up_to_pos, :]
        v = self.cache[layer_idx, 1, batch_idx, :, :up_to_pos, :]
        return k, v

    def memory_usage_bytes(self) -> int:
        """Bytes occupied by the cache tensor."""
        return self.cache.numel() * self.cache.element_size()


# Paged KV Cache
Complete implementation for paged attention KV cache (vLLM-style):
import torch
class PagedKVCache:
    """Paged KV cache (vLLM-style): fixed-size blocks handed out from a shared pool.

    ``block_pool`` has shape
    ``[num_blocks, num_layers, 2 (K, V), block_size, num_heads, head_dim]``;
    each sequence owns a list of block indices into it.
    """

    def __init__(
        self,
        num_layers: int,
        num_heads: int,
        head_dim: int,
        block_size: int,  # Tokens per block
        num_blocks: int,  # Total blocks in pool
        dtype: torch.dtype,
        device='cuda'
    ):
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.block_size = block_size
        self.num_blocks = num_blocks
        self.dtype = dtype

        self.block_pool = torch.zeros(
            (num_blocks, num_layers, 2, block_size, num_heads, head_dim),
            dtype=dtype,
            device=device
        )

        # Indices of unallocated blocks, handed out from the front.
        self.free_blocks = list(range(num_blocks))
        self.sequence_blocks = {}  # seq_id -> list of block indices

    def allocate_sequence(self, seq_id: int, num_tokens: int) -> list[int]:
        """Reserve enough blocks to hold ``num_tokens`` tokens for ``seq_id``.

        Returns:
            The allocated block indices (also stored in ``sequence_blocks``).

        Raises:
            ValueError: if ``seq_id`` already holds blocks; re-allocating would
                orphan them, so call ``free_sequence`` first.
            MemoryError: if the pool does not have enough free blocks.
        """
        if seq_id in self.sequence_blocks:
            raise ValueError(
                f"Sequence {seq_id} already has allocated blocks; free it first"
            )

        # Ceiling division; a non-positive token count needs no blocks.
        num_blocks_needed = max(0, (num_tokens + self.block_size - 1) // self.block_size)
        if len(self.free_blocks) < num_blocks_needed:
            raise MemoryError("Insufficient KV cache blocks")

        # Take the blocks in one slice rather than pop(0) per block (O(n) each).
        allocated = self.free_blocks[:num_blocks_needed]
        del self.free_blocks[:num_blocks_needed]
        self.sequence_blocks[seq_id] = allocated
        return allocated

    def free_sequence(self, seq_id: int):
        """Return a finished sequence's blocks to the pool (no-op for unknown ids)."""
        if seq_id in self.sequence_blocks:
            blocks = self.sequence_blocks.pop(seq_id)
            self.free_blocks.extend(blocks)

    def utilization(self) -> float:
        """Fraction of pool blocks currently allocated (0.0 for an empty pool)."""
        if self.num_blocks == 0:
            return 0.0
        allocated = self.num_blocks - len(self.free_blocks)
        return allocated / self.num_blocks


# Quantization Analyzer
Complete implementation for analyzing quantization tradeoffs:
from dataclasses import dataclass
from enum import Enum
class QuantizationType(Enum):
    """Weight-quantization formats and algorithms referenced in this chapter."""

    INT8 = "int8"
    INT4 = "int4"
    FP8 = "fp8"
    NF4 = "nf4"    # 4-bit NormalFloat
    GPTQ = "gptq"
    AWQ = "awq"
@dataclass
class QuantizationConfig:
    """A named quantization recipe: method, bit width and grouping."""

    # Quantization format / algorithm.
    method: QuantizationType
    # Bits per quantized weight.
    bits: int
    # Number of weights sharing a scale factor; -1 is used for per-tensor.
    group_size: int
    # Human-readable summary of the recipe.
    desc: str
# Preset quantization recipes keyed by a short name (insertion order kept).
QUANTIZATION_METHODS = dict(
    int8_static=QuantizationConfig(
        method=QuantizationType.INT8,
        bits=8,
        group_size=-1,  # one scale for the whole tensor
        desc="Static INT8 quantization; calibrated on representative data",
    ),
    int8_dynamic=QuantizationConfig(
        method=QuantizationType.INT8,
        bits=8,
        group_size=-1,
        desc="Dynamic INT8; scale computed at runtime",
    ),
    int4_gptq=QuantizationConfig(
        method=QuantizationType.GPTQ,
        bits=4,
        group_size=128,
        desc="GPTQ 4-bit; good quality with careful calibration",
    ),
    int4_awq=QuantizationConfig(
        method=QuantizationType.AWQ,
        bits=4,
        group_size=128,
        desc="AWQ 4-bit; activation-aware, often better than GPTQ",
    ),
    nf4_qlora=QuantizationConfig(
        method=QuantizationType.NF4,
        bits=4,
        group_size=64,
        desc="NF4 for QLoRA fine-tuning; information-theoretically optimal",
    ),
)
class QuantizationAnalyzer:
    """Estimate memory footprint and memory-bound speed-up of weight quantization."""

    # Metadata bits stored per quantization group. 32 bits fits an FP16 scale
    # plus an FP16 zero-point; an FP16 scale alone would be 16 bits.
    GROUP_METADATA_BITS = 32

    def __init__(self, model_size_params: int, dtype_bytes: int = 2):
        """
        Args:
            model_size_params: Number of weights in the model.
            dtype_bytes: Bytes per weight before quantization (e.g. 2 for FP16/BF16).
        """
        self.model_size = model_size_params
        self.original_bytes = model_size_params * dtype_bytes

    def analyze_config(self, config: "QuantizationConfig") -> dict:
        """Estimate the effect of quantizing the model with ``config``.

        Returns:
            dict with the config description, effective bits per weight
            (including scale metadata), original/quantized size in GB,
            memory reduction, estimated speed-up and quality notes.
        """
        if config.group_size > 0:
            overhead_bits = self.GROUP_METADATA_BITS / config.group_size
        elif self.model_size > 0:
            # One scale for the whole tensor: negligible per weight.
            overhead_bits = self.GROUP_METADATA_BITS / self.model_size
        else:
            overhead_bits = 0.0
        effective_bits = config.bits + overhead_bits

        quantized_bytes = self.model_size * effective_bits / 8
        if self.original_bytes > 0 and quantized_bytes > 0:
            memory_reduction = 1 - (quantized_bytes / self.original_bytes)
            # Memory-bound decoding: speed-up roughly tracks bytes moved.
            estimated_speedup = self.original_bytes / quantized_bytes
        else:
            # Empty model: nothing to shrink or speed up.
            memory_reduction = 0.0
            estimated_speedup = 1.0

        return {
            'config': config.desc,
            'bits_per_weight': effective_bits,
            'original_gb': self.original_bytes / 1e9,
            'quantized_gb': quantized_bytes / 1e9,
            'memory_reduction': f'{memory_reduction:.1%}',
            'estimated_speedup': f'{estimated_speedup:.1f}x',
            'quality_notes': self._quality_notes(config)
        }

    def _quality_notes(self, config: "QuantizationConfig") -> str:
        """Coarse quality guidance based only on the bit width."""
        if config.bits >= 8:
            return "Minimal quality loss; suitable for most use cases"
        elif config.bits >= 4:
            return "Some quality loss; test carefully on your use case"
        else:
            return "Significant quality loss; may not be suitable for complex tasks"


# Gradient Checkpoint Analyzer
Complete implementation for gradient checkpointing analysis:
import torch.utils.checkpoint as checkpoint
class GradientCheckpointAnalyzer:
    """Rough estimate of gradient-checkpointing memory/compute trade-offs.

    Model used by ``analyze_memory_savings``: every ``n``-th layer keeps its
    activations (a checkpoint); the other layers' activations are dropped and
    recomputed from the nearest checkpoint during backward. Transient memory
    used while recomputing a segment is not included.
    """

    def __init__(self, model, batch_size: int, seq_length: int):
        self.model = model
        self.batch_size = batch_size
        self.seq_length = seq_length

    def analyze_memory_savings(self, checkpoint_every_n_layers: int) -> dict:
        """Estimate activation memory with and without checkpointing.

        Raises:
            ValueError: if ``checkpoint_every_n_layers`` < 1.
        """
        if checkpoint_every_n_layers < 1:
            raise ValueError("checkpoint_every_n_layers must be >= 1")

        num_layers = self._count_layers()
        activation_memory_per_layer = self._estimate_activation_memory()
        baseline_memory = num_layers * activation_memory_per_layer

        num_checkpoints = num_layers // checkpoint_every_n_layers
        checkpointed_memory = num_checkpoints * activation_memory_per_layer

        # Each non-checkpoint layer is recomputed once during backward.
        recompute_layers = num_layers - num_checkpoints

        return {
            'baseline_activation_gb': baseline_memory / 1e9,
            'checkpointed_activation_gb': checkpointed_memory / 1e9,
            # Zero baseline (e.g. batch_size == 0): nothing to save.
            'memory_savings': (1 - checkpointed_memory / baseline_memory) if baseline_memory else 0.0,
            'extra_compute_percentage': recompute_layers / num_layers * 100,
            'recommendation': self._recommend(checkpoint_every_n_layers)
        }

    def _count_layers(self) -> int:
        """Count repeated blocks such as ``layers.0`` or ``encoder.layer.3``.

        A module counts when its name ends in an integer index and its
        parent container's name contains "layer". This counts each block once
        instead of also counting every submodule nested inside it. If nothing
        matches, fall back to counting any module whose name contains "layer".
        Always returns at least 1.
        """
        names = [name for name, _ in self.model.named_modules()]
        count = 0
        for name in names:
            parts = name.split('.')
            if len(parts) >= 2 and parts[-1].isdigit() and 'layer' in parts[-2].lower():
                count += 1
        if count == 0:
            count = sum('layer' in name.lower() for name in names)
        return max(count, 1)

    def _estimate_activation_memory(self) -> float:
        """Bytes of activations per layer: batch * seq * hidden * 4 bytes * factor.

        Uses an assumed hidden size of 4096, FP32 elements and a factor of 10
        for the several tensors each layer keeps.
        """
        hidden_size = 4096  # Assume
        bytes_per_element = 4  # FP32
        factor = 10  # Multiple tensors per layer
        return self.batch_size * self.seq_length * hidden_size * bytes_per_element * factor

    def _recommend(self, n: int) -> str:
        """Qualitative advice for checkpointing every ``n`` layers.

        NOTE(review): under the model in ``analyze_memory_savings``, n == 1
        keeps every layer's activations (zero savings), which contradicts the
        n == 1 message below. Confirm which interpretation of "checkpoint
        every n layers" is intended.
        """
        if n == 1:
            return "Maximum memory savings, ~33% training slowdown"
        elif n <= 4:
            return "Good balance of memory and compute"
        else:
            return "Minimal memory savings, consider reducing"
def enable_gradient_checkpointing(model):
    """Enable gradient checkpointing on ``model``.

    Uses the model's own ``gradient_checkpointing_enable()`` when it has one.
    Otherwise each module in ``model.layers`` has its ``forward`` wrapped so
    that ``torch.utils.checkpoint.checkpoint`` runs it. Intermediate
    activations are then recomputed during backward instead of stored.

    Raises:
        AttributeError: if the model has neither ``gradient_checkpointing_enable``
            nor a ``layers`` attribute.
    """
    if hasattr(model, 'gradient_checkpointing_enable'):
        model.gradient_checkpointing_enable()
        return

    layers = getattr(model, 'layers', None)
    if layers is None:
        raise AttributeError(
            "Model has no gradient_checkpointing_enable() and no 'layers' to wrap"
        )

    def _wrap(forward):
        # Factory so each wrapper binds its own layer's forward.
        def checkpointed_forward(*args, **kwargs):
            # checkpoint() must receive the function and its inputs; calling it
            # with only the function would run forward with no arguments.
            return checkpoint.checkpoint(forward, *args, use_reentrant=False, **kwargs)
        return checkpointed_forward

    for layer in layers:
        layer.forward = _wrap(layer.forward)


# Attention Optimization
Attention Profiler
Complete implementation for comparing attention implementations:
import torch
import torch.nn.functional as F
import time
class AttentionProfiler:
    """Time (and, on CUDA, measure memory of) explicit attention vs. SDPA."""

    def compare_implementations(
        self,
        batch_size: int,
        seq_len: int,
        num_heads: int,
        head_dim: int,
        dtype: torch.dtype = torch.float32,
        device: str = 'cuda'
    ) -> dict:
        """Compare matmul-softmax-matmul attention with ``F.scaled_dot_product_attention``.

        Each implementation is run once as warmup and then timed once.

        Args:
            batch_size, seq_len, num_heads, head_dim: Shape of the random
                Q/K/V tensors, laid out as ``[batch, heads, seq, head_dim]``.
            dtype: Element type of Q/K/V. PyTorch only dispatches SDPA to the
                FlashAttention kernel for fp16/bf16 on supported GPUs; with
                fp32 another SDPA backend is used, so the ``'flash'`` key means
                "SDPA" here.
            device: Device to run on. Peak memory is measured only on CUDA;
                elsewhere ``memory_mb`` is None.

        Returns:
            ``{'standard': ..., 'flash': ...}``, each ``{'time_ms', 'memory_mb'}``.
            ``standard`` is ``{'error': 'OOM'}`` if it raised a RuntimeError.
        """
        on_cuda = torch.device(device).type == 'cuda'
        shape = (batch_size, num_heads, seq_len, head_dim)
        q = torch.randn(shape, dtype=dtype, device=device)
        k = torch.randn(shape, dtype=dtype, device=device)
        v = torch.randn(shape, dtype=dtype, device=device)

        def standard_attention():
            # Materializes the full [seq_len, seq_len] score matrix per head.
            weights = torch.matmul(q, k.transpose(-2, -1)) / (head_dim ** 0.5)
            weights = F.softmax(weights, dim=-1)
            return torch.matmul(weights, v)

        def sdpa_attention():
            return F.scaled_dot_product_attention(q, k, v)

        results = {}
        try:
            results['standard'] = self._measure(standard_attention, on_cuda)
        except RuntimeError:
            # torch.cuda.OutOfMemoryError is a RuntimeError subclass.
            results['standard'] = {'error': 'OOM'}
        results['flash'] = self._measure(sdpa_attention, on_cuda)
        return results

    @staticmethod
    def _measure(fn, on_cuda: bool) -> dict:
        """Warm ``fn`` up once, then time one call and (on CUDA) its peak extra memory."""
        fn()  # warmup: keeps one-time setup costs out of the timing
        start_mem = 0
        if on_cuda:
            torch.cuda.empty_cache()
            # Reset so the peak reflects this call, not earlier allocations.
            torch.cuda.reset_peak_memory_stats()
            start_mem = torch.cuda.memory_allocated()
            torch.cuda.synchronize()

        start = time.perf_counter()
        output = fn()
        if on_cuda:
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start

        memory_mb = (torch.cuda.max_memory_allocated() - start_mem) / 1e6 if on_cuda else None
        del output
        return {
            'time_ms': elapsed * 1000,
            'memory_mb': memory_mb
        }


# Speculative Decoder
Complete implementation for speculative decoding:
import torch
class SpeculativeDecoder:
    """Greedy speculative decoding with a small draft model and a large target model.

    Both models are called as ``model(token_ids)`` and indexed as logits of
    shape ``[batch, seq_len, vocab]``. Acceptance checks only the first
    sequence in the batch, so this implementation assumes batch size 1.
    """

    def __init__(
        self,
        target_model,  # Large, accurate model
        draft_model,  # Small, fast model
        num_speculative_tokens: int = 4,
        acceptance_threshold: float = 0.1
    ):
        self.target = target_model
        self.draft = draft_model
        self.gamma = num_speculative_tokens
        # Simplified acceptance rule: keep a draft token if the target gives it
        # more than this probability (real implementations use rejection sampling).
        self.acceptance_threshold = acceptance_threshold

    def generate(
        self,
        input_ids: torch.Tensor,
        max_new_tokens: int
    ) -> torch.Tensor:
        """Extend ``input_ids`` (non-empty, ``[1, seq_len]``) by ``max_new_tokens`` tokens."""
        target_len = input_ids.shape[1] + max_new_tokens
        generated = input_ids.clone()

        while generated.shape[1] < target_len:
            remaining = target_len - generated.shape[1]
            # Avoid drafting far past the requested length; always draft at least one.
            num_draft = max(1, min(self.gamma, remaining))
            draft_tokens = self._draft_generate(generated, num_draft)
            # Accepted drafts plus one target-chosen token: every iteration
            # makes progress even when all drafts are rejected.
            new_tokens, _ = self._verify(generated, draft_tokens)
            generated = torch.cat([generated, new_tokens], dim=1)

        return generated[:, :target_len]

    def _draft_generate(
        self,
        context: torch.Tensor,
        num_tokens: int
    ) -> torch.Tensor:
        """Greedily generate ``num_tokens`` draft tokens with the small model."""
        draft_tokens = []
        current = context

        for _ in range(num_tokens):
            logits = self.draft(current)[:, -1, :]
            next_token = torch.argmax(logits, dim=-1, keepdim=True)
            draft_tokens.append(next_token)
            current = torch.cat([current, next_token], dim=1)

        return torch.cat(draft_tokens, dim=1)

    def _verify(
        self,
        context: torch.Tensor,
        draft_tokens: torch.Tensor
    ) -> tuple[torch.Tensor, int]:
        """Score all draft tokens with one target forward pass.

        Returns:
            ``(new_tokens, num_accepted)``. ``new_tokens`` is the accepted
            prefix of ``draft_tokens`` followed by the target's greedy token at
            the first position it did not accept. That token is a correction
            after a rejection, or a bonus token when all drafts were accepted.
            It has the dtype and device of ``draft_tokens``.
        """
        full_seq = torch.cat([context, draft_tokens], dim=1)
        target_logits = self.target(full_seq)
        ctx_len = context.shape[1]

        num_accepted = 0
        for i in range(draft_tokens.shape[1]):
            # Logits at position p predict the token at position p + 1.
            target_probs = torch.softmax(target_logits[:, ctx_len + i - 1, :], dim=-1)
            draft_token = draft_tokens[0, i]
            if target_probs[0, draft_token].item() > self.acceptance_threshold:
                num_accepted += 1
            else:
                break

        next_logits = target_logits[:, ctx_len + num_accepted - 1, :]
        target_token = torch.argmax(next_logits, dim=-1, keepdim=True).to(draft_tokens.dtype)
        new_tokens = torch.cat([draft_tokens[:, :num_accepted], target_token], dim=1)
        return new_tokens, num_accepted

    def _sample_from_target(self, context: torch.Tensor) -> torch.Tensor:
        """Greedily pick one token from the target model.

        Not used by ``generate``, which takes the target's token from the
        verification pass.
        """
        logits = self.target(context)[:, -1, :]
        return torch.argmax(logits, dim=-1, keepdim=True)

    def expected_speedup(
        self,
        draft_speed_ratio: float = 5.0,
        acceptance_rate: float = 0.7
    ) -> dict:
        """Estimate speed-up over plain target decoding.

        Uses the analysis of Leviathan et al. (2023). With per-token acceptance
        probability ``a`` and ``gamma`` drafted tokens, one iteration yields on
        average ``(1 - a**(gamma + 1)) / (1 - a)`` tokens. It costs ``gamma``
        draft calls plus a single target call, because the target scores all
        drafts in one forward pass.
        """
        a = acceptance_rate
        draft_cost = 1 / draft_speed_ratio  # relative to one target call
        if a >= 1:
            tokens_per_iteration = self.gamma + 1
        else:
            tokens_per_iteration = (1 - a ** (self.gamma + 1)) / (1 - a)
        cost_per_iteration = self.gamma * draft_cost + 1

        return {
            'estimated_speedup': tokens_per_iteration / cost_per_iteration,
            'assumptions': f'Draft {draft_speed_ratio:g}x faster, {a:.0%} acceptance rate'
        }


# Batching and Serving
Continuous Batcher
Complete implementation of a continuous batching scheduler (simplified: token sampling is stubbed out):
import asyncio
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class InFlightRequest:
    """A generation request tracked by the continuous batcher."""

    id: str
    input_tokens: list[int]
    generated_tokens: list[int] = field(default_factory=list)
    max_tokens: int = 100
    # Resolved with ``generated_tokens`` when the request finishes. The default
    # factory calls ``asyncio.Future()``, which expects an event loop, so build
    # instances inside a running loop or pass ``future`` explicitly.
    future: asyncio.Future = field(default_factory=asyncio.Future)

    @property
    def is_complete(self) -> bool:
        """True once ``max_tokens`` tokens have been generated."""
        return len(self.generated_tokens) >= self.max_tokens

    @property
    def current_position(self) -> int:
        """Position of the next token: prompt length plus tokens generated so far."""
        return len(self.input_tokens) + len(self.generated_tokens)


class ContinuousBatcher:
    """Continuous-batching scheduler (illustrative: token sampling is stubbed).

    Waiting requests join the active batch as soon as a slot frees up instead
    of waiting for the whole batch to finish. ``run_loop`` must run as a task
    on the same event loop that calls ``submit``.
    """

    def __init__(
        self,
        model,
        max_batch_size: int = 64,
        max_waiting_ms: float = 10
    ):
        self.model = model
        self.max_batch_size = max_batch_size
        # NOTE(review): stored but not used by the scheduling logic below.
        self.max_waiting_ms = max_waiting_ms
        self.waiting_queue: list[InFlightRequest] = []
        self.active_requests: list[InFlightRequest] = []
        self.running = False
        self._request_counter = 0

    async def submit(self, input_tokens: list[int], max_tokens: int) -> list[int]:
        """Queue a request and wait for its generated tokens.

        Cancelling the awaiting task cancels the request; the scheduler then
        drops it instead of trying to deliver a result.
        """
        request = InFlightRequest(
            id=self._generate_id(),
            input_tokens=input_tokens,
            max_tokens=max_tokens,
            future=asyncio.get_running_loop().create_future()
        )
        self.waiting_queue.append(request)
        return await request.future

    def stop(self):
        """Ask ``run_loop`` to exit after its current iteration."""
        self.running = False

    async def run_loop(self):
        """Main processing loop; runs until ``running`` is set to False."""
        self.running = True
        while self.running:
            self._fill_batch()

            if not self.active_requests:
                await asyncio.sleep(0.001)
                continue

            await self._step()
            self._retire_completed()
            # _step() may finish without ever suspending; yield explicitly so
            # submit() callers and waiting tasks are not starved.
            await asyncio.sleep(0)

    def _fill_batch(self):
        """Move waiting requests into free batch slots, skipping cancelled ones."""
        while (
            self.waiting_queue and
            len(self.active_requests) < self.max_batch_size
        ):
            request = self.waiting_queue.pop(0)
            if request.future.done():
                continue  # caller cancelled before the request was scheduled
            # Prefill of the input tokens would happen here (typically done
            # separately from decode).
            self.active_requests.append(request)

    async def _step(self):
        """Generate one token for each active request.

        A real implementation would pad/position the batch and run a single
        batched forward pass, e.g. ``self.model.forward_batch(...)``.
        """
        for request in self.active_requests:
            next_token = self._sample_next_token(request)
            request.generated_tokens.append(next_token)

    def _retire_completed(self):
        """Resolve finished requests and drop cancelled ones from the batch."""
        still_active = []
        for request in self.active_requests:
            if request.future.done():
                continue  # cancelled by its caller; set_result would raise
            if request.is_complete:
                request.future.set_result(request.generated_tokens)
            else:
                still_active.append(request)
        # Update in place so the list object seen by other references stays the same.
        self.active_requests[:] = still_active

    def _sample_next_token(self, request: InFlightRequest) -> int:
        """Placeholder sampler; a real one would use the model output."""
        return 0

    def _generate_id(self) -> str:
        """Return a unique id: nanosecond timestamp plus a per-batcher counter.

        The counter keeps ids distinct even when the clock returns the same
        value for two calls.
        """
        self._request_counter = getattr(self, '_request_counter', 0) + 1
        return f"{time.time_ns()}-{self._request_counter}"


# Latency Throughput Optimizer
Complete implementation for latency vs throughput optimization:
from dataclasses import dataclass
@dataclass
class OptimizationGoal:
    """Primary optimization target plus a secondary constraint to respect."""

    # Free-text goal name.
    goal: str
    # Metric the goal is measured by.
    target_metric: str
    # Desired value of that metric.
    target_value: float
    # Name of the constraint that must also hold.
    secondary_constraint: str
    # Limit for the secondary constraint.
    constraint_value: float
class LatencyThroughputOptimizer:
"""Optimize for latency or throughput."""
def __init__(self, model, goal: OptimizationGoal):
    """Remember the model and the goal that drives the recommendations."""
    self.goal = goal
    self.model = model
def recommend_configuration(self) -> dict:
"""Recommend configuration based on goal."""
if 'latency' in self.goal.goal.lower():
return self._optimize_for_latency()
else:
return self._optimize_for_throughput()
def _optimize_for_latency(self) -> dict:
"""Configuration for latency optimization."""
return {
'batch_size': 1,
'quantization': 'int8 or fp16',
'parallelism': 'tensor_parallel if model is large',
'serving': 'dedicated instances, no batching delay',
'techniques': [
'Enable speculative decoding (2-3x speedup for generation)',
'Use Flash Attention (lower latency, especially for long sequences)',
'Consider smaller model if quality allows',
'Tensor parallelism to reduce per-token latency',
'CUDA Graphs to reduce launch overhead'
],
'anti_patterns': [
'Large batch sizes (add queuing delay)',
'Continuous batching with long max wait',
'Pipeline parallelism (adds latency)'
]
}
def _optimize_for_throughput(self) -> dict:
"""Configuration for throughput optimization."""
return {
'batch_size': 'Maximum that fits in memory',
'quantization': 'Aggressive (int4/int8)',
'parallelism': 'Data parallel across GPUs',
'serving': 'Continuous batching with dynamic batch sizes',
'techniques': [
'Maximize batch size',
'Use continuous batching',
'Aggressive quantization for more concurrent requests',
'PagedAttention for memory efficiency',
'Preemption for SLO management'
],
'anti_patterns': [
'Processing requests one at a time',
'Waiting for specific batch size (starves GPU)',
'Over-provisioning for low utilization'
]
}
def find_pareto_optimal(
self,
latency_targets: list[float],
measure_func
) -> list[dict]:
"""Find Pareto-optimal configurations."""
configurations = self._generate_configurations()
results = []
for config in configurations:
throughput, latency = measure_func(config)
results.append({
'config': config,
'throughput': throughput,
'latency_p99': latency
})
# Find Pareto frontier
pareto = []
for r in results:
is_dominated = False
for other in results:
if (other['throughput'] > r['throughput'] and
other['latency_p99'] < r['latency_p99']):
is_dominated = True
break
if not is_dominated:
pareto.append(r)
return sorted(pareto, key=lambda x: x['latency_p99'])
def _generate_configurations(self) -> list[dict]:
"""Generate configurations to test."""
return [
{'batch_size': 1, 'quantization': 'fp16'},
{'batch_size': 4, 'quantization': 'fp16'},
{'batch_size': 8, 'quantization': 'fp16'},
{'batch_size': 16, 'quantization': 'int8'},
{'batch_size': 32, 'quantization': 'int8'},
]Streaming Optimizer
Complete implementation for streaming inference optimization:
from dataclasses import dataclass
@dataclass
class StreamingMetrics:
    """Timing measurements for one streamed generation (durations in ms)."""

    time_to_first_token_ms: float  # Prefill latency
    inter_token_latency_ms: float  # Time between tokens
    total_tokens: int
    total_time_ms: float

    @property
    def tokens_per_second(self) -> float:
        """Average token rate over the whole stream.

        Returns 0.0 when ``total_time_ms`` is not positive. Dividing would
        otherwise raise ZeroDivisionError or report a negative rate.
        """
        if self.total_time_ms <= 0:
            return 0.0
        return self.total_tokens / (self.total_time_ms / 1000)

    @property
    def perceived_speed(self) -> str:
        """Map the timings to a coarse human-perception label.

        Only the fastest tier also considers inter-token latency; the other
        tiers are decided by time-to-first-token alone.
        """
        if self.time_to_first_token_ms < 200 and self.inter_token_latency_ms < 50:
            return "Feels instant"
        elif self.time_to_first_token_ms < 500:
            return "Feels responsive"
        elif self.time_to_first_token_ms < 2000:
            return "Noticeable delay"
        else:
            return "Feels slow"
class StreamingOptimizer:
    """Diagnose likely bottlenecks from streaming metrics."""

    def analyze_bottleneck(self, metrics: "StreamingMetrics") -> dict:
        """Return the metric values plus a list of detected issues.

        The checks are independent, so several issues can be reported:

        * time-to-first-token above 500 ms -> slow prefill;
        * inter-token latency above 100 ms -> slow decode;
        * prefill taking over half of the total time while more than 10
          tokens were produced -> prefill-dominated workload. This check is
          skipped when ``total_time_ms`` is not positive.

        Returns:
            ``{'metrics': dict, 'issues': list}``. ``'metrics'`` is a shallow
            copy of the metrics' attributes, so mutating it does not alter
            *metrics*.
        """
        issues = []
        if metrics.time_to_first_token_ms > 500:
            issues.append({
                'issue': 'Slow prefill',
                'cause': 'Long input processing',
                'solutions': [
                    'Enable Flash Attention for prefill',
                    'Optimize input preprocessing',
                    'Use prompt caching for repeated prefixes'
                ]
            })
        if metrics.inter_token_latency_ms > 100:
            issues.append({
                'issue': 'Slow decode',
                'cause': 'Per-token generation too slow',
                'solutions': [
                    'Speculative decoding',
                    'Smaller model or quantization',
                    'Reduce KV cache memory pressure'
                ]
            })
        # Share of wall time spent before the first token. It is undefined for
        # a zero-length stream, so treat it as 0 instead of dividing by zero.
        if metrics.total_time_ms > 0:
            prefill_fraction = metrics.time_to_first_token_ms / metrics.total_time_ms
        else:
            prefill_fraction = 0.0
        if prefill_fraction > 0.5 and metrics.total_tokens > 10:
            issues.append({
                'issue': 'Prefill-dominated',
                'cause': 'Input much longer than output',
                'solutions': [
                    'Prompt caching',
                    'Input summarization/compression',
                    'Speculative prefill'
                ]
            })
        return {
            'metrics': dict(vars(metrics)),
            'issues': issues
        }

# Cost and Performance Analysis
Cost Optimizer
Complete implementation for inference cost optimization:
from dataclasses import dataclass
@dataclass
class CostDriver:
    """One contributor to GPU inference cost and ways to reduce it."""

    # Short name of the cost component.
    driver: str
    # Free-text estimate of its share of total cost (e.g. "60-80%").
    percentage_of_cost: str
    # Suggested actions that reduce this component.
    optimization_opportunities: list[str]
# Catalogue of what drives GPU inference cost, with ways to reduce each.
# percentage_of_cost holds free-text estimates (one entry is "Indirect"),
# not numbers, so this table is meant for display rather than arithmetic.
INFERENCE_COST_DRIVERS = [
    CostDriver(
        driver="GPU compute hours",
        percentage_of_cost="60-80%",
        optimization_opportunities=[
            "Increase GPU utilization (batching)",
            "Use right-sized GPUs",
            "Spot/preemptible instances for batch workloads"
        ]
    ),
    CostDriver(
        driver="GPU memory (affects GPU choice)",
        percentage_of_cost="Indirect",
        optimization_opportunities=[
            "Quantization to fit larger batches",
            "PagedAttention for memory efficiency",
            "Model pruning/distillation"
        ]
    ),
    CostDriver(
        driver="Idle time",
        percentage_of_cost="10-40% typically",
        optimization_opportunities=[
            "Auto-scaling based on demand",
            "Bin-packing multiple models",
            "Scale to zero for dev/staging"
        ]
    ),
]
class CostOptimizer:
    """Compare serving configurations and GPUs by cost.

    Args:
        workload_profile: Workload description. ``compare_configurations``
            reads its ``'latency_slo'`` entry, which is presumably in the same
            units as each configuration's ``'p99_latency'``.
    """

    def __init__(self, workload_profile: dict):
        self.profile = workload_profile

    def calculate_cost_per_token(
        self,
        gpu_cost_per_hour: float,
        tokens_per_second: float,
        utilization: float
    ) -> float:
        """Return the GPU cost attributable to one generated token.

        The GPU is billed for the whole hour, but only the ``utilization``
        share of that hour produces tokens, so idle time raises the
        per-token cost.

        Args:
            gpu_cost_per_hour: Hourly price of the GPU.
            tokens_per_second: Throughput while the GPU is busy.
            utilization: Busy share of the hour (presumably a 0-1 fraction).

        Returns:
            Cost per token, or ``float('inf')`` when the configuration produces
            no tokens (zero throughput or utilization). Infinity sorts last,
            so such configurations rank as the most expensive.
        """
        effective_tokens_per_hour = tokens_per_second * 3600 * utilization
        if effective_tokens_per_hour <= 0:
            return float('inf')
        return gpu_cost_per_hour / effective_tokens_per_hour

    def compare_configurations(
        self,
        configurations: list[dict]
    ) -> list[dict]:
        """Rank configurations by cost per million tokens (cheapest first).

        Each configuration dict must provide ``'name'``, ``'gpu_cost'``
        (per hour), ``'throughput'`` (tokens/s), ``'utilization'`` and
        ``'p99_latency'``. A configuration meets the SLO when its p99 latency
        is at or below ``workload_profile['latency_slo']``.
        """
        results = []
        for config in configurations:
            cost_per_token = self.calculate_cost_per_token(
                gpu_cost_per_hour=config['gpu_cost'],
                tokens_per_second=config['throughput'],
                utilization=config['utilization']
            )
            if config['gpu_cost'] > 0:
                efficiency = config['throughput'] / config['gpu_cost']
            else:
                # Free hardware: any throughput is infinitely cost-efficient.
                efficiency = float('inf')
            results.append({
                'config': config['name'],
                'cost_per_million_tokens': cost_per_token * 1_000_000,
                # 1M tokens per day for 30 days.
                'monthly_cost_at_1m_tokens_day': cost_per_token * 1_000_000 * 30,
                'meets_latency_slo': config['p99_latency'] <= self.profile['latency_slo'],
                'cost_efficiency_score': efficiency
            })
        return sorted(results, key=lambda x: x['cost_per_million_tokens'])

    def recommend_gpu(self, requirements: dict) -> dict:
        """Pick a GPU from a hard-coded table of sample specs and prices.

        Args:
            requirements: ``'model_memory_gb'`` (required) and an optional
                truthy ``'min_latency'`` flag.

        Returns:
            The chosen GPU's table entry. With ``min_latency`` it is the GPU
            with the highest fp16 TFLOPS; otherwise it is the best TFLOPS per
            dollar-hour. Only GPUs with at least ``model_memory_gb`` of memory
            are considered. If none qualifies, returns an ``{'error': ...}``
            dict.
        """
        gpus = [
            {'name': 'A10', 'memory_gb': 24, 'cost_hour': 1.0, 'tflops_fp16': 125},
            {'name': 'A100-40', 'memory_gb': 40, 'cost_hour': 2.5, 'tflops_fp16': 312},
            {'name': 'A100-80', 'memory_gb': 80, 'cost_hour': 3.5, 'tflops_fp16': 312},
            {'name': 'H100', 'memory_gb': 80, 'cost_hour': 5.0, 'tflops_fp16': 990},
        ]
        model_memory = requirements['model_memory_gb']
        min_latency = requirements.get('min_latency', False)
        viable = [g for g in gpus if g['memory_gb'] >= model_memory]
        if not viable:
            return {'error': 'No single GPU has enough memory; need tensor parallelism'}
        if min_latency:
            # Prefer fastest GPU
            return max(viable, key=lambda g: g['tflops_fp16'])
        else:
            # Prefer best cost/performance
            return max(viable, key=lambda g: g['tflops_fp16'] / g['cost_hour'])

# Performance Investigator
Complete implementation for investigating performance regressions:
from dataclasses import dataclass
@dataclass
class PerformanceRegression:
    """A regression symptom with its candidate causes and diagnostic steps."""

    # Observable symptom, e.g. a latency increase after a deploy.
    symptom: str
    # Explanations worth ruling in or out.
    possible_causes: list[str]
    # Checks to perform, in suggested order.
    investigation_steps: list[str]
# Catalogue of common performance-regression symptoms. Each is paired with
# candidate causes and the checks that help tell those causes apart.
COMMON_REGRESSIONS = [
    PerformanceRegression(
        symptom="Latency increased after model update",
        possible_causes=[
            "Larger model size",
            "Different architecture",
            "Lost quantization",
            "Shape changes causing recompilation"
        ],
        investigation_steps=[
            "Compare model sizes (parameters, memory footprint)",
            "Check if quantization is still applied",
            "Profile both versions and compare top operations",
            "Check for torch.compile recompilation warnings"
        ]
    ),
    PerformanceRegression(
        symptom="Throughput dropped after infrastructure change",
        possible_causes=[
            "Different GPU instance type",
            "Network configuration change",
            "Resource contention from co-located workloads",
            "Driver/CUDA version change"
        ],
        investigation_steps=[
            "Verify GPU specs match expectations",
            "Check GPU utilization and memory",
            "Look for other processes using GPU",
            "Compare CUDA/driver versions"
        ]
    ),
    PerformanceRegression(
        symptom="Increasing latency over time",
        possible_causes=[
            "Memory leak (GPU or CPU)",
            "Cache filling up",
            "Increasing queue depth",
            "Garbage collection pressure"
        ],
        investigation_steps=[
            "Monitor GPU memory over time",
            "Check KV cache utilization",
            "Monitor request queue depth",
            "Profile memory allocations"
        ]
    ),
]
class PerformanceInvestigator:
    """Compare metrics from two periods to explain a latency regression.

    Args:
        metrics_client: Object whose async ``get_metrics(period)`` returns a
            dict of metrics for that period.
    """

    def __init__(self, metrics_client):
        self.metrics = metrics_client

    async def investigate_latency_increase(
        self,
        baseline_period: str,
        regression_period: str
    ) -> dict:
        """Compare a regression period against a baseline period.

        Both metric dicts must contain ``'p99_latency'``. The optional
        ``'avg_batch_size'`` and ``'gpu_utilization'`` metrics are compared
        only when both periods report them. A metric missing from one side
        is neither treated as 0 nor allowed to raise KeyError.

        Returns:
            ``{'latency_change', 'findings', 'suggested_next_steps'}``.
            ``latency_change`` is the relative p99 change. For a zero
            baseline it is ``inf`` if latency rose and 0.0 otherwise.
        """
        baseline = await self.metrics.get_metrics(baseline_period)
        current = await self.metrics.get_metrics(regression_period)
        findings = []

        latency_increase = self._relative_change(
            baseline['p99_latency'], current['p99_latency']
        )
        if latency_increase > 0.1:
            findings.append({
                'metric': 'p99_latency',
                'change': f'+{latency_increase:.1%}',
                'baseline': baseline['p99_latency'],
                'current': current['p99_latency']
            })

        base_batch = baseline.get('avg_batch_size')
        cur_batch = current.get('avg_batch_size')
        if base_batch is not None and cur_batch is not None and cur_batch != base_batch:
            findings.append({
                'metric': 'batch_size',
                'change': cur_batch - base_batch,
                'impact': 'Higher batch size increases latency but improves throughput'
            })

        base_util = baseline.get('gpu_utilization')
        cur_util = current.get('gpu_utilization')
        # The 10-point margin assumes utilization is reported in percent.
        if base_util is not None and cur_util is not None and cur_util < base_util - 10:
            findings.append({
                'metric': 'gpu_utilization',
                'change': 'Decreased',
                'indicates': 'Possible CPU bottleneck or launch overhead'
            })

        return {
            'latency_change': latency_increase,
            'findings': findings,
            'suggested_next_steps': self._suggest_investigation(findings)
        }

    @staticmethod
    def _relative_change(before: float, after: float) -> float:
        """Return ``(after - before) / before`` without dividing by zero."""
        if before:
            return (after - before) / before
        return float('inf') if after > before else 0.0

    def _suggest_investigation(self, findings: list) -> list[str]:
        """Map findings to follow-up steps; profiling is always suggested."""
        steps = ["Profile with torch.profiler to identify slowest operations"]
        for finding in findings:
            if finding['metric'] == 'gpu_utilization':
                steps.append("Check for CPU preprocessing bottleneck")
                steps.append("Look for kernel launch overhead (many small operations)")
            if finding['metric'] == 'batch_size':
                steps.append("Verify batch size change was intentional")
        return steps

# Benchmark Suite
Complete implementation for standardized benchmarking:
from dataclasses import dataclass
import torch
import gc
@dataclass
class BenchmarkConfig:
    """Settings for one benchmark suite run."""

    # Label for this benchmark configuration.
    name: str
    # Untimed forward passes run before measuring.
    warmup_iterations: int
    # Timed forward passes per input configuration.
    benchmark_iterations: int
    # One dict per input variant to benchmark.
    input_configs: list[dict]
    # Names of the metrics the caller wants collected.
    metrics_to_collect: list[str]
class BenchmarkSuite:
    """Standardized CUDA latency and memory benchmark for a model.

    For each entry in ``config.input_configs`` the suite builds a random input
    tensor and runs ``config.warmup_iterations`` untimed forward passes. It
    then times ``config.benchmark_iterations`` passes with CUDA events,
    recording the peak allocated memory of each pass. A CUDA device is
    required for ``run``.
    """

    def __init__(self, model, config: "BenchmarkConfig"):
        self.model = model
        self.config = config
        # Filled by run() with the most recent results.
        self.results = {}

    def run(self) -> dict:
        """Benchmark every input configuration.

        Returns:
            Mapping from each input config's ``'name'`` to
            ``{'input_config', 'latency_ms', 'memory_mb'}``. The same mapping
            is stored on ``self.results``.

        Raises:
            ValueError: If ``benchmark_iterations`` is below 1, which would
                leave no samples to summarize.
        """
        if self.config.benchmark_iterations < 1:
            raise ValueError("benchmark_iterations must be at least 1")

        all_results = {}
        for input_config in self.config.input_configs:
            # Start each configuration from a clean allocator state.
            gc.collect()
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()

            test_input = self._create_input(input_config)
            self._warmup(test_input)
            times, memory_peaks = self._measure(test_input)

            all_results[input_config['name']] = {
                'input_config': input_config,
                'latency_ms': self._latency_stats(times),
                'memory_mb': {
                    'peak': max(memory_peaks) / 1e6,
                    'mean': sum(memory_peaks) / len(memory_peaks) / 1e6
                }
            }
        self.results = all_results
        return all_results

    def _warmup(self, test_input):
        """Run untimed forward passes so one-time setup is not measured."""
        for _ in range(self.config.warmup_iterations):
            with torch.no_grad():
                _ = self.model(test_input)
        torch.cuda.synchronize()

    def _measure(self, test_input) -> tuple[list[float], list[int]]:
        """Time the benchmark passes; return (latencies in ms, peak bytes)."""
        times = []
        memory_peaks = []
        for _ in range(self.config.benchmark_iterations):
            # After this reset, max_memory_allocated() reports the peak
            # reached during this pass (resident tensors included).
            torch.cuda.reset_peak_memory_stats()
            start = torch.cuda.Event(enable_timing=True)
            end = torch.cuda.Event(enable_timing=True)
            start.record()
            with torch.no_grad():
                _ = self.model(test_input)
            end.record()
            # Both events must have completed before elapsed_time() is valid.
            torch.cuda.synchronize()
            times.append(start.elapsed_time(end))
            memory_peaks.append(torch.cuda.max_memory_allocated())
        return times, memory_peaks

    def _latency_stats(self, times: list) -> dict:
        """Summarize latency samples: mean, population std, min, max, p50, p99."""
        ordered = sorted(times)
        return {
            'mean': sum(times) / len(times),
            'std': self._std(times),
            'min': ordered[0],
            'max': ordered[-1],
            'p50': self._percentile(ordered, 50),
            'p99': self._percentile(ordered, 99)
        }

    @staticmethod
    def _percentile(ordered: list, pct: int):
        """Nearest-rank percentile of an already sorted, non-empty list.

        The 1-based rank is ``ceil(pct * n / 100)``, computed in integer
        arithmetic to avoid float rounding. For n=100 the p99 is therefore
        the 99th value, not the maximum.
        """
        rank = (pct * len(ordered) + 99) // 100
        return ordered[max(rank, 1) - 1]

    def _create_input(self, config: dict):
        """Build a random float tensor on CUDA for one input configuration.

        The shape is ``(batch_size, seq_length, hidden_size)``, with defaults
        1, 512 and 4096. NOTE(review): this assumes the model takes a single
        dense float tensor; token-id models would need integer inputs.
        """
        return torch.randn(
            config.get('batch_size', 1),
            config.get('seq_length', 512),
            config.get('hidden_size', 4096),
            device='cuda'
        )

    def _std(self, values: list) -> float:
        """Population standard deviation of *values*."""
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return variance ** 0.5

    def generate_report(self, results: dict) -> str:
        """Render *results* (as returned by ``run``) as a markdown table."""
        lines = [
            "# Benchmark Results",
            f"\nModel: {self.model.__class__.__name__}",
            f"Iterations: {self.config.benchmark_iterations}",
            "\n## Latency Results\n",
            "| Config | Mean (ms) | P50 (ms) | P99 (ms) | Memory (MB) |",
            "|--------|-----------|----------|----------|-------------|"
        ]
        for name, data in results.items():
            lat = data['latency_ms']
            mem = data['memory_mb']
            lines.append(
                f"| {name} | {lat['mean']:.2f} | {lat['p50']:.2f} | "
                f"{lat['p99']:.2f} | {mem['peak']:.0f} |"
            )
        return "\n".join(lines)

# Production Utilities
Compilation Manager
Complete implementation for managing model compilation with caching:
import torch
import hashlib
import os
class CompilationManager:
    """Compile a model with ``torch.compile`` and reuse the result in-process.

    Compiled models are memoized per (model layout, input shape) key for the
    lifetime of this manager. Repeated calls with the same input shape
    therefore skip a fresh ``torch.compile`` wrapper and warmup.

    No files are read from or written to ``cache_dir``. Unpickling a file from
    a shared, world-writable directory with ``torch.load`` would let anyone
    who can write there run code in this process. Persistent reuse of
    compiled kernels is left to torch.compile's own caching. ``cache_dir`` is
    still stored for API compatibility.
    """

    def __init__(self, model, cache_dir: str = "/tmp/compile_cache"):
        self.model = model
        self.cache_dir = cache_dir
        self.compiled_model = None
        # cache key -> compiled model, for this process only.
        self._compiled_by_key = {}

    def compile_with_caching(self, sample_input):
        """Return a compiled ``self.model``, compiling on first use per shape.

        Args:
            sample_input: Tensor used to build the cache key (via its shape)
                and to warm up the compiled model.

        Returns:
            The compiled model, which is also stored on ``self.compiled_model``.
        """
        model_hash = self._hash_model()
        input_shape = tuple(sample_input.shape)
        cache_key = f"{model_hash}_{input_shape}"

        cached = self._compiled_by_key.get(cache_key)
        if cached is not None:
            print("Reusing compiled model from in-process cache")
            self.compiled_model = cached
            return cached

        print("Compiling model (this may take several minutes)...")
        compiled = torch.compile(
            self.model,
            mode='max-autotune',
            fullgraph=True
        )
        # torch.compile is lazy: these calls trigger the actual compilation.
        for _ in range(3):
            _ = compiled(sample_input)

        self._compiled_by_key[cache_key] = compiled
        self.compiled_model = compiled
        return compiled

    def _hash_model(self) -> str:
        """Return a 16-hex-char fingerprint of the model's state_dict layout.

        The fingerprint covers every entry's name, shape and dtype, but not
        the tensor values. Reloading different weights into the same
        architecture keeps the key. Models whose parameters share names but
        differ in shape or dtype get different keys.
        """
        layout = [
            (
                name,
                tuple(value.shape) if hasattr(value, 'shape') else None,
                str(getattr(value, 'dtype', None)),
            )
            for name, value in self.model.state_dict().items()
        ]
        return hashlib.sha256(repr(layout).encode()).hexdigest()[:16]

# Memory Fragmentation Monitor
Complete implementation for monitoring GPU memory fragmentation:
import torch
import gc
class MemoryFragmentationMonitor:
    """Report how much CUDA memory the caching allocator holds but does not use.

    The "fragmentation ratio" is ``(reserved - allocated) / reserved``. It is
    the share of memory reserved by PyTorch's caching allocator that is not
    currently occupied by tensors. A high value can indicate fragmentation,
    but it also counts ordinary free cached blocks, so treat it as a hint.
    """

    # Ratio at or above which memory is reported as fragmented. It is shared
    # by is_fragmented and _recommend so the two can never disagree.
    HIGH_THRESHOLD = 0.2
    # Ratio below which memory is considered well utilized.
    MODERATE_THRESHOLD = 0.1

    def check_fragmentation(self, device=None) -> dict:
        """Return memory statistics and a recommendation for one CUDA device.

        Args:
            device: CUDA device to inspect. ``None`` means the current device,
                which is used for all three readings so they stay consistent.
        """
        if device is None:
            device = torch.cuda.current_device()
        allocated = torch.cuda.memory_allocated(device)
        reserved = torch.cuda.memory_reserved(device)
        total = torch.cuda.get_device_properties(device).total_memory
        return self._build_report(allocated, reserved, total)

    def _build_report(self, allocated: int, reserved: int, total: int) -> dict:
        """Build the report dict from raw byte counts."""
        fragmentation = (reserved - allocated) / reserved if reserved > 0 else 0
        return {
            'allocated_gb': allocated / 1e9,
            'reserved_gb': reserved / 1e9,
            'total_gb': total / 1e9,
            'fragmentation_ratio': fragmentation,
            'is_fragmented': fragmentation >= self.HIGH_THRESHOLD,
            'recommendation': self._recommend(fragmentation)
        }

    def _recommend(self, fragmentation: float) -> str:
        """Map a fragmentation ratio to advice."""
        if fragmentation < self.MODERATE_THRESHOLD:
            return "Memory well utilized"
        elif fragmentation < self.HIGH_THRESHOLD:
            return "Some fragmentation; consider defragmentation during low-load periods"
        else:
            return "High fragmentation; consider torch.cuda.empty_cache() or restart"

    def defragment(self):
        """Release unused cached CUDA memory back to the driver.

        Live tensors are not moved, so this returns free cached blocks rather
        than compacting live allocations.
        """
        gc.collect()
        torch.cuda.empty_cache()
        torch.cuda.synchronize()

# Optimization Journey Planner
Complete implementation for planning optimization journeys:
from dataclasses import dataclass
@dataclass
class OptimizationJourney:
    """One stage of an end-to-end optimization case study."""

    # Name of the stage, e.g. "Baseline".
    stage: str
    # What was done during this stage.
    action: str
    # Free-text metrics before the stage.
    before_metric: str
    # Free-text metrics after the stage.
    after_metric: str
    # Tool, flag or technique that delivered the change.
    technique_used: str
# Worked example of an end-to-end optimization effort, listed stage by stage.
# Each stage's before_metric generally echoes the previous stage's
# after_metric. The metrics are free-text summaries whose units vary by stage
# (p99, req/s, TTFT), so this list is for display, not computation.
OPTIMIZATION_JOURNEY = [
    OptimizationJourney(
        stage="Baseline",
        action="Measure initial performance",
        before_metric="N/A",
        after_metric="250ms p99, 40 req/s throughput",
        technique_used="PyTorch profiler"
    ),
    OptimizationJourney(
        stage="Low-hanging fruit",
        action="Enable Flash Attention",
        before_metric="250ms p99",
        after_metric="180ms p99",
        technique_used="attn_implementation='flash_attention_2'"
    ),
    OptimizationJourney(
        stage="Compilation",
        action="Apply torch.compile",
        before_metric="180ms p99",
        after_metric="140ms p99",
        technique_used="torch.compile(model, mode='reduce-overhead')"
    ),
    OptimizationJourney(
        stage="Quantization",
        action="Apply INT8 weight-only quantization",
        before_metric="140ms, 40 req/s",
        after_metric="120ms, 60 req/s (larger batches fit)",
        technique_used="bitsandbytes INT8"
    ),
    OptimizationJourney(
        stage="Batching",
        action="Implement continuous batching",
        before_metric="60 req/s",
        after_metric="180 req/s",
        technique_used="vLLM-style continuous batching"
    ),
    OptimizationJourney(
        stage="Speculative decoding",
        action="Add draft model for speculation",
        before_metric="120ms time-to-first-token",
        after_metric="120ms TTFT, 2.5x faster generation",
        technique_used="68M draft model, 4 speculative tokens"
    ),
    OptimizationJourney(
        stage="Final result",
        action="Compare to baseline",
        before_metric="250ms p99, 40 req/s",
        after_metric="100ms p99, 180 req/s, 4.5x cost reduction",
        technique_used="Combined optimization stack"
    ),
]
def plan_optimization_journey(
    current_performance: dict,
    targets: dict
) -> list[dict]:
    """Choose an ordered list of optimizations to close performance gaps.

    Args:
        current_performance: Must contain ``'latency'`` and ``'throughput'``.
        targets: Target values under the same keys. Lower latency and higher
            throughput are better.

    Returns:
        Up to five optimization dicts (``name``, expected relative
        ``latency_improvement`` / ``throughput_improvement``, ``effort``,
        ``prerequisite``). If latency is above target, the three options with
        the largest latency improvement come first. If throughput is below
        target, the remaining options follow in order of throughput
        improvement, capped at five entries in total. An empty list means
        both targets are already met.
    """
    plan = []
    # Candidate optimizations with rough expected impact and effort.
    optimizations = [
        {
            'name': 'Flash Attention',
            'latency_improvement': 0.3,
            'throughput_improvement': 0.2,
            'effort': 'Low',
            'prerequisite': 'PyTorch 2.0+, supported model'
        },
        {
            'name': 'torch.compile',
            'latency_improvement': 0.2,
            'throughput_improvement': 0.15,
            'effort': 'Low',
            'prerequisite': 'PyTorch 2.0+'
        },
        {
            'name': 'Quantization (INT8)',
            'latency_improvement': 0.15,
            'throughput_improvement': 0.5,
            'effort': 'Medium',
            'prerequisite': 'Quality testing required'
        },
        {
            'name': 'Continuous Batching',
            'latency_improvement': -0.1,  # May slightly increase
            'throughput_improvement': 2.0,
            'effort': 'High',
            'prerequisite': 'Serving infrastructure change'
        },
        {
            'name': 'Speculative Decoding',
            'latency_improvement': 0.4,
            'throughput_improvement': 0.1,
            'effort': 'Medium',
            'prerequisite': 'Draft model needed'
        },
    ]

    # Only the direction of each gap matters. Comparing directly gives the
    # same answer as the ratios current/target - 1 and target/current - 1
    # for positive values, without dividing by a zero target or throughput.
    needs_latency = current_performance['latency'] > targets['latency']
    needs_throughput = targets['throughput'] > current_performance['throughput']

    # The latency gap is addressed first, then throughput (not ranked by gap size).
    if needs_latency:
        by_latency = sorted(
            optimizations,
            key=lambda x: x['latency_improvement'],
            reverse=True
        )
        plan.extend(by_latency[:3])
    if needs_throughput:
        by_throughput = sorted(
            optimizations,
            key=lambda x: x['throughput_improvement'],
            reverse=True
        )
        for opt in by_throughput:
            if opt not in plan:
                plan.append(opt)
                if len(plan) >= 5:
                    break
    return plan