Cost Engineering Code Reference
Complete code implementations for Chapter 66: Cost Engineering.
Table of Contents
- Cost Categories and Breakdown
- Total Cost of Ownership
- Cost Attribution
- Unit Economics
- GPU Optimization
- Inference Cost Optimization
- Training Cost Optimization
- Build vs Buy Analysis
- API Cost Management
- Token Optimization
- Cost Visibility
- Cost Governance
- Scaling Cost Models
Cost Categories and Breakdown
Cost Item and Breakdown Classes
Complete implementation for modeling AI system costs:
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class CostCategory(Enum):
    """Closed set of categories that a cost line item can be filed under."""

    # Compute, split by workload phase
    COMPUTE_TRAINING = "compute_training"
    COMPUTE_INFERENCE = "compute_inference"
    # Data at rest vs. data pipelines
    DATA_STORAGE = "data_storage"
    DATA_PROCESSING = "data_processing"
    # Third-party services, people, platform and licences
    EXTERNAL_APIS = "external_apis"
    ENGINEERING_TIME = "engineering_time"
    INFRASTRUCTURE = "infrastructure"
    LICENSING = "licensing"
@dataclass
class CostItem:
    """A cost item in the AI system."""

    category: CostCategory
    description: str
    unit: str  # label of the billing unit, e.g. "GPU-hours/month"
    unit_cost: float  # price of one unit
    monthly_volume: float  # units consumed per month
    # Stored as given; nothing in this class checks it against
    # unit_cost * monthly_volume.
    monthly_cost: float
@dataclass
class AICostBreakdown:
    """Roll-up of a system's cost items into monthly and per-unit figures."""

    system_name: str
    items: "list[CostItem]"
    total_monthly: float
    cost_per_prediction: float
    cost_per_user: float

    @classmethod
    def calculate(cls, system_name: str, items: "list[CostItem]", predictions_per_month: int, users: int):
        """Build a breakdown from raw cost items.

        Args:
            system_name: Name of the system being costed.
            items: Cost items; only ``monthly_cost`` is read from each.
            predictions_per_month: Monthly prediction count.
            users: Number of users the monthly cost is spread over.

        Returns:
            An ``AICostBreakdown`` whose unit costs divide the monthly total
            by the given counts, each floored at 1 to avoid dividing by zero.
        """
        monthly_total = 0
        for entry in items:
            monthly_total += entry.monthly_cost

        # Floor both denominators at 1 so a zero count never divides by zero.
        prediction_divisor = max(1, predictions_per_month)
        user_divisor = max(1, users)

        per_prediction = monthly_total / prediction_divisor
        per_user = monthly_total / user_divisor

        return cls(
            system_name=system_name,
            items=items,
            total_monthly=monthly_total,
            cost_per_prediction=per_prediction,
            cost_per_user=per_user,
        )
# Example cost breakdown for a recommendation system
# Illustrative line items for a recommendation system. In every entry
# monthly_cost equals unit_cost * monthly_volume.
EXAMPLE_RECOMMENDATION_SYSTEM = [
    CostItem(
        category=CostCategory.COMPUTE_TRAINING,
        description="Weekly model retraining on 8x A100",
        unit="GPU-hours/month",
        unit_cost=3.0,
        monthly_volume=160,  # 8 GPUs * 5 hours * 4 weeks
        monthly_cost=480
    ),
    CostItem(
        category=CostCategory.COMPUTE_INFERENCE,
        description="Inference serving on g5.xlarge",
        unit="instance-hours",
        unit_cost=1.0,
        monthly_volume=2160,  # 3 instances * 24 * 30
        monthly_cost=2160
    ),
    CostItem(
        category=CostCategory.DATA_STORAGE,
        description="Feature store and model artifacts",
        unit="TB-month",
        unit_cost=23,
        monthly_volume=50,
        monthly_cost=1150
    ),
    CostItem(
        category=CostCategory.DATA_PROCESSING,
        description="Daily feature pipelines",
        unit="DBU-hours",
        unit_cost=0.15,
        monthly_volume=10000,
        monthly_cost=1500
    ),
    CostItem(
        category=CostCategory.EXTERNAL_APIS,
        description="Embedding API calls",
        unit="1M tokens",
        unit_cost=0.10,
        monthly_volume=5000,
        monthly_cost=500
    ),
    CostItem(
        category=CostCategory.INFRASTRUCTURE,
        description="Monitoring, logging, networking",
        unit="flat",
        unit_cost=500,
        monthly_volume=1,
        monthly_cost=500
    ),
]


# Total Cost of Ownership
TCO Components and Calculator
Complete implementation for calculating total cost of ownership:
from dataclasses import dataclass
@dataclass
class TCOComponent:
    """Component of Total Cost of Ownership."""

    component: str  # short name of the cost component
    category: str  # grouping label, e.g. "Infrastructure" or "Personnel"
    description: str
    estimation_method: str  # free-text note on how to estimate this component
    often_overlooked: bool  # True when teams commonly leave it out of estimates
# Reference checklist of TCO components. Purely descriptive: nothing here
# carries amounts, and TCOCalculator does not read this list.
TCO_COMPONENTS = [
    TCOComponent(
        component="Direct compute costs",
        category="Infrastructure",
        description="GPU/CPU instances for training and inference",
        estimation_method="Cloud billing + reserved instance analysis",
        often_overlooked=False
    ),
    TCOComponent(
        component="Data storage and transfer",
        category="Infrastructure",
        description="Storage, egress, cross-region transfer",
        estimation_method="Cloud storage billing + transfer monitoring",
        often_overlooked=True  # Egress costs often surprise teams
    ),
    TCOComponent(
        component="Engineering time - development",
        category="Personnel",
        description="Building, training, deploying models",
        estimation_method="Time tracking * fully-loaded cost",
        often_overlooked=True
    ),
    TCOComponent(
        component="Engineering time - operations",
        category="Personnel",
        description="Monitoring, debugging, retraining, on-call",
        estimation_method="Ops time allocation * fully-loaded cost",
        often_overlooked=True
    ),
    TCOComponent(
        component="Data labeling and annotation",
        category="Data",
        description="Human labeling, quality assurance",
        estimation_method="Vendor costs + internal QA time",
        often_overlooked=False
    ),
    TCOComponent(
        component="External API costs",
        category="Infrastructure",
        description="Third-party ML APIs, embedding services",
        estimation_method="API billing analysis",
        often_overlooked=False
    ),
    TCOComponent(
        component="Opportunity cost of failed experiments",
        category="Personnel",
        description="Time spent on approaches that didn't work",
        estimation_method="Track experiment outcomes and time",
        often_overlooked=True
    ),
    TCOComponent(
        component="Technical debt servicing",
        category="Personnel",
        description="Time spent maintaining legacy systems",
        estimation_method="Maintenance time tracking",
        often_overlooked=True
    ),
    TCOComponent(
        component="Compliance and security",
        category="Operations",
        description="Audits, certifications, security reviews",
        estimation_method="Audit costs + engineering time",
        often_overlooked=True
    ),
]
class TCOCalculator:
    """Calculate total cost of ownership (TCO) for AI systems.

    TCO is modelled as direct infrastructure spend, plus engineering hours
    priced at one hourly rate, plus a flat 20% allowance for hidden costs.
    """

    def __init__(self, engineering_cost_per_hour: float = 150):
        """Store the hourly rate used to price engineering hours."""
        self.engineering_cost = engineering_cost_per_hour

    def calculate_tco(
        self,
        direct_costs: dict[str, float],
        engineering_hours: dict[str, float],
        annual: bool = True
    ) -> dict:
        """Calculate complete TCO.

        Args:
            direct_costs: Monthly infrastructure cost per line item.
            engineering_hours: Monthly engineering hours per activity.
            annual: When True, monthly figures are multiplied by 12.

        Returns:
            Dict with 'infrastructure', 'engineering', 'hidden_estimated',
            'total_tco', 'period' and a percentage 'breakdown'. When the
            total is zero every percentage is reported as 0.0.
        """
        multiplier = 12 if annual else 1

        # Direct infrastructure costs
        infra_cost = sum(direct_costs.values()) * multiplier

        # Engineering costs (fully-loaded)
        eng_cost = sum(
            hours * self.engineering_cost * multiplier
            for hours in engineering_hours.values()
        )

        # Hidden-cost allowance: 20% of infrastructure + engineering combined
        hidden_costs = (infra_cost + eng_cost) * 0.20

        total = infra_cost + eng_cost + hidden_costs

        def share(part: float) -> float:
            # With no costs at all there is nothing to apportion.
            return part / total * 100 if total else 0.0

        return {
            'infrastructure': infra_cost,
            'engineering': eng_cost,
            'hidden_estimated': hidden_costs,
            'total_tco': total,
            'period': 'annual' if annual else 'monthly',
            'breakdown': {
                'infrastructure_pct': share(infra_cost),
                'engineering_pct': share(eng_cost),
                'hidden_pct': share(hidden_costs)
            }
        }

    def compare_build_vs_buy(
        self,
        build_tco: dict,
        buy_cost_monthly: float,
        features_parity: float = 0.8
    ) -> dict:
        """Compare build vs buy options.

        Args:
            build_tco: Result of ``calculate_tco``; 'total_tco' is read as the
                annual build cost, and an optional 'initial_investment' key
                feeds the payback calculation.
            buy_cost_monthly: Monthly price of the bought alternative.
            features_parity: Fraction of the required features the bought
                product covers; the buy cost is divided by it. Must be
                non-zero.

        Returns:
            Dict with annual build/buy costs, the parity-adjusted buy cost,
            a 'build'/'buy' recommendation, the absolute savings, and
            'payback_period_months' (``inf`` when building is not cheaper
            than the unadjusted buy cost).

        Raises:
            ZeroDivisionError: If ``features_parity`` is 0.
        """
        annual_buy = buy_cost_monthly * 12
        # Adjust for feature parity
        adjusted_buy = annual_buy / features_parity
        build_annual = build_tco['total_tco']

        if build_annual < annual_buy:
            # Payback is in months, so divide by the *monthly* saving.
            monthly_savings = (annual_buy - build_annual) / 12
            payback_months = build_tco.get('initial_investment', 0) / monthly_savings
        else:
            payback_months = float('inf')

        return {
            'build_annual': build_annual,
            'buy_annual': annual_buy,
            'buy_adjusted': adjusted_buy,
            'recommendation': 'build' if build_annual < adjusted_buy else 'buy',
            'savings': abs(build_annual - adjusted_buy),
            'payback_period_months': payback_months
        }


# Cost Attribution
Attribution Rules and Engine
Complete implementation for cost attribution:
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class AttributionModel(Enum):
    """Ways a recorded cost can be assigned to owners."""

    DIRECT = "direct"  # Attribute to specific team/project
    PROPORTIONAL = "proportional"  # Split by usage proportion
    FIXED_ALLOCATION = "fixed"  # Pre-agreed splits
    ACTIVITY_BASED = "activity"  # Based on specific activities
@dataclass
class CostAttributionRule:
    """Rule for attributing costs."""

    cost_category: CostCategory  # category this rule applies to
    attribution_model: AttributionModel
    allocation_key: str  # The metric used for attribution
    description: str
# Default rule set. Only four of the CostCategory members have a rule here;
# DATA_PROCESSING, ENGINEERING_TIME, INFRASTRUCTURE and LICENSING do not.
ATTRIBUTION_RULES = [
    CostAttributionRule(
        cost_category=CostCategory.COMPUTE_TRAINING,
        attribution_model=AttributionModel.DIRECT,
        allocation_key="training_job_owner",
        description="Training costs attributed to team that initiated job"
    ),
    CostAttributionRule(
        cost_category=CostCategory.COMPUTE_INFERENCE,
        attribution_model=AttributionModel.PROPORTIONAL,
        allocation_key="request_count_by_product",
        description="Inference costs split by product request volume"
    ),
    CostAttributionRule(
        cost_category=CostCategory.DATA_STORAGE,
        attribution_model=AttributionModel.PROPORTIONAL,
        allocation_key="storage_size_by_dataset",
        description="Storage costs split by dataset size ownership"
    ),
    CostAttributionRule(
        cost_category=CostCategory.EXTERNAL_APIS,
        attribution_model=AttributionModel.DIRECT,
        allocation_key="api_call_origin",
        description="API costs attributed to calling service"
    ),
]
class CostAttributionEngine:
    """Engine for attributing AI costs to teams and projects.

    Cost events are recorded in memory and later grouped per owner according
    to the rule registered for each event's category. Any amount that cannot
    be tied to an owner is kept under the ``'unattributed'`` owner so that
    attributed totals still add up to what was recorded.
    """

    # Owner bucket for amounts that cannot be tied to a specific owner.
    UNATTRIBUTED = 'unattributed'

    def __init__(self, rules: "list[CostAttributionRule]"):
        """Index the rules by cost category; a later duplicate wins."""
        self.rules = {r.cost_category: r for r in rules}
        self.cost_data: list[dict] = []

    def record_cost(
        self,
        category: "CostCategory",
        amount: float,
        metadata: dict,
        timestamp: Optional[datetime] = None
    ):
        """Record a cost event.

        Args:
            category: Category of the cost; selects the attribution rule.
            amount: Cost amount.
            metadata: Attribution inputs: the rule's ``allocation_key`` for
                direct rules, or a ``'proportions'`` mapping of owner to
                fraction for proportional rules.
            timestamp: When the cost was incurred. Defaults to
                ``datetime.now()``; pass it explicitly to backfill history.
        """
        self.cost_data.append({
            'category': category,
            'amount': amount,
            'metadata': metadata,
            'timestamp': datetime.now() if timestamp is None else timestamp
        })

    def calculate_attribution(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> dict:
        """Calculate cost attribution for period.

        Args:
            start_date: Inclusive start of the period.
            end_date: Inclusive end of the period.

        Returns:
            ``{owner: {category_value: amount, ..., 'total': amount}}``.
            Costs whose category has no rule, whose rule uses a model other
            than DIRECT or PROPORTIONAL, or whose proportional metadata has
            no proportions are reported under ``'unattributed'``.
        """
        attribution: dict = {}

        def credit(owner, category, amount):
            # Accumulate `amount` under owner -> category value.
            per_owner = attribution.setdefault(owner, {})
            per_owner[category.value] = per_owner.get(category.value, 0) + amount

        for cost in self.cost_data:
            # Filter to date range
            if not start_date <= cost['timestamp'] <= end_date:
                continue

            category = cost['category']
            amount = cost['amount']
            metadata = cost['metadata']
            rule = self.rules.get(category)

            if rule is None:
                # No rule for this category: keep the spend visible.
                credit(self.UNATTRIBUTED, category, amount)
            elif rule.attribution_model == AttributionModel.DIRECT:
                owner = metadata.get(rule.allocation_key, self.UNATTRIBUTED)
                credit(owner, category, amount)
            elif rule.attribution_model == AttributionModel.PROPORTIONAL:
                # Get proportions from metadata
                proportions = metadata.get('proportions', {})
                if not proportions:
                    credit(self.UNATTRIBUTED, category, amount)
                for owner, proportion in proportions.items():
                    credit(owner, category, amount * proportion)
            else:
                # FIXED_ALLOCATION and ACTIVITY_BASED are not implemented.
                credit(self.UNATTRIBUTED, category, amount)

        # Calculate totals
        for owner in attribution:
            attribution[owner]['total'] = sum(
                v for k, v in attribution[owner].items() if k != 'total'
            )

        return attribution

    def generate_chargeback_report(
        self,
        month: str  # YYYY-MM format
    ) -> dict:
        """Generate monthly chargeback report.

        Placeholder: not implemented, currently returns ``None``.
        """
        # Would integrate with finance systems
        pass


# Unit Economics
Unit Economics Calculator
Complete implementation for calculating unit economics:
from dataclasses import dataclass
@dataclass
class UnitEconomics:
    """Unit economics for AI service."""

    service_name: str
    cost_per_request: float
    cost_per_user: float
    cost_per_dollar_revenue: float  # cost incurred per unit of revenue
    margin: float  # (revenue - cost) / revenue, as a fraction
class UnitEconomicsCalculator:
    """Calculate unit economics for AI services."""

    def __init__(self):
        # Latest calculation per service name.
        self.metrics: dict[str, dict] = {}

    def calculate(
        self,
        service_name: str,
        monthly_costs: float,
        monthly_requests: int,
        monthly_users: int,
        monthly_revenue: float
    ) -> UnitEconomics:
        """Calculate unit economics.

        Args:
            service_name: Key under which the result is remembered.
            monthly_costs: Total monthly cost of the service.
            monthly_requests: Requests served in the month.
            monthly_users: Users in the month.
            monthly_revenue: Revenue attributed to the service in the month.

        Returns:
            The computed ``UnitEconomics``. Request and user counts are
            floored at 1. Non-positive revenue is treated as 1 so the ratios
            stay finite; positive revenue is always used as-is.
        """
        cost_per_request = monthly_costs / max(1, monthly_requests)
        cost_per_user = monthly_costs / max(1, monthly_users)

        # Revenue is a float: only guard against a non-positive denominator
        # rather than clamping fractional revenue up to 1.
        revenue_base = monthly_revenue if monthly_revenue > 0 else 1
        cost_per_revenue = monthly_costs / revenue_base
        margin = (monthly_revenue - monthly_costs) / revenue_base

        result = UnitEconomics(
            service_name=service_name,
            cost_per_request=cost_per_request,
            cost_per_user=cost_per_user,
            cost_per_dollar_revenue=cost_per_revenue,
            margin=margin
        )

        self.metrics[service_name] = {
            'unit_economics': result,
            'monthly_costs': monthly_costs,
            'monthly_revenue': monthly_revenue
        }

        return result

    def analyze_trends(self, service_name: str, months: int = 6) -> dict:
        """Analyze unit economics trends.

        Placeholder: returns fixed values; ``months`` is not used yet.
        """
        # Would pull historical data
        return {
            'service': service_name,
            'cost_per_request_trend': 'decreasing',
            'margin_trend': 'improving',
            'recommendation': 'Continue current optimization path'
        }


# GPU Optimization
GPU Utilization Analyzer
Complete implementation for GPU utilization analysis and optimization:
from dataclasses import dataclass
from typing import Optional
@dataclass
class GPUUtilizationMetrics:
    """GPU utilization metrics."""

    gpu_id: str
    compute_utilization: float  # 0-100%
    memory_utilization: float  # 0-100%
    tensor_core_utilization: float  # 0-100% (when available)
    time_idle: float  # Percentage of time at 0% utilization
class GPUOptimizer:
    """Optimize GPU utilization and costs."""

    def __init__(self):
        self.utilization_history: "list[GPUUtilizationMetrics]" = []

    def analyze_utilization(
        self,
        metrics: "list[GPUUtilizationMetrics]"
    ) -> dict:
        """Analyze GPU utilization patterns.

        Args:
            metrics: Utilization samples; compute, memory and idle
                percentages are averaged across them.

        Returns:
            Dict with the three averages, detected 'issues', matching
            'recommendations' and 'potential_savings_pct', or
            ``{'error': 'No metrics provided'}`` for an empty list.
        """
        if not metrics:
            return {'error': 'No metrics provided'}

        avg_compute = sum(m.compute_utilization for m in metrics) / len(metrics)
        avg_memory = sum(m.memory_utilization for m in metrics) / len(metrics)
        avg_idle = sum(m.time_idle for m in metrics) / len(metrics)

        # High memory with low compute: a smaller GPU would not fit the
        # workload, so the downsizing advice must not be given.
        memory_bound = avg_memory > 90 and avg_compute < 50

        issues = []
        recommendations = []

        # Check for low compute utilization
        if avg_compute < 50:
            issues.append(f"Low compute utilization: {avg_compute:.1f}%")
            if not memory_bound:
                recommendations.append("Consider smaller GPU instance or batch more work")

        # Check for memory underutilization
        if avg_memory < 40:
            issues.append(f"Low memory utilization: {avg_memory:.1f}%")
            recommendations.append("Consider smaller GPU or increase batch size")

        # Check for idle time
        if avg_idle > 20:
            issues.append(f"High idle time: {avg_idle:.1f}%")
            recommendations.append("Implement GPU sharing or spot instances")

        # Check for memory-bound workloads
        if memory_bound:
            issues.append("Memory-bound workload")
            recommendations.append("Consider GPU with more memory or optimize memory usage")

        potential_savings = self._estimate_savings(avg_compute, avg_memory, avg_idle)

        return {
            'avg_compute_utilization': avg_compute,
            'avg_memory_utilization': avg_memory,
            'avg_idle_time': avg_idle,
            'issues': issues,
            'recommendations': recommendations,
            'potential_savings_pct': potential_savings
        }

    def recommend_instance_type(
        self,
        current_instance: str,
        utilization: "GPUUtilizationMetrics",
        workload_type: str  # training, inference, both
    ) -> dict:
        """Recommend optimal instance type.

        Only suggests downsizing: when compute utilization is below 40%,
        instances with less compute and lower cost than the current one are
        offered, best saving first. An instance name missing from the
        built-in table yields no recommendations. ``workload_type`` is
        accepted but not used by the current heuristic.
        """
        # Instance characteristics (simplified)
        instances = {
            'g5.xlarge': {'gpu_mem': 24, 'compute': 1, 'cost': 1.0},
            'g5.2xlarge': {'gpu_mem': 24, 'compute': 1, 'cost': 1.2},
            'p4d.24xlarge': {'gpu_mem': 320, 'compute': 8, 'cost': 32.0},
            'p4de.24xlarge': {'gpu_mem': 640, 'compute': 8, 'cost': 40.0},
            'g6.xlarge': {'gpu_mem': 24, 'compute': 1, 'cost': 0.9},
        }

        current = instances.get(current_instance)
        recommendations = []

        # Low utilization - consider smaller
        if current is not None and utilization.compute_utilization < 40:
            for name, specs in instances.items():
                if specs['compute'] < current['compute'] and specs['cost'] < current['cost']:
                    recommendations.append({
                        'instance': name,
                        'reason': 'Lower compute, lower cost',
                        'estimated_savings': (current['cost'] - specs['cost']) / current['cost'] * 100
                    })

        # Rank so that the slice below really is the top three by saving.
        recommendations.sort(key=lambda rec: rec['estimated_savings'], reverse=True)

        return {
            'current_instance': current_instance,
            'current_utilization': utilization.compute_utilization,
            'recommendations': recommendations[:3]  # Top 3
        }

    def _estimate_savings(
        self,
        compute_util: float,
        memory_util: float,
        idle_time: float
    ) -> float:
        """Estimate potential savings from optimization, capped at 50 (%)."""
        # Rough heuristic
        utilization_gap = 100 - max(compute_util, memory_util)
        idle_savings = idle_time * 0.8  # Can save 80% of idle costs with spot/sharing

        return min(50, (utilization_gap * 0.3) + idle_savings)


# Inference Cost Optimization
Inference Cost Optimizer
Complete implementation for optimizing inference costs:
class InferenceCostOptimizer:
    """Optimize inference costs."""

    def __init__(self):
        self.optimization_strategies = []

    def analyze_inference_costs(
        self,
        requests_per_second: float,
        avg_latency_ms: float,
        gpu_cost_per_hour: float,
        batch_size: int
    ) -> dict:
        """Analyze and optimize inference costs.

        Args:
            requests_per_second: Request rate used to project the daily cost.
            avg_latency_ms: Average latency of one request in milliseconds.
            gpu_cost_per_hour: Hourly price of the serving GPU.
            batch_size: Current serving batch size; values below 1 are
                treated as 1 when sizing the batching suggestion.

        Returns:
            Dict with 'current_cost_per_request' (GPU price per second times
            latency in seconds), 'daily_cost' and a list of 'optimizations'.
        """
        current_cost_per_request = (
            gpu_cost_per_hour / 3600 * (avg_latency_ms / 1000)
        )

        optimizations = []

        # Batching optimization
        if batch_size < 8:
            # A zero or negative batch size would divide by zero below.
            effective_batch = max(1, batch_size)
            estimated_throughput_increase = min(4, 8 / effective_batch)
            optimizations.append({
                'strategy': 'Increase batch size',
                'current_batch': batch_size,
                'recommended_batch': min(32, effective_batch * 4),
                'estimated_savings': f"{(1 - 1/estimated_throughput_increase)*100:.0f}%"
            })

        # Model optimization
        optimizations.append({
            'strategy': 'Quantization (INT8)',
            'estimated_savings': '30-50%',
            'tradeoff': 'Possible 1-2% accuracy loss'
        })

        # Caching
        optimizations.append({
            'strategy': 'Response caching',
            'estimated_savings': 'Varies with cache hit rate',
            'tradeoff': 'Requires cache infrastructure'
        })

        # Spot instances
        optimizations.append({
            'strategy': 'Spot/preemptible instances',
            'estimated_savings': '60-70%',
            'tradeoff': 'Requires handling interruptions'
        })

        return {
            'current_cost_per_request': current_cost_per_request,
            'daily_cost': current_cost_per_request * requests_per_second * 86400,
            'optimizations': optimizations
        }

    def calculate_caching_roi(
        self,
        requests_per_day: int,
        cost_per_request: float,
        expected_cache_hit_rate: float,
        cache_cost_per_day: float
    ) -> dict:
        """Calculate ROI of implementing caching.

        Args:
            requests_per_day: Daily request volume.
            cost_per_request: Compute cost of one uncached request.
            expected_cache_hit_rate: Fraction of requests served from cache.
            cache_cost_per_day: Daily cost of running the cache.

        Returns:
            Dict with current and projected daily cost, daily and monthly
            (x30) savings, 'roi_percent' (``inf`` when the cache is free) and
            'payback_days' (``inf`` when there are no savings).
        """
        # Current cost
        current_daily_cost = requests_per_day * cost_per_request

        # With caching
        cache_misses = requests_per_day * (1 - expected_cache_hit_rate)
        compute_cost_with_cache = cache_misses * cost_per_request
        total_with_cache = compute_cost_with_cache + cache_cost_per_day

        savings = current_daily_cost - total_with_cache
        roi = (savings / cache_cost_per_day) * 100 if cache_cost_per_day > 0 else float('inf')

        return {
            'current_daily_cost': current_daily_cost,
            'projected_daily_cost': total_with_cache,
            'daily_savings': savings,
            'monthly_savings': savings * 30,
            'roi_percent': roi,
            'payback_days': cache_cost_per_day / savings if savings > 0 else float('inf')
        }


# Training Cost Optimization
Training Cost Optimizer
Complete implementation for optimizing training costs:
class TrainingCostOptimizer:
    """Optimize training costs."""

    def recommend_training_strategy(
        self,
        model_size_params: int,
        dataset_size_gb: float,
        target_performance: float,
        budget: float
    ) -> dict:
        """Recommend training strategy within budget.

        Args:
            model_size_params: Number of model parameters.
            dataset_size_gb: Training data size in GB.
            target_performance: Performance expected from full training;
                cheaper strategies are reported as a fraction of it.
            budget: Maximum spend.

        Returns:
            Dict with the 'budget', 'strategies_within_budget',
            'all_strategies' and a 'recommendation'. The recommendation is
            the first listed strategy that fits the budget; if none fits, it
            is the cheapest strategy overall. Full training is listed only
            when it fits the budget.
        """
        strategies = []

        # Full training
        full_training_cost = self._estimate_training_cost(
            model_size_params,
            dataset_size_gb,
            epochs=10
        )

        def savings_vs_full(cost: float) -> str:
            # A zero-cost baseline leaves nothing to save (and nothing to divide by).
            if full_training_cost <= 0:
                return "0%"
            return f"{(1 - cost/full_training_cost)*100:.0f}%"

        if full_training_cost <= budget:
            strategies.append({
                'strategy': 'Full training',
                'estimated_cost': full_training_cost,
                'expected_performance': target_performance
            })

        # Fine-tuning
        finetune_cost = full_training_cost * 0.1
        strategies.append({
            'strategy': 'Fine-tune pretrained model',
            'estimated_cost': finetune_cost,
            'expected_performance': target_performance * 0.95,
            'savings_vs_full': savings_vs_full(finetune_cost)
        })

        # LoRA/PEFT
        lora_cost = full_training_cost * 0.05
        strategies.append({
            'strategy': 'LoRA/PEFT fine-tuning',
            'estimated_cost': lora_cost,
            'expected_performance': target_performance * 0.92,
            'savings_vs_full': savings_vs_full(lora_cost)
        })

        # Spot instances
        spot_cost = full_training_cost * 0.35
        strategies.append({
            'strategy': 'Full training with spot instances',
            'estimated_cost': spot_cost,
            'expected_performance': target_performance,
            'savings_vs_full': '65%',
            'risk': 'Requires checkpointing, may take longer'
        })

        # Filter to budget
        within_budget = [s for s in strategies if s['estimated_cost'] <= budget]

        if within_budget:
            recommendation = within_budget[0]
        else:
            # Nothing fits: point at the option that overshoots the least.
            recommendation = min(strategies, key=lambda s: s['estimated_cost'])

        return {
            'budget': budget,
            'strategies_within_budget': within_budget,
            'all_strategies': strategies,
            'recommendation': recommendation
        }

    def _estimate_training_cost(
        self,
        model_params: int,
        dataset_gb: float,
        epochs: int
    ) -> float:
        """Estimate training cost based on model and data size."""
        # Simplified estimation
        # Real implementation would use detailed benchmarks

        # Assume A100 at $3/hr
        gpu_cost_per_hour = 3.0

        # Rough estimate: 1B params * 1GB data * 1 epoch ~= 10 GPU-hours
        base_hours = (model_params / 1e9) * dataset_gb * epochs * 10

        # Multi-GPU scaling (80% efficiency)
        gpus_needed = max(1, model_params / 10e9)  # Rough memory estimate
        total_hours = base_hours / gpus_needed / 0.8

        return total_hours * gpu_cost_per_hour * gpus_needed


# Build vs Buy Analysis
Build vs Buy Framework
Complete implementation for build vs buy decision analysis:
from dataclasses import dataclass
from typing import Optional
@dataclass
class BuildVsBuyFactor:
    """Factor to consider in build vs buy decision."""

    factor: str
    weight: float  # Importance 0-1
    build_score: float  # 1-5
    buy_score: float  # 1-5
    notes: str
class BuildVsBuyAnalyzer:
    """Analyze build vs buy decisions for AI capabilities."""

    def __init__(self):
        self.factors = []

    def add_factor(self, factor: "BuildVsBuyFactor"):
        """Add evaluation factor."""
        self.factors.append(factor)

    def analyze(self) -> dict:
        """Analyze build vs buy decision.

        Returns:
            Dict with weight-normalised 'build_score' and 'buy_score', a
            'build'/'buy' 'recommendation' (ties go to 'buy'), a 'confidence'
            equal to the score gap divided by 5, and the per-factor inputs.
            Returns ``{'error': ...}`` when no factors were added or their
            weights sum to zero, since no weighted average exists then.
        """
        if not self.factors:
            return {'error': 'No factors provided'}

        total_weight = sum(f.weight for f in self.factors)
        if total_weight == 0:
            return {'error': 'Total factor weight is zero'}

        build_score = sum(f.weight * f.build_score for f in self.factors)
        buy_score = sum(f.weight * f.buy_score for f in self.factors)

        normalized_build = build_score / total_weight
        normalized_buy = buy_score / total_weight

        return {
            'build_score': normalized_build,
            'buy_score': normalized_buy,
            'recommendation': 'build' if normalized_build > normalized_buy else 'buy',
            'confidence': abs(normalized_build - normalized_buy) / 5,
            'factors': [
                {
                    'factor': f.factor,
                    'build': f.build_score,
                    'buy': f.buy_score,
                    'notes': f.notes
                }
                for f in self.factors
            ]
        }
# Example: Embedding service build vs buy
# Sample factor set for deciding whether to self-host an embedding service.
# Scores use the 1-5 scale where higher favours that option.
EMBEDDING_BUILD_VS_BUY = [
    BuildVsBuyFactor(
        factor="Initial development cost",
        weight=0.8,
        build_score=2,  # High cost
        buy_score=5,  # Low cost (just API)
        notes="Building requires significant engineering investment"
    ),
    BuildVsBuyFactor(
        factor="Ongoing operational cost",
        weight=1.0,
        build_score=4,  # Lower at scale
        buy_score=2,  # Higher at scale
        notes="API costs grow linearly; self-hosted amortizes"
    ),
    BuildVsBuyFactor(
        factor="Customization ability",
        weight=0.6,
        build_score=5,  # Full control
        buy_score=2,  # Limited
        notes="Custom fine-tuning only possible with build"
    ),
    BuildVsBuyFactor(
        factor="Time to production",
        weight=0.9,
        build_score=1,  # Months
        buy_score=5,  # Days
        notes="API can be integrated immediately"
    ),
    BuildVsBuyFactor(
        factor="Data privacy",
        weight=0.7,
        build_score=5,  # Data stays internal
        buy_score=3,  # Data sent to third party
        notes="Regulated industries may require build"
    ),
    BuildVsBuyFactor(
        factor="Maintenance burden",
        weight=0.8,
        build_score=2,  # Requires ongoing work
        buy_score=5,  # Vendor handles it
        notes="Self-hosted requires ML ops expertise"
    ),
]
class CrossoverAnalysis:
    """Analyze when build becomes cheaper than buy."""

    def calculate_crossover(
        self,
        build_fixed_cost: float,  # One-time development cost
        build_variable_cost: float,  # Per-unit cost
        buy_fixed_cost: float,  # Monthly minimum
        buy_variable_cost: float,  # Per-unit cost
        units_per_month: int,
        amortization_months: int = 24
    ) -> dict:
        """Calculate monthly crossover point.

        Both options are modelled as ``fixed + variable * units`` per month,
        with the one-time build cost spread evenly over
        ``amortization_months``.

        Returns:
            Dict with the current volume, monthly cost of each option,
            'crossover_volume' (the monthly volume above which building is
            cheaper: 0 when it is cheaper at any volume, ``inf`` when growing
            volume never makes it cheaper), a 'build'/'buy' recommendation at
            the current volume, and monthly/annual savings.

        Raises:
            ValueError: If ``amortization_months`` is not positive.
        """
        if amortization_months <= 0:
            raise ValueError("amortization_months must be positive")

        # Build: fixed + variable * units
        # Buy: fixed + variable * units
        # Crossover when build_total = buy_total

        # Amortize build fixed cost over the amortization window
        build_monthly_fixed = build_fixed_cost / amortization_months

        build_monthly = build_monthly_fixed + (build_variable_cost * units_per_month)
        buy_monthly = buy_fixed_cost + (buy_variable_cost * units_per_month)

        # Find crossover volume
        if build_variable_cost > buy_variable_cost:
            # Build's per-unit disadvantage only grows with volume.
            crossover_volume = float('inf')
        elif build_variable_cost == buy_variable_cost:
            # Parallel cost lines: whichever has the lower fixed cost wins
            # at every volume.
            crossover_volume = 0 if build_monthly_fixed < buy_fixed_cost else float('inf')
        else:
            # build_monthly_fixed + build_var * x = buy_fixed + buy_var * x
            # x = (build_monthly_fixed - buy_fixed) / (buy_var - build_var)
            crossover_volume = (build_monthly_fixed - buy_fixed_cost) / (buy_variable_cost - build_variable_cost)

        return {
            'current_volume': units_per_month,
            'build_monthly_cost': build_monthly,
            'buy_monthly_cost': buy_monthly,
            'crossover_volume': max(0, crossover_volume),
            'recommendation': 'build' if build_monthly < buy_monthly else 'buy',
            'monthly_savings': abs(build_monthly - buy_monthly),
            'annual_savings': abs(build_monthly - buy_monthly) * 12
        }


# API Cost Management
API Cost Manager and Optimizer
Complete implementation for managing and optimizing API costs:
from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timedelta
@dataclass
class APIUsagePattern:
    """API usage pattern for cost optimization."""

    api_name: str
    calls_per_day: int
    avg_tokens_per_call: int
    cost_per_1k_tokens: float
    daily_cost: float
    optimization_potential: str  # free-text assessment; not read by the optimizer's checks
class APICostManager:
    """Manage and optimize API costs.

    Keeps an in-memory log of API calls, optional per-API daily budgets, and
    the alerts raised when a day's spend nears or passes its budget.
    """

    def __init__(self):
        self.usage_log: list[dict] = []
        self.budgets: dict[str, float] = {}
        self.alerts: list[dict] = []

    def log_api_call(
        self,
        api_name: str,
        tokens_used: int,
        cost: float,
        request_metadata: Optional[dict] = None
    ):
        """Log an API call for tracking, then re-check that API's budget.

        Args:
            api_name: Name of the API that was called.
            tokens_used: Tokens consumed by the call.
            cost: Cost of the call.
            request_metadata: Optional extra context; stored as ``{}`` when
                omitted.
        """
        self.usage_log.append({
            'api': api_name,
            'tokens': tokens_used,
            'cost': cost,
            'timestamp': datetime.now(),
            'metadata': request_metadata or {}
        })

        # Check budget
        self._check_budget(api_name, cost)

    def set_budget(self, api_name: str, daily_budget: float):
        """Set daily budget for an API."""
        self.budgets[api_name] = daily_budget

    def get_usage_summary(self, api_name: str, days: int = 7) -> dict:
        """Get usage summary for an API.

        Returns:
            Totals and per-call / per-day averages over the last ``days``
            days, or ``{'error': 'No usage data'}`` when nothing was logged
            for the API in that window.
        """
        cutoff = datetime.now() - timedelta(days=days)
        relevant = [
            log for log in self.usage_log
            if log['api'] == api_name and log['timestamp'] >= cutoff
        ]

        if not relevant:
            return {'error': 'No usage data'}

        total_cost = sum(log['cost'] for log in relevant)
        total_tokens = sum(log['tokens'] for log in relevant)
        total_calls = len(relevant)

        return {
            'api': api_name,
            'period_days': days,
            'total_calls': total_calls,
            'total_tokens': total_tokens,
            'total_cost': total_cost,
            'avg_cost_per_call': total_cost / total_calls,
            'avg_tokens_per_call': total_tokens / total_calls,
            'daily_avg_cost': total_cost / days
        }

    def _check_budget(self, api_name: str, cost: float):
        """Append an alert when today's spend is above 80% of the budget.

        The alert text distinguishes spend that is merely approaching the
        budget from spend that has already exceeded it. ``cost`` is unused:
        the call being checked is already in ``usage_log``.
        """
        if api_name not in self.budgets:
            return

        # Calculate today's spend
        today = datetime.now().date()
        today_cost = sum(
            log['cost'] for log in self.usage_log
            if log['api'] == api_name and log['timestamp'].date() == today
        )

        budget = self.budgets[api_name]
        if today_cost > budget:
            alert = 'Budget exceeded'
        elif today_cost > budget * 0.8:
            alert = 'Approaching budget limit'
        else:
            return

        self.alerts.append({
            'api': api_name,
            'alert': alert,
            'current_spend': today_cost,
            'budget': budget,
            'timestamp': datetime.now()
        })
class APICostOptimizer:
    """Suggest ways to cut API spend and quantify what caching would save."""

    def recommend_optimizations(
        self,
        api_name: str,
        usage_pattern: "APIUsagePattern"
    ) -> list[dict]:
        """Return applicable cost-saving strategies for a usage pattern.

        Caching and tiered model selection are always suggested. Batching is
        added above 1000 calls/day, prompt optimization above 500 tokens per
        call, and self-hosting above a daily cost of 100. ``api_name`` is not
        consulted.
        """
        avg_tokens = usage_pattern.avg_tokens_per_call
        high_volume = usage_pattern.calls_per_day > 1000
        long_prompts = avg_tokens > 500
        expensive = usage_pattern.daily_cost > 100

        # Caching
        suggestions = [{
            'strategy': 'Response caching',
            'description': 'Cache identical or similar requests',
            'potential_savings': '20-80% depending on query diversity',
            'implementation': 'Redis/Memcached with semantic similarity matching',
            'tradeoff': 'Stale responses, cache infrastructure cost'
        }]

        # Batching
        if high_volume:
            suggestions.append({
                'strategy': 'Request batching',
                'description': 'Combine multiple requests into single API calls',
                'potential_savings': '10-30%',
                'implementation': 'Queue requests and batch every N seconds',
                'tradeoff': 'Increased latency for individual requests'
            })

        # Prompt optimization
        if long_prompts:
            # Share of tokens removed if prompts were trimmed to 300 tokens.
            trimmed_pct = (avg_tokens - 300) / avg_tokens * 100
            suggestions.append({
                'strategy': 'Prompt optimization',
                'description': 'Reduce prompt length while maintaining quality',
                'potential_savings': f'{trimmed_pct:.0f}%',
                'implementation': 'Compress context, use shorter examples',
                'tradeoff': 'Potential quality degradation'
            })

        # Tiered processing
        suggestions.append({
            'strategy': 'Tiered model selection',
            'description': 'Use cheaper models for simple queries',
            'potential_savings': '30-60%',
            'implementation': 'Classify query complexity, route accordingly',
            'tradeoff': 'Quality variation, routing complexity'
        })

        # Self-hosted alternative
        if expensive:
            suggestions.append({
                'strategy': 'Self-hosted model',
                'description': 'Run open-source model internally',
                'potential_savings': '40-70% at scale',
                'implementation': 'Deploy via vLLM/TGI on GPU instances',
                'tradeoff': 'Engineering overhead, model capability differences'
            })

        return suggestions

    def calculate_caching_savings(
        self,
        daily_calls: int,
        cost_per_call: float,
        cache_hit_rate: float,
        cache_cost_per_day: float
    ) -> dict:
        """Project daily cost with a cache in front of the API.

        Returns the current and projected daily cost, daily and monthly (x30)
        savings, 'roi' as a percentage of the cache cost (``inf`` when the
        cache is free), and the cache cost at which savings reach zero.
        """
        baseline = daily_calls * cost_per_call

        # With caching: only misses reach the API, plus the cache itself.
        uncached_calls = daily_calls * (1 - cache_hit_rate)
        projected = uncached_calls * cost_per_call + cache_cost_per_day
        saved_per_day = baseline - projected

        if cache_cost_per_day > 0:
            roi = saved_per_day / cache_cost_per_day * 100
        else:
            roi = float('inf')

        return {
            'current_daily_cost': baseline,
            'projected_daily_cost': projected,
            'daily_savings': saved_per_day,
            'monthly_savings': saved_per_day * 30,
            'roi': roi,
            'break_even_cache_cost': baseline * cache_hit_rate
        }


# Token Optimization
Token Optimizer
Complete implementation for optimizing LLM token usage:
class TokenOptimizer:
    """Optimize token usage for LLM APIs."""

    def analyze_prompt_efficiency(
        self,
        prompt: str,
        response: str,
        model: str
    ) -> dict:
        """Analyze token efficiency of a prompt.

        Token counts are a rough estimate (whitespace-separated words * 1.3),
        not a real tokenizer count. ``model`` is accepted but not used.

        Returns:
            Dict with estimated prompt/response/total tokens, a list of
            'opportunities', and 'potential_savings_pct': the sum of each
            opportunity's lower-bound saving, capped at 100.
        """
        # Simplified token estimation (real implementation uses tiktoken)
        prompt_tokens = len(prompt.split()) * 1.3
        response_tokens = len(response.split()) * 1.3

        # Identify optimization opportunities
        opportunities = []

        # Check for redundant instructions
        if prompt.count("Please") > 2 or prompt.count("make sure") > 1:
            opportunities.append({
                'issue': 'Verbose instructions',
                'suggestion': 'Use concise directives',
                'estimated_savings': '10-15%'
            })

        # Check for excessive examples
        example_count = prompt.lower().count("example")
        if example_count > 3:
            opportunities.append({
                'issue': f'Many examples ({example_count})',
                'suggestion': 'Reduce to 1-2 high-quality examples',
                'estimated_savings': f'{(example_count - 2) * 5}%'
            })

        # Check for repeated context; an empty prompt has no words to compare
        words = prompt.split()
        if words and len(set(words)) / len(words) < 0.5:
            opportunities.append({
                'issue': 'Repetitive content',
                'suggestion': 'Remove redundant context',
                'estimated_savings': '15-25%'
            })

        # Use the lower bound of each "lo-hi%" estimate; more than 100% of
        # the tokens can never be saved.
        total_savings = sum(
            int(o['estimated_savings'].rstrip('%').split('-')[0])
            for o in opportunities
        )

        return {
            'prompt_tokens': int(prompt_tokens),
            'response_tokens': int(response_tokens),
            'total_tokens': int(prompt_tokens + response_tokens),
            'opportunities': opportunities,
            'potential_savings_pct': min(100, total_savings)
        }

    def suggest_compression(self, context: str, max_tokens: int) -> dict:
        """Suggest how to compress context to fit token budget.

        Returns:
            ``{'compression_needed': False, 'current_tokens': ...}`` when the
            estimated token count already fits. Otherwise a dict with the
            target, the required compression ratio (target / current) and the
            strategies suited to that ratio.
        """
        current_tokens = len(context.split()) * 1.3

        if current_tokens <= max_tokens:
            return {
                'compression_needed': False,
                'current_tokens': int(current_tokens)
            }

        compression_ratio = max_tokens / current_tokens

        strategies = []

        if compression_ratio > 0.7:
            strategies.append({
                'strategy': 'Remove examples',
                'description': 'Keep only the most relevant example'
            })

        if compression_ratio > 0.4:
            strategies.append({
                'strategy': 'Summarize context',
                'description': 'Replace detailed context with summary'
            })

        if compression_ratio <= 0.4:
            strategies.append({
                'strategy': 'RAG approach',
                'description': 'Retrieve only relevant portions at query time'
            })

        return {
            'compression_needed': True,
            'current_tokens': int(current_tokens),
            'target_tokens': max_tokens,
            'compression_ratio': compression_ratio,
            'strategies': strategies
        }


# Cost Visibility
Cost Dashboard
Complete implementation for cost visibility dashboards:
class CostDashboard:
    """Read-side view of AI costs for teams.

    Both query methods currently return fixed sample figures; the data source
    handed to the constructor is stored but not consulted.
    """

    def __init__(self, cost_data_source):
        self.data_source = cost_data_source

    def get_team_dashboard(self, team: str) -> dict:
        """Return the dashboard payload for ``team`` (sample figures)."""
        month_summary = {
            'total_cost': 45000,
            'budget': 50000,
            'utilization': 0.90,
            'trend': 'stable'
        }
        category_costs = {
            'compute_training': 15000,
            'compute_inference': 20000,
            'data_storage': 5000,
            'external_apis': 5000
        }
        cost_drivers = [
            {'item': 'Weekly model retraining', 'cost': 8000},
            {'item': 'Production inference cluster', 'cost': 15000},
            {'item': 'Embedding API calls', 'cost': 4000}
        ]
        opportunities = [
            {
                'opportunity': 'Use spot instances for training',
                'potential_savings': 4800,
                'effort': 'low'
            },
            {
                'opportunity': 'Implement response caching',
                'potential_savings': 3000,
                'effort': 'medium'
            }
        ]
        monthly_change = {
            'change_pct': 5.2,
            'main_driver': 'Increased inference traffic'
        }

        return {
            'team': team,
            'current_month': month_summary,
            'by_category': category_costs,
            'top_cost_drivers': cost_drivers,
            'optimization_opportunities': opportunities,
            'month_over_month': monthly_change
        }

    def get_anomaly_alerts(self) -> list[dict]:
        """Return current cost anomaly alerts (sample data)."""
        training_alert = {
            'alert': 'Training costs 50% above baseline',
            'team': 'recommendations',
            'action': 'Review training job efficiency'
        }
        return [training_alert]


# Cost Governance
Governance Policies and Enforcement
Complete implementation for cost governance:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class CostGovernancePolicy:
    """One cost-governance rule: a named limit plus what happens beyond it.

    Field order below is also the positional constructor order.
    """

    # Policy label; CostGovernanceEnforcer indexes its policies by this value.
    policy_name: str
    # Human-readable statement of what the policy limits.
    description: str
    # Numeric limit that a cost is compared against.
    threshold: float
    # What is done (or required) when the threshold is exceeded.
    action: str
    # How to ask for an exception to the policy.
    exception_process: str
# Default policy set, in the order: per-job limit, per-team monthly limit,
# per-request API limit.
COST_POLICIES = [
    # Per-job training spend that is approved automatically.
    CostGovernancePolicy(
        policy_name="Training job budget",
        description="Maximum cost per training job without approval",
        threshold=10_000,
        action="Require manager approval for jobs exceeding threshold",
        exception_process="Submit cost justification via form",
    ),
    # Monthly ceiling for a whole team.
    CostGovernancePolicy(
        policy_name="Monthly team budget",
        description="Maximum monthly spend per team",
        threshold=100_000,
        action="Alert at 80%, hard stop at 100%",
        exception_process="VP approval for budget increase",
    ),
    # Ceiling on API spend attributable to a single user request.
    CostGovernancePolicy(
        policy_name="API cost per request",
        description="Maximum API cost per user request",
        threshold=0.01,
        action="Review and optimize high-cost requests",
        exception_process="Document business justification",
    ),
]
class CostGovernanceEnforcer:
    """Check spend requests against a set of cost governance policies.

    Attributes:
        policies: The supplied policies indexed by ``policy_name``.
        violations: Log of over-threshold training-job checks, appended to
            by ``check_training_job``.
        approvals: Pre-approvals keyed by ``"training_<job_id>"``. No method
            of this class adds entries; callers populate it directly.
    """

    def __init__(self, policies: list["CostGovernancePolicy"]):
        """Index ``policies`` by name and start with empty logs."""
        self.policies = {p.policy_name: p for p in policies}
        self.violations: list[dict] = []
        self.approvals: dict[str, dict] = {}

    def check_training_job(
        self,
        job_id: str,
        estimated_cost: float,
        requestor: str,
        justification: Optional[str] = None
    ) -> dict:
        """Decide whether a training job may run under the job budget policy.

        Args:
            job_id: Identifier of the job; also used to look up a
                pre-approval under ``"training_<job_id>"``.
            estimated_cost: Estimated cost of the job.
            requestor: Who is asking to run the job.
            justification: Optional free-text reason supplied with the
                request; stored on the violation record when the job is
                rejected.

        Returns:
            A dict with ``approved`` and ``reason``. Pre-approved jobs also
            carry ``approval``; rejected jobs carry ``action_required`` and
            ``exception_process`` taken from the policy.
        """
        policy = self.policies.get('Training job budget')
        if policy is None:
            return {'approved': True, 'reason': 'No policy defined'}

        if estimated_cost <= policy.threshold:
            return {
                'approved': True,
                'reason': 'Within automatic approval threshold'
            }

        # Over the threshold: an existing pre-approval still lets the job run.
        approval_key = f"training_{job_id}"
        if approval_key in self.approvals:
            return {
                'approved': True,
                'reason': 'Pre-approved',
                'approval': self.approvals[approval_key]
            }

        # Rejected: keep a record, including the justification the caller
        # supplied, so the request can be reviewed later.
        self.violations.append({
            'policy': policy.policy_name,
            'job_id': job_id,
            'estimated_cost': estimated_cost,
            'threshold': policy.threshold,
            'requestor': requestor,
            'justification': justification,
            'timestamp': datetime.now()
        })
        return {
            'approved': False,
            'reason': f'Exceeds threshold (${estimated_cost:.2f} > ${policy.threshold:.2f})',
            'action_required': policy.action,
            'exception_process': policy.exception_process
        }

    def request_budget_exception(
        self,
        policy_name: str,
        requestor: str,
        amount: float,
        justification: str,
        business_impact: str
    ) -> dict:
        """Build a pending exception request for a budget policy.

        The request is only returned; it is not stored on the enforcer.
        The id is derived from the current time at one-second resolution,
        so two requests created within the same second share an id.

        Args:
            policy_name: Name of the policy the exception is for.
            requestor: Who is asking for the exception.
            amount: Amount requested; determines the approver chain.
            justification: Why the exception is needed.
            business_impact: Stated business impact of the request.

        Returns:
            A dict describing the request, with ``status`` set to
            ``'pending'`` and ``required_approvers`` filled in.
        """
        return {
            'request_id': f"EXC-{datetime.now().strftime('%Y%m%d%H%M%S')}",
            'policy': policy_name,
            'requestor': requestor,
            'amount': amount,
            'justification': justification,
            'business_impact': business_impact,
            'status': 'pending',
            'required_approvers': self._get_approvers(policy_name, amount)
        }

    def _get_approvers(self, policy_name: str, amount: float) -> list[str]:
        """Return the approver chain for ``amount``.

        Only the amount is consulted; ``policy_name`` is accepted but does
        not affect the result. Below 50000 needs the engineering manager,
        below 100000 adds the director, anything else adds the VP.
        """
        if amount < 50000:
            return ['engineering_manager']
        if amount < 100000:
            return ['engineering_manager', 'director']
        return ['engineering_manager', 'director', 'vp_engineering']
class CostReviewProcess:
    """Standard agendas for the recurring cost reviews.

    The agenda text lives in private class-level tuples; every accessor
    hands back a new list, so callers may modify the result freely.
    """

    _WEEKLY_AGENDA = (
        "1. Review week-over-week cost changes",
        "2. Identify top 3 cost drivers",
        "3. Review budget utilization by team",
        "4. Check for anomalies or unexpected spikes",
        "5. Review optimization opportunities pipeline",
        "6. Update cost forecasts",
        "7. Action items from previous week",
    )

    _MONTHLY_AGENDA = (
        "1. Month-over-month cost trend analysis",
        "2. Budget variance analysis by category",
        "3. Unit economics review (cost per prediction, cost per user)",
        "4. ROI analysis of major AI initiatives",
        "5. Optimization progress report",
        "6. Forecast vs actual comparison",
        "7. Budget adjustments for next month",
        "8. Strategic cost discussions (build vs buy, vendor negotiations)",
    )

    _QUARTERLY_TOPICS = (
        "Total cost of ownership trends",
        "Build vs buy re-evaluation",
        "Vendor contract negotiations",
        "Infrastructure modernization opportunities",
        "Cost benchmarking against industry",
        "Budget planning for next quarter",
    )

    @staticmethod
    def weekly_review_agenda() -> list[str]:
        """Return the numbered agenda items for the weekly cost review."""
        return list(CostReviewProcess._WEEKLY_AGENDA)

    @staticmethod
    def monthly_review_agenda() -> list[str]:
        """Return the numbered agenda items for the monthly cost review."""
        return list(CostReviewProcess._MONTHLY_AGENDA)

    @staticmethod
    def quarterly_review_topics() -> list[str]:
        """Return the (unnumbered) topics for the quarterly strategic review."""
        return list(CostReviewProcess._QUARTERLY_TOPICS)

# Scaling Cost Models
Scaling Cost Projections
Complete implementation for projecting costs at scale:
class ScalingCostModel:
"""Model how costs scale with growth."""
def project_costs(
    self,
    current_users: int,
    current_monthly_cost: float,
    cost_structure: dict,
    target_users: int,
    sublinear_exponent: float = 0.7
) -> dict:
    """Project monthly cost when the user base grows to ``target_users``.

    The current cost is split into three shares that scale differently:
    a fixed share that stays constant, a linear share multiplied by the
    scale factor, and a sub-linear share multiplied by
    ``scale_factor ** sublinear_exponent``.

    Args:
        current_users: Users served today.
        current_monthly_cost: Monthly cost today.
        cost_structure: Optional shares under the keys ``'fixed_pct'``,
            ``'linear_pct'`` and ``'sublinear_pct'``; missing keys default
            to 0.2, 0.5 and 0.3. The shares are not checked to sum to 1.
        target_users: User count to project for.
        sublinear_exponent: Exponent applied to the scale factor for the
            sub-linear share.

    Returns:
        A dict with the inputs echoed back plus ``scale_factor``,
        ``projected_cost``, current and projected cost per user, and
        ``efficiency_gain`` (percentage drop in cost per user).
    """
    # A user count below 1 would make the divisions fail, so divisors are
    # clamped to at least 1 (same convention as the per-user figures in
    # AICostBreakdown.calculate). Counts of 1 or more are unaffected.
    baseline_users = max(1, current_users)
    projected_users = max(1, target_users)

    scale_factor = target_users / baseline_users

    # Fixed costs don't change
    fixed = current_monthly_cost * cost_structure.get('fixed_pct', 0.2)
    # Linear costs scale directly
    linear = (current_monthly_cost *
              cost_structure.get('linear_pct', 0.5) *
              scale_factor)
    # Sub-linear costs (economies of scale)
    sublinear = (current_monthly_cost *
                 cost_structure.get('sublinear_pct', 0.3) *
                 (scale_factor ** sublinear_exponent))
    projected_cost = fixed + linear + sublinear

    cost_per_user_current = current_monthly_cost / baseline_users
    cost_per_user_projected = projected_cost / projected_users

    # With a zero baseline there is nothing to improve on; report no gain
    # instead of dividing by zero.
    if cost_per_user_current:
        efficiency_gain = (
            cost_per_user_current - cost_per_user_projected
        ) / cost_per_user_current * 100
    else:
        efficiency_gain = 0.0

    return {
        'current_users': current_users,
        'target_users': target_users,
        'scale_factor': scale_factor,
        'current_cost': current_monthly_cost,
        'projected_cost': projected_cost,
        'cost_per_user_current': cost_per_user_current,
        'cost_per_user_projected': cost_per_user_projected,
        'efficiency_gain': efficiency_gain
    }