Cost Engineering Code Reference

Complete code implementations for Chapter 66: Cost Engineering.


Table of Contents


Cost Categories and Breakdown

Cost Item and Breakdown Classes

Complete implementation for modeling AI system costs:

from dataclasses import dataclass
from enum import Enum
from typing import Optional

class CostCategory(Enum):
    """Closed set of categories a cost line item can belong to.

    Each member's value is its lowercase snake_case string identifier.
    The members are used as ``CostItem.category`` and as
    ``CostAttributionRule.cost_category``.
    """
    COMPUTE_TRAINING = "compute_training"
    COMPUTE_INFERENCE = "compute_inference"
    DATA_STORAGE = "data_storage"
    DATA_PROCESSING = "data_processing"
    EXTERNAL_APIS = "external_apis"
    ENGINEERING_TIME = "engineering_time"
    INFRASTRUCTURE = "infrastructure"
    LICENSING = "licensing"

@dataclass
class CostItem:
    """A cost item in the AI system.

    ``monthly_cost`` is stored as given; nothing in this class checks that it
    equals ``unit_cost * monthly_volume``, so callers must keep the three
    figures consistent themselves.
    """
    category: CostCategory  # which CostCategory bucket the item falls in
    description: str        # free-text description of the line item
    unit: str               # label of the billing unit, e.g. "GPU-hours/month"
    unit_cost: float        # price of one unit
    monthly_volume: float   # units consumed per month
    monthly_cost: float     # monthly spend; summed by AICostBreakdown.calculate

@dataclass
class AICostBreakdown:
    """Monthly cost roll-up for a single AI system.

    Holds the raw line items together with the derived totals. Use
    :meth:`calculate` to build an instance from line items and usage counts.
    """
    system_name: str
    items: list[CostItem]
    total_monthly: float
    cost_per_prediction: float
    cost_per_user: float

    @classmethod
    def calculate(cls, system_name: str, items: list[CostItem], predictions_per_month: int, users: int):
        """Build a breakdown from line items and monthly usage figures.

        Args:
            system_name: Name stored on the resulting breakdown.
            items: Line items whose ``monthly_cost`` values are summed.
            predictions_per_month: Predictions served per month.
            users: Number of users the cost is spread over.

        Returns:
            A new ``AICostBreakdown`` with the total and both per-unit costs.
        """
        monthly_total = sum(entry.monthly_cost for entry in items)

        # Usage counts are clamped to at least 1 so a system with no traffic
        # or no users reports the full total instead of dividing by zero.
        prediction_divisor = max(1, predictions_per_month)
        user_divisor = max(1, users)

        return cls(
            system_name,
            items,
            monthly_total,
            monthly_total / prediction_divisor,
            monthly_total / user_divisor,
        )

# Example cost breakdown for a recommendation system
# Illustrative line items. In every entry monthly_cost equals
# unit_cost * monthly_volume, and the six monthly_cost values sum to 6,290.
# The list has no entries for the ENGINEERING_TIME or LICENSING categories.
EXAMPLE_RECOMMENDATION_SYSTEM = [
    CostItem(
        category=CostCategory.COMPUTE_TRAINING,
        description="Weekly model retraining on 8x A100",
        unit="GPU-hours/month",
        unit_cost=3.0,
        monthly_volume=160,  # 8 GPUs * 5 hours * 4 weeks
        monthly_cost=480
    ),
    CostItem(
        category=CostCategory.COMPUTE_INFERENCE,
        description="Inference serving on g5.xlarge",
        unit="instance-hours",
        unit_cost=1.0,
        monthly_volume=2160,  # 3 instances * 24 * 30
        monthly_cost=2160
    ),
    CostItem(
        category=CostCategory.DATA_STORAGE,
        description="Feature store and model artifacts",
        unit="TB-month",
        unit_cost=23,
        monthly_volume=50,
        monthly_cost=1150
    ),
    CostItem(
        category=CostCategory.DATA_PROCESSING,
        description="Daily feature pipelines",
        unit="DBU-hours",
        unit_cost=0.15,
        monthly_volume=10000,
        monthly_cost=1500
    ),
    CostItem(
        category=CostCategory.EXTERNAL_APIS,
        description="Embedding API calls",
        unit="1M tokens",
        unit_cost=0.10,
        monthly_volume=5000,
        monthly_cost=500
    ),
    CostItem(
        category=CostCategory.INFRASTRUCTURE,
        description="Monitoring, logging, networking",
        unit="flat",
        unit_cost=500,
        monthly_volume=1,
        monthly_cost=500
    ),
]

Total Cost of Ownership

TCO Components and Calculator

Complete implementation for calculating total cost of ownership:

from dataclasses import dataclass

@dataclass
class TCOComponent:
    """Component of Total Cost of Ownership.

    A descriptive catalogue record: it carries no amounts and is not read by
    ``TCOCalculator``.
    """
    component: str           # short name of the cost component
    category: str            # grouping label, e.g. "Infrastructure" or "Personnel"
    description: str         # what the component covers
    estimation_method: str   # how to estimate the component's cost
    often_overlooked: bool   # True when teams tend to leave it out of estimates

# Checklist of TCO components. Six of the nine entries are flagged
# often_overlooked=True; the categories used are "Infrastructure",
# "Personnel", "Data" and "Operations".
TCO_COMPONENTS = [
    TCOComponent(
        component="Direct compute costs",
        category="Infrastructure",
        description="GPU/CPU instances for training and inference",
        estimation_method="Cloud billing + reserved instance analysis",
        often_overlooked=False
    ),
    TCOComponent(
        component="Data storage and transfer",
        category="Infrastructure",
        description="Storage, egress, cross-region transfer",
        estimation_method="Cloud storage billing + transfer monitoring",
        often_overlooked=True  # Egress costs often surprise teams
    ),
    TCOComponent(
        component="Engineering time - development",
        category="Personnel",
        description="Building, training, deploying models",
        estimation_method="Time tracking * fully-loaded cost",
        often_overlooked=True
    ),
    TCOComponent(
        component="Engineering time - operations",
        category="Personnel",
        description="Monitoring, debugging, retraining, on-call",
        estimation_method="Ops time allocation * fully-loaded cost",
        often_overlooked=True
    ),
    TCOComponent(
        component="Data labeling and annotation",
        category="Data",
        description="Human labeling, quality assurance",
        estimation_method="Vendor costs + internal QA time",
        often_overlooked=False
    ),
    TCOComponent(
        component="External API costs",
        category="Infrastructure",
        description="Third-party ML APIs, embedding services",
        estimation_method="API billing analysis",
        often_overlooked=False
    ),
    TCOComponent(
        component="Opportunity cost of failed experiments",
        category="Personnel",
        description="Time spent on approaches that didn't work",
        estimation_method="Track experiment outcomes and time",
        often_overlooked=True
    ),
    TCOComponent(
        component="Technical debt servicing",
        category="Personnel",
        description="Time spent maintaining legacy systems",
        estimation_method="Maintenance time tracking",
        often_overlooked=True
    ),
    TCOComponent(
        component="Compliance and security",
        category="Operations",
        description="Audits, certifications, security reviews",
        estimation_method="Audit costs + engineering time",
        often_overlooked=True
    ),
]

class TCOCalculator:
    """Calculate total cost of ownership (TCO) for AI systems.

    TCO is modelled as infrastructure spend plus engineering time priced at a
    fully-loaded hourly rate, plus a flat percentage uplift for hidden costs.
    """

    def __init__(self, engineering_cost_per_hour: float = 150, hidden_cost_rate: float = 0.20):
        """Configure the calculator.

        Args:
            engineering_cost_per_hour: Fully-loaded cost of one engineering hour.
            hidden_cost_rate: Fraction of (infrastructure + engineering) cost
                added as the hidden-cost estimate. Defaults to 20%.
        """
        self.engineering_cost = engineering_cost_per_hour
        self.hidden_cost_rate = hidden_cost_rate

    def calculate_tco(
        self,
        direct_costs: dict[str, float],
        engineering_hours: dict[str, float],
        annual: bool = True
    ) -> dict:
        """Calculate complete TCO.

        Args:
            direct_costs: Monthly infrastructure cost per line item.
            engineering_hours: Monthly engineering hours per activity.
            annual: When True the monthly inputs are multiplied by 12.

        Returns:
            Dict with 'infrastructure', 'engineering', 'hidden_estimated' and
            'total_tco' amounts, the 'period' ('annual' or 'monthly'), and a
            'breakdown' of each part as a percentage of the total. All
            percentages are 0.0 when the total is zero.
        """
        multiplier = 12 if annual else 1

        # Direct infrastructure costs
        infra_cost = sum(direct_costs.values()) * multiplier

        # Engineering costs (fully-loaded)
        eng_cost = sum(
            hours * self.engineering_cost * multiplier
            for hours in engineering_hours.values()
        )

        # Hidden costs are estimated as a fraction of infrastructure AND
        # engineering cost combined, not of infrastructure alone.
        hidden_costs = (infra_cost + eng_cost) * self.hidden_cost_rate

        total = infra_cost + eng_cost + hidden_costs

        def share(part: float) -> float:
            # With no costs at all there is nothing to apportion.
            return part / total * 100 if total else 0.0

        return {
            'infrastructure': infra_cost,
            'engineering': eng_cost,
            'hidden_estimated': hidden_costs,
            'total_tco': total,
            'period': 'annual' if annual else 'monthly',
            'breakdown': {
                'infrastructure_pct': share(infra_cost),
                'engineering_pct': share(eng_cost),
                'hidden_pct': share(hidden_costs)
            }
        }

    def compare_build_vs_buy(
        self,
        build_tco: dict,
        buy_cost_monthly: float,
        features_parity: float = 0.8
    ) -> dict:
        """Compare build vs buy options on an annual basis.

        Args:
            build_tco: Result of :meth:`calculate_tco`, or any dict with a
                'total_tco' key. If it carries ``'period': 'monthly'`` the
                total is annualised first. An optional 'initial_investment'
                key supplies the up-front build cost used for the payback
                period; it defaults to 0.
            buy_cost_monthly: Monthly price of the bought alternative.
            features_parity: Fraction of the required features the bought
                product covers. The buy cost is divided by it.

        Returns:
            Dict with the annual build and buy costs, the parity-adjusted buy
            cost, a 'build'/'buy' recommendation, the absolute savings, and
            the payback period in months (``inf`` when building is not
            cheaper than the unadjusted buy cost).

        Raises:
            ValueError: If ``features_parity`` is not positive.
        """
        if features_parity <= 0:
            raise ValueError("features_parity must be greater than 0")

        build_annual = build_tco['total_tco']
        if build_tco.get('period') == 'monthly':
            # Bring a monthly TCO onto the same annual footing as the buy cost.
            build_annual *= 12

        annual_buy = buy_cost_monthly * 12

        # Adjust for feature parity
        adjusted_buy = annual_buy / features_parity

        if build_annual < annual_buy:
            # Payback in months = up-front investment / monthly savings.
            monthly_savings = (annual_buy - build_annual) / 12
            payback_months = build_tco.get('initial_investment', 0) / monthly_savings
        else:
            payback_months = float('inf')

        return {
            'build_annual': build_annual,
            'buy_annual': annual_buy,
            'buy_adjusted': adjusted_buy,
            'recommendation': 'build' if build_annual < adjusted_buy else 'buy',
            'savings': abs(build_annual - adjusted_buy),
            'payback_period_months': payback_months
        }

Cost Attribution

Attribution Rules and Engine

Complete implementation for cost attribution:

from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class AttributionModel(Enum):
    """How a recorded cost is divided among owners.

    Only DIRECT and PROPORTIONAL are handled by
    ``CostAttributionEngine.calculate_attribution``; costs whose rule uses
    either of the other two models are not attributed to anyone there.
    """
    DIRECT = "direct"           # Attribute to specific team/project
    PROPORTIONAL = "proportional"  # Split by usage proportion
    FIXED_ALLOCATION = "fixed"   # Pre-agreed splits
    ACTIVITY_BASED = "activity"  # Based on specific activities

@dataclass
class CostAttributionRule:
    """Rule for attributing costs.

    ``CostAttributionEngine`` indexes rules by ``cost_category``, so it keeps
    one rule per category.
    """
    cost_category: CostCategory          # category this rule applies to
    attribution_model: AttributionModel  # how costs in the category are divided
    allocation_key: str  # The metric used for attribution
    description: str                     # human-readable explanation of the rule

# Default rule set. It covers four categories only: DATA_PROCESSING,
# ENGINEERING_TIME, INFRASTRUCTURE and LICENSING have no rule here, and
# CostAttributionEngine.calculate_attribution skips costs that have no rule.
ATTRIBUTION_RULES = [
    CostAttributionRule(
        cost_category=CostCategory.COMPUTE_TRAINING,
        attribution_model=AttributionModel.DIRECT,
        allocation_key="training_job_owner",
        description="Training costs attributed to team that initiated job"
    ),
    CostAttributionRule(
        cost_category=CostCategory.COMPUTE_INFERENCE,
        attribution_model=AttributionModel.PROPORTIONAL,
        allocation_key="request_count_by_product",
        description="Inference costs split by product request volume"
    ),
    CostAttributionRule(
        cost_category=CostCategory.DATA_STORAGE,
        attribution_model=AttributionModel.PROPORTIONAL,
        allocation_key="storage_size_by_dataset",
        description="Storage costs split by dataset size ownership"
    ),
    CostAttributionRule(
        cost_category=CostCategory.EXTERNAL_APIS,
        attribution_model=AttributionModel.DIRECT,
        allocation_key="api_call_origin",
        description="API costs attributed to calling service"
    ),
]

class CostAttributionEngine:
    """Engine for attributing AI costs to teams and projects.

    Cost events are kept in memory, stamped with the wall-clock time of the
    ``record_cost`` call, and rolled up per owner by applying the attribution
    rule registered for each event's category.
    """

    def __init__(self, rules: list[CostAttributionRule]):
        """Index ``rules`` by category; a later rule for a category wins."""
        self.rules = {r.cost_category: r for r in rules}
        self.cost_data: list[dict] = []

    def record_cost(
        self,
        category: CostCategory,
        amount: float,
        metadata: dict
    ):
        """Record a cost event.

        Args:
            category: Category of the cost; selects the attribution rule.
            amount: Cost amount.
            metadata: Attribution inputs. DIRECT rules read the owner from the
                key named by the rule's ``allocation_key``; PROPORTIONAL rules
                read an owner -> fraction mapping from ``'proportions'``.
        """
        self.cost_data.append({
            'category': category,
            'amount': amount,
            'metadata': metadata,
            'timestamp': datetime.now()
        })

    def calculate_attribution(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> dict:
        """Calculate cost attribution for a period.

        Args:
            start_date: Inclusive start of the period.
            end_date: Inclusive end of the period.

        Returns:
            Mapping of owner -> {category value: attributed amount, ...,
            'total': sum over categories}. Costs whose category has no rule,
            or whose rule uses a model other than DIRECT or PROPORTIONAL, are
            left out.
        """
        attribution: dict = {}

        for cost in self.cost_data:
            if not start_date <= cost['timestamp'] <= end_date:
                continue

            category = cost['category']
            rule = self.rules.get(category)
            if rule is None:
                continue

            shares = self._owner_shares(rule, cost['metadata'])
            for owner, share in shares.items():
                owner_costs = attribution.setdefault(owner, {})
                owner_costs[category.value] = (
                    owner_costs.get(category.value, 0) + cost['amount'] * share
                )

        # Totals are computed before the 'total' key is inserted, so the sum
        # covers the category amounts only.
        for owner_costs in attribution.values():
            owner_costs['total'] = sum(owner_costs.values())

        return attribution

    def _owner_shares(self, rule: CostAttributionRule, metadata: dict) -> dict:
        """Return the owner -> fraction split that ``rule`` gives one cost."""
        if rule.attribution_model == AttributionModel.DIRECT:
            owner = metadata.get(rule.allocation_key, 'unattributed')
            return {owner: 1}

        if rule.attribution_model == AttributionModel.PROPORTIONAL:
            proportions = metadata.get('proportions') or {}
            # Without a split the cost would vanish from the report; park it
            # under 'unattributed', as DIRECT does for a missing owner.
            return proportions if proportions else {'unattributed': 1}

        # FIXED_ALLOCATION and ACTIVITY_BASED are not implemented.
        return {}

    def generate_chargeback_report(
        self,
        month: str  # YYYY-MM format
    ) -> dict:
        """Generate the chargeback report for one calendar month.

        Args:
            month: Month to report on, in ``YYYY-MM`` format.

        Returns:
            Dict with the requested 'month', the 'period_start' and
            'period_end' datetimes, the per-owner 'attribution' from
            :meth:`calculate_attribution`, and the grand 'total'.

        Raises:
            ValueError: If ``month`` is not a valid ``YYYY-MM`` string.
        """
        import calendar

        period_start = datetime.strptime(month, "%Y-%m")
        last_day = calendar.monthrange(period_start.year, period_start.month)[1]
        # calculate_attribution treats both bounds as inclusive, so end on the
        # last representable instant of the month.
        period_end = period_start.replace(
            day=last_day, hour=23, minute=59, second=59, microsecond=999999
        )

        attribution = self.calculate_attribution(period_start, period_end)

        # Export to finance systems is not implemented here.
        return {
            'month': month,
            'period_start': period_start,
            'period_end': period_end,
            'attribution': attribution,
            'total': sum(owner['total'] for owner in attribution.values())
        }

Unit Economics

Unit Economics Calculator

Complete implementation for calculating unit economics:

from dataclasses import dataclass

@dataclass
class UnitEconomics:
    """Unit economics for AI service.

    Instances are produced by ``UnitEconomicsCalculator.calculate``.
    """
    service_name: str               # name of the service the figures describe
    cost_per_request: float         # monthly cost divided by monthly requests
    cost_per_user: float            # monthly cost divided by monthly users
    cost_per_dollar_revenue: float  # monthly cost divided by monthly revenue
    margin: float                   # (revenue - cost) / revenue, as a fraction

class UnitEconomicsCalculator:
    """Calculate unit economics for AI services.

    The latest result for each service is kept in ``self.metrics`` under the
    service name.
    """

    def __init__(self):
        self.metrics: dict[str, dict] = {}

    def calculate(
        self,
        service_name: str,
        monthly_costs: float,
        monthly_requests: int,
        monthly_users: int,
        monthly_revenue: float
    ) -> UnitEconomics:
        """Calculate unit economics and remember them for the service.

        Args:
            service_name: Key under which the result is stored.
            monthly_costs: Total monthly cost of the service.
            monthly_requests: Requests served in the month.
            monthly_users: Users served in the month.
            monthly_revenue: Revenue attributed to the service for the month.

        Returns:
            The computed ``UnitEconomics``. Request and user counts below 1
            are treated as 1. Revenue that is zero or negative is treated as
            1, so the revenue ratios stay finite.
        """
        cost_per_request = monthly_costs / max(1, monthly_requests)
        cost_per_user = monthly_costs / max(1, monthly_users)

        # Revenue is a float: clamping it with max(1, ...) would distort any
        # revenue between 0 and 1. Only guard the non-positive case.
        revenue_divisor = monthly_revenue if monthly_revenue > 0 else 1
        cost_per_revenue = monthly_costs / revenue_divisor
        margin = (monthly_revenue - monthly_costs) / revenue_divisor

        result = UnitEconomics(
            service_name=service_name,
            cost_per_request=cost_per_request,
            cost_per_user=cost_per_user,
            cost_per_dollar_revenue=cost_per_revenue,
            margin=margin
        )

        self.metrics[service_name] = {
            'unit_economics': result,
            'monthly_costs': monthly_costs,
            'monthly_revenue': monthly_revenue
        }

        return result

    def analyze_trends(self, service_name: str, months: int = 6) -> dict:
        """Return a trend summary for a service.

        Placeholder: no historical data is consulted. ``months`` is unused
        and the returned trend values are fixed strings.
        """
        # Would pull historical data
        return {
            'service': service_name,
            'cost_per_request_trend': 'decreasing',
            'margin_trend': 'improving',
            'recommendation': 'Continue current optimization path'
        }

GPU Optimization

GPU Utilization Analyzer

Complete implementation for GPU utilization analysis and optimization:

from dataclasses import dataclass
from typing import Optional

@dataclass
class GPUUtilizationMetrics:
    """GPU utilization metrics.

    One utilization sample for a GPU. ``GPUOptimizer.analyze_utilization``
    averages ``compute_utilization``, ``memory_utilization`` and ``time_idle``
    across samples; ``tensor_core_utilization`` is not read there.
    """
    gpu_id: str
    compute_utilization: float  # 0-100%
    memory_utilization: float   # 0-100%
    tensor_core_utilization: float  # 0-100% (when available)
    time_idle: float  # Percentage of time at 0% utilization

class GPUOptimizer:
    """Optimize GPU utilization and costs.

    Provides threshold-based diagnostics over utilization samples and a
    table-driven downsizing recommendation.
    """

    def __init__(self):
        self.utilization_history: list[GPUUtilizationMetrics] = []

    def analyze_utilization(
        self,
        metrics: list[GPUUtilizationMetrics]
    ) -> dict:
        """Analyze GPU utilization patterns.

        Args:
            metrics: Utilization samples to average.

        Returns:
            ``{'error': 'No metrics provided'}`` for an empty list. Otherwise
            a dict with the average compute, memory and idle percentages, the
            detected 'issues', matching 'recommendations', and an estimated
            'potential_savings_pct'.
        """
        if not metrics:
            return {'error': 'No metrics provided'}

        avg_compute = sum(m.compute_utilization for m in metrics) / len(metrics)
        avg_memory = sum(m.memory_utilization for m in metrics) / len(metrics)
        avg_idle = sum(m.time_idle for m in metrics) / len(metrics)

        issues = []
        recommendations = []

        # Check for low compute utilization
        if avg_compute < 50:
            issues.append(f"Low compute utilization: {avg_compute:.1f}%")
            recommendations.append("Consider smaller GPU instance or batch more work")

        # Check for memory underutilization
        if avg_memory < 40:
            issues.append(f"Low memory utilization: {avg_memory:.1f}%")
            recommendations.append("Consider smaller GPU or increase batch size")

        # Check for idle time
        if avg_idle > 20:
            issues.append(f"High idle time: {avg_idle:.1f}%")
            recommendations.append("Implement GPU sharing or spot instances")

        # Check for memory-bound workloads
        if avg_memory > 90 and avg_compute < 50:
            issues.append("Memory-bound workload")
            recommendations.append("Consider GPU with more memory or optimize memory usage")

        potential_savings = self._estimate_savings(avg_compute, avg_memory, avg_idle)

        return {
            'avg_compute_utilization': avg_compute,
            'avg_memory_utilization': avg_memory,
            'avg_idle_time': avg_idle,
            'issues': issues,
            'recommendations': recommendations,
            'potential_savings_pct': potential_savings
        }

    def recommend_instance_type(
        self,
        current_instance: str,
        utilization: GPUUtilizationMetrics,
        workload_type: str  # training, inference, both
    ) -> dict:
        """Recommend cheaper instance types for an under-used instance.

        Args:
            current_instance: Name of the instance in use. Names missing from
                the built-in table produce no recommendations.
            utilization: Sample whose ``compute_utilization`` decides whether
                downsizing is suggested (below 40%).
            workload_type: Currently unused.

        Returns:
            Dict with the current instance, its compute utilization, and up
            to three recommendations ordered by estimated savings, largest
            first.
        """
        # Instance characteristics (simplified)
        instances = {
            'g5.xlarge': {'gpu_mem': 24, 'compute': 1, 'cost': 1.0},
            'g5.2xlarge': {'gpu_mem': 24, 'compute': 1, 'cost': 1.2},
            'p4d.24xlarge': {'gpu_mem': 320, 'compute': 8, 'cost': 32.0},
            'p4de.24xlarge': {'gpu_mem': 640, 'compute': 8, 'cost': 40.0},
            'g6.xlarge': {'gpu_mem': 24, 'compute': 1, 'cost': 0.9},
        }

        current = instances.get(current_instance)
        recommendations = []

        # Low utilization - consider smaller. An unknown instance has no
        # baseline to compare against, so nothing is suggested for it.
        if current is not None and utilization.compute_utilization < 40:
            for name, specs in instances.items():
                if specs['compute'] < current['compute'] and specs['cost'] < current['cost']:
                    recommendations.append({
                        'instance': name,
                        'reason': 'Lower compute, lower cost',
                        'estimated_savings': (current['cost'] - specs['cost']) / current['cost'] * 100
                    })

            # Rank before truncating so the slice really is the top three.
            recommendations.sort(key=lambda rec: rec['estimated_savings'], reverse=True)

        return {
            'current_instance': current_instance,
            'current_utilization': utilization.compute_utilization,
            'recommendations': recommendations[:3]  # Top 3
        }

    def _estimate_savings(
        self,
        compute_util: float,
        memory_util: float,
        idle_time: float
    ) -> float:
        """Estimate potential savings (percent, capped at 50)."""
        # Rough heuristic: 30% of the headroom on the busier of compute and
        # memory, plus 80% of idle time.
        utilization_gap = 100 - max(compute_util, memory_util)
        idle_savings = idle_time * 0.8  # Can save 80% of idle costs with spot/sharing

        return min(50, (utilization_gap * 0.3) + idle_savings)

Inference Cost Optimization

Inference Cost Optimizer

Complete implementation for optimizing inference costs:

class InferenceCostOptimizer:
    """Optimize inference costs."""

    def __init__(self):
        self.optimization_strategies = []

    def analyze_inference_costs(
        self,
        requests_per_second: float,
        avg_latency_ms: float,
        gpu_cost_per_hour: float,
        batch_size: int
    ) -> dict:
        """Analyze inference costs and list candidate optimizations.

        Args:
            requests_per_second: Sustained request rate.
            avg_latency_ms: Average latency per request in milliseconds.
            gpu_cost_per_hour: Hourly price of the serving GPU.
            batch_size: Current serving batch size. Values below 1 are
                treated as 1 when sizing the batching recommendation.

        Returns:
            Dict with 'current_cost_per_request' (GPU price per second times
            request latency), the extrapolated 'daily_cost', and the list of
            'optimizations'. A batching entry is included only when
            ``batch_size`` is below 8.
        """
        current_cost_per_request = (
            gpu_cost_per_hour / 3600 * (avg_latency_ms / 1000)
        )

        optimizations = []

        # Batching optimization
        if batch_size < 8:
            # Guard against zero or negative batch sizes, which would divide
            # by zero or yield negative recommendations.
            effective_batch = max(1, batch_size)
            estimated_throughput_increase = min(4, 8 / effective_batch)
            optimizations.append({
                'strategy': 'Increase batch size',
                'current_batch': batch_size,
                'recommended_batch': min(32, effective_batch * 4),
                'estimated_savings': f"{(1 - 1/estimated_throughput_increase)*100:.0f}%"
            })

        # Model optimization
        optimizations.append({
            'strategy': 'Quantization (INT8)',
            'estimated_savings': '30-50%',
            'tradeoff': 'Possible 1-2% accuracy loss'
        })

        # Caching
        optimizations.append({
            'strategy': 'Response caching',
            'estimated_savings': 'Varies with cache hit rate',
            'tradeoff': 'Requires cache infrastructure'
        })

        # Spot instances
        optimizations.append({
            'strategy': 'Spot/preemptible instances',
            'estimated_savings': '60-70%',
            'tradeoff': 'Requires handling interruptions'
        })

        return {
            'current_cost_per_request': current_cost_per_request,
            'daily_cost': current_cost_per_request * requests_per_second * 86400,
            'optimizations': optimizations
        }

    def calculate_caching_roi(
        self,
        requests_per_day: int,
        cost_per_request: float,
        expected_cache_hit_rate: float,
        cache_cost_per_day: float
    ) -> dict:
        """Calculate ROI of implementing caching.

        Args:
            requests_per_day: Daily request volume.
            cost_per_request: Compute cost of one uncached request.
            expected_cache_hit_rate: Fraction of requests served from cache.
            cache_cost_per_day: Daily cost of the cache infrastructure.

        Returns:
            Dict with current and projected daily cost, daily and 30-day
            savings, 'roi_percent' (savings over cache cost; ``inf`` when the
            cache is free) and 'payback_days' (cache cost over daily savings;
            ``inf`` when there are no savings).
        """
        # Current cost
        current_daily_cost = requests_per_day * cost_per_request

        # With caching only cache misses incur compute cost
        cache_misses = requests_per_day * (1 - expected_cache_hit_rate)
        compute_cost_with_cache = cache_misses * cost_per_request
        total_with_cache = compute_cost_with_cache + cache_cost_per_day

        savings = current_daily_cost - total_with_cache
        roi = (savings / cache_cost_per_day) * 100 if cache_cost_per_day > 0 else float('inf')

        return {
            'current_daily_cost': current_daily_cost,
            'projected_daily_cost': total_with_cache,
            'daily_savings': savings,
            'monthly_savings': savings * 30,
            'roi_percent': roi,
            'payback_days': cache_cost_per_day / savings if savings > 0 else float('inf')
        }

Training Cost Optimization

Training Cost Optimizer

Complete implementation for optimizing training costs:

class TrainingCostOptimizer:
    """Optimize training costs."""

    def recommend_training_strategy(
        self,
        model_size_params: int,
        dataset_size_gb: float,
        target_performance: float,
        budget: float
    ) -> dict:
        """Recommend a training strategy within budget.

        Four strategies are priced relative to a 10-epoch full training run:
        full training (100%), fine-tuning (10%), LoRA/PEFT (5%) and full
        training on spot instances (35%).

        Args:
            model_size_params: Number of model parameters.
            dataset_size_gb: Training data size in GB.
            target_performance: Performance expected from full training; the
                cheaper strategies are scored as a fraction of it.
            budget: Maximum spend.

        Returns:
            Dict with the 'budget', the 'strategies_within_budget', every
            strategy under 'all_strategies', and a 'recommendation': the
            first affordable strategy in the order above, or the cheapest
            strategy overall when none fits the budget.
        """
        full_training_cost = self._estimate_training_cost(
            model_size_params,
            dataset_size_gb,
            epochs=10
        )

        def savings_vs_full(cost: float) -> str:
            # A zero-cost baseline leaves nothing to save (and nothing to
            # divide by).
            if full_training_cost <= 0:
                return "0%"
            return f"{(1 - cost/full_training_cost)*100:.0f}%"

        finetune_cost = full_training_cost * 0.1
        lora_cost = full_training_cost * 0.05
        spot_cost = full_training_cost * 0.35

        # Full training is always listed so 'all_strategies' is complete; the
        # budget filter below decides whether it is affordable.
        strategies = [
            {
                'strategy': 'Full training',
                'estimated_cost': full_training_cost,
                'expected_performance': target_performance
            },
            {
                'strategy': 'Fine-tune pretrained model',
                'estimated_cost': finetune_cost,
                'expected_performance': target_performance * 0.95,
                'savings_vs_full': savings_vs_full(finetune_cost)
            },
            {
                'strategy': 'LoRA/PEFT fine-tuning',
                'estimated_cost': lora_cost,
                'expected_performance': target_performance * 0.92,
                'savings_vs_full': savings_vs_full(lora_cost)
            },
            {
                'strategy': 'Full training with spot instances',
                'estimated_cost': spot_cost,
                'expected_performance': target_performance,
                'savings_vs_full': '65%',
                'risk': 'Requires checkpointing, may take longer'
            },
        ]

        # Filter to budget
        within_budget = [s for s in strategies if s['estimated_cost'] <= budget]

        if within_budget:
            recommendation = within_budget[0]
        else:
            # Nothing fits: point at the option that overshoots the least.
            recommendation = min(strategies, key=lambda s: s['estimated_cost'])

        return {
            'budget': budget,
            'strategies_within_budget': within_budget,
            'all_strategies': strategies,
            'recommendation': recommendation
        }

    def _estimate_training_cost(
        self,
        model_params: int,
        dataset_gb: float,
        epochs: int
    ) -> float:
        """Estimate training cost based on model and data size.

        Simplified estimation; a real implementation would use detailed
        benchmarks.
        """
        # Assume A100 at $3/hr
        gpu_cost_per_hour = 3.0

        # Rough estimate: 1B params * 1GB data * 1 epoch ~= 10 GPU-hours
        base_hours = (model_params / 1e9) * dataset_gb * epochs * 10

        # Multi-GPU scaling (80% efficiency)
        gpus_needed = max(1, model_params / 10e9)  # Rough memory estimate
        total_hours = base_hours / gpus_needed / 0.8

        return total_hours * gpu_cost_per_hour * gpus_needed

Build vs Buy Analysis

Build vs Buy Framework

Complete implementation for build vs buy decision analysis:

from dataclasses import dataclass
from typing import Optional

@dataclass
class BuildVsBuyFactor:
    """Factor to consider in build vs buy decision.

    ``BuildVsBuyAnalyzer.analyze`` multiplies each score by ``weight`` and
    normalises by the sum of weights.
    """
    factor: str         # name of the factor being scored
    weight: float  # Importance 0-1
    build_score: float  # 1-5
    buy_score: float  # 1-5
    notes: str          # free-text rationale, echoed in the analysis output

class BuildVsBuyAnalyzer:
    """Analyze build vs buy decisions for AI capabilities.

    Factors are collected with :meth:`add_factor` and combined into weighted
    average scores by :meth:`analyze`.
    """

    def __init__(self):
        self.factors = []

    def add_factor(self, factor: BuildVsBuyFactor):
        """Add evaluation factor."""
        self.factors.append(factor)

    def analyze(self) -> dict:
        """Analyze build vs buy decision.

        Returns:
            Dict with the weighted-average 'build_score' and 'buy_score', a
            'recommendation' ('build' only when its score is strictly higher,
            otherwise 'buy'), a 'confidence' equal to the score gap divided
            by 5, and the per-factor inputs under 'factors'. With no factors,
            or when all weights are zero, both scores and the confidence are
            0.0.
        """
        weighted_build = sum(f.weight * f.build_score for f in self.factors)
        weighted_buy = sum(f.weight * f.buy_score for f in self.factors)
        total_weight = sum(f.weight for f in self.factors)

        if total_weight != 0:
            normalized_build = weighted_build / total_weight
            normalized_buy = weighted_buy / total_weight
        else:
            # Nothing to average over; report a zero-confidence tie rather
            # than dividing by zero.
            normalized_build = 0.0
            normalized_buy = 0.0

        return {
            'build_score': normalized_build,
            'buy_score': normalized_buy,
            'recommendation': 'build' if normalized_build > normalized_buy else 'buy',
            'confidence': abs(normalized_build - normalized_buy) / 5,
            'factors': [
                {
                    'factor': f.factor,
                    'build': f.build_score,
                    'buy': f.buy_score,
                    'notes': f.notes
                }
                for f in self.factors
            ]
        }

# Example: Embedding service build vs buy
# Sample factor set. The weights sum to 4.8 and the weighted scores come to
# 14.6 for build versus 17.8 for buy, so BuildVsBuyAnalyzer.analyze would
# recommend 'buy' for these inputs.
EMBEDDING_BUILD_VS_BUY = [
    BuildVsBuyFactor(
        factor="Initial development cost",
        weight=0.8,
        build_score=2,  # High cost
        buy_score=5,    # Low cost (just API)
        notes="Building requires significant engineering investment"
    ),
    BuildVsBuyFactor(
        factor="Ongoing operational cost",
        weight=1.0,
        build_score=4,  # Lower at scale
        buy_score=2,    # Higher at scale
        notes="API costs grow linearly; self-hosted amortizes"
    ),
    BuildVsBuyFactor(
        factor="Customization ability",
        weight=0.6,
        build_score=5,  # Full control
        buy_score=2,    # Limited
        notes="Custom fine-tuning only possible with build"
    ),
    BuildVsBuyFactor(
        factor="Time to production",
        weight=0.9,
        build_score=1,  # Months
        buy_score=5,    # Days
        notes="API can be integrated immediately"
    ),
    BuildVsBuyFactor(
        factor="Data privacy",
        weight=0.7,
        build_score=5,  # Data stays internal
        buy_score=3,    # Data sent to third party
        notes="Regulated industries may require build"
    ),
    BuildVsBuyFactor(
        factor="Maintenance burden",
        weight=0.8,
        build_score=2,  # Requires ongoing work
        buy_score=5,    # Vendor handles it
        notes="Self-hosted requires ML ops expertise"
    ),
]

class CrossoverAnalysis:
    """Analyze when build becomes cheaper than buy."""

    def calculate_crossover(
        self,
        build_fixed_cost: float,    # One-time development cost
        build_variable_cost: float,  # Per-unit cost
        buy_fixed_cost: float,       # Monthly minimum
        buy_variable_cost: float,    # Per-unit cost
        units_per_month: int,
        amortization_months: int = 24
    ) -> dict:
        """Calculate the monthly crossover point between build and buy.

        Both options are modelled as ``fixed + variable * units`` per month,
        with the one-time build cost spread evenly over
        ``amortization_months``.

        Args:
            build_fixed_cost: One-time development cost of building.
            build_variable_cost: Per-unit cost when building.
            buy_fixed_cost: Monthly minimum charge when buying.
            buy_variable_cost: Per-unit cost when buying.
            units_per_month: Current monthly volume.
            amortization_months: Months over which the build cost is spread.

        Returns:
            Dict with the current volume, both monthly costs, the
            'crossover_volume' at and above which build is the cheaper option
            (0 when build is cheaper at every volume, ``inf`` when higher
            volume never makes build cheaper), a 'recommendation' at the
            current volume, and the monthly and annual savings.

        Raises:
            ValueError: If ``amortization_months`` is not positive.
        """
        if amortization_months <= 0:
            raise ValueError("amortization_months must be greater than 0")

        build_monthly_fixed = build_fixed_cost / amortization_months

        build_monthly = build_monthly_fixed + (build_variable_cost * units_per_month)
        buy_monthly = buy_fixed_cost + (buy_variable_cost * units_per_month)

        # Per-unit amount saved by building instead of buying.
        variable_saving = buy_variable_cost - build_variable_cost

        if variable_saving > 0:
            # build_monthly_fixed + build_var * x = buy_fixed + buy_var * x
            # x = (build_monthly_fixed - buy_fixed) / (buy_var - build_var)
            crossover_volume = (build_monthly_fixed - buy_fixed_cost) / variable_saving
        elif variable_saving == 0 and build_monthly_fixed < buy_fixed_cost:
            # Same per-unit cost and a lower fixed cost: build wins everywhere.
            crossover_volume = 0
        else:
            crossover_volume = float('inf')  # More volume never favours build

        return {
            'current_volume': units_per_month,
            'build_monthly_cost': build_monthly,
            'buy_monthly_cost': buy_monthly,
            'crossover_volume': max(0, crossover_volume),
            'recommendation': 'build' if build_monthly < buy_monthly else 'buy',
            'monthly_savings': abs(build_monthly - buy_monthly),
            'annual_savings': abs(build_monthly - buy_monthly) * 12
        }

API Cost Management

API Cost Manager and Optimizer

Complete implementation for managing and optimizing API costs:

from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timedelta

@dataclass
class APIUsagePattern:
    """API usage pattern for cost optimization.

    ``APICostOptimizer.recommend_optimizations`` reads ``calls_per_day`` and
    ``avg_tokens_per_call`` to decide which strategies to suggest.
    """
    api_name: str                # name of the API being described
    calls_per_day: int           # number of calls made per day
    avg_tokens_per_call: int     # average tokens consumed by one call
    cost_per_1k_tokens: float    # price per 1,000 tokens
    daily_cost: float            # daily spend; stored as given, not derived here
    optimization_potential: str  # free-text assessment

class APICostManager:
    """Manage and optimize API costs.

    Keeps an in-memory log of API calls, optional per-API daily budgets, and
    the alerts raised when a day's spend gets close to or passes a budget.
    """

    # Fraction of the daily budget above which alerts start to be raised.
    ALERT_THRESHOLD = 0.8

    def __init__(self):
        self.usage_log: list[dict] = []
        self.budgets: dict[str, float] = {}
        self.alerts: list[dict] = []

    def log_api_call(
        self,
        api_name: str,
        tokens_used: int,
        cost: float,
        request_metadata: Optional[dict] = None
    ):
        """Log an API call and check the API's daily budget.

        Args:
            api_name: Name of the API that was called.
            tokens_used: Tokens consumed by the call.
            cost: Cost of the call.
            request_metadata: Optional extra data; stored as ``{}`` when
                omitted.
        """
        self.usage_log.append({
            'api': api_name,
            'tokens': tokens_used,
            'cost': cost,
            'timestamp': datetime.now(),
            'metadata': request_metadata or {}
        })

        # Check budget
        self._check_budget(api_name, cost)

    def set_budget(self, api_name: str, daily_budget: float):
        """Set daily budget for an API."""
        self.budgets[api_name] = daily_budget

    def get_usage_summary(self, api_name: str, days: int = 7) -> dict:
        """Get usage summary for an API over the trailing ``days`` days.

        Returns:
            Dict with call, token and cost totals plus per-call and per-day
            averages. An ``{'error': ...}`` dict is returned when ``days`` is
            not positive or when no calls fall inside the window.
        """
        if days <= 0:
            # A non-positive window has no daily average to report.
            return {'error': 'days must be positive'}

        cutoff = datetime.now() - timedelta(days=days)
        relevant = [
            log for log in self.usage_log
            if log['api'] == api_name and log['timestamp'] >= cutoff
        ]

        if not relevant:
            return {'error': 'No usage data'}

        total_cost = sum(log['cost'] for log in relevant)
        total_tokens = sum(log['tokens'] for log in relevant)
        total_calls = len(relevant)

        return {
            'api': api_name,
            'period_days': days,
            'total_calls': total_calls,
            'total_tokens': total_tokens,
            'total_cost': total_cost,
            'avg_cost_per_call': total_cost / total_calls,
            'avg_tokens_per_call': total_tokens / total_calls,
            'daily_avg_cost': total_cost / days
        }

    def _check_budget(self, api_name: str, cost: float):
        """Append an alert when today's spend nears or exceeds the budget.

        ``cost`` is unused: the spend is recomputed from the usage log, which
        already includes the call being checked.
        """
        if api_name not in self.budgets:
            return

        # Calculate today's spend
        today = datetime.now().date()
        today_cost = sum(
            log['cost'] for log in self.usage_log
            if log['api'] == api_name and log['timestamp'].date() == today
        )

        budget = self.budgets[api_name]

        if today_cost > budget * self.ALERT_THRESHOLD:
            # Spend past the budget is no longer merely "approaching" it.
            if today_cost > budget:
                alert = 'Budget exceeded'
            else:
                alert = 'Approaching budget limit'
            self.alerts.append({
                'api': api_name,
                'alert': alert,
                'current_spend': today_cost,
                'budget': budget,
                'timestamp': datetime.now()
            })


class APICostOptimizer:
    """Suggest and quantify ways to reduce external API spend."""

    def recommend_optimizations(
        self,
        api_name: str,
        usage_pattern: APIUsagePattern
    ) -> list[dict]:
        """Build a list of cost-saving recommendations for an API.

        Caching and tiered model selection are always suggested. Batching,
        prompt optimization and self-hosting are added only when the usage
        pattern exceeds 1000 calls per day, 500 average tokens per call, and
        a daily cost of 100 respectively. ``api_name`` is accepted but not
        used when building the list.

        Args:
            api_name: Name of the API being analyzed.
            usage_pattern: Object exposing ``calls_per_day``,
                ``avg_tokens_per_call`` and ``daily_cost``.

        Returns:
            Recommendation dicts, each with the keys ``strategy``,
            ``description``, ``potential_savings``, ``implementation`` and
            ``tradeoff``.
        """
        def entry(strategy, description, potential_savings, implementation, tradeoff):
            # Every recommendation shares this key set and key order.
            return {
                'strategy': strategy,
                'description': description,
                'potential_savings': potential_savings,
                'implementation': implementation,
                'tradeoff': tradeoff
            }

        suggestions = [
            entry(
                'Response caching',
                'Cache identical or similar requests',
                '20-80% depending on query diversity',
                'Redis/Memcached with semantic similarity matching',
                'Stale responses, cache infrastructure cost',
            )
        ]

        high_volume = usage_pattern.calls_per_day > 1000
        if high_volume:
            suggestions.append(entry(
                'Request batching',
                'Combine multiple requests into single API calls',
                '10-30%',
                'Queue requests and batch every N seconds',
                'Increased latency for individual requests',
            ))

        tokens_per_call = usage_pattern.avg_tokens_per_call
        if tokens_per_call > 500:
            # Savings estimate assumes prompts can be trimmed to about 300 tokens.
            reduction_pct = (tokens_per_call - 300) / tokens_per_call * 100
            suggestions.append(entry(
                'Prompt optimization',
                'Reduce prompt length while maintaining quality',
                f'{reduction_pct:.0f}%',
                'Compress context, use shorter examples',
                'Potential quality degradation',
            ))

        suggestions.append(entry(
            'Tiered model selection',
            'Use cheaper models for simple queries',
            '30-60%',
            'Classify query complexity, route accordingly',
            'Quality variation, routing complexity',
        ))

        expensive = usage_pattern.daily_cost > 100
        if expensive:
            suggestions.append(entry(
                'Self-hosted model',
                'Run open-source model internally',
                '40-70% at scale',
                'Deploy via vLLM/TGI on GPU instances',
                'Engineering overhead, model capability differences',
            ))

        return suggestions

    def calculate_caching_savings(
        self,
        daily_calls: int,
        cost_per_call: float,
        cache_hit_rate: float,
        cache_cost_per_day: float
    ) -> dict:
        """Estimate the financial effect of putting a cache in front of an API.

        Args:
            daily_calls: Number of API calls per day without caching.
            cost_per_call: Cost of a single API call.
            cache_hit_rate: Fraction of calls served from the cache.
            cache_cost_per_day: Daily cost of running the cache.

        Returns:
            Dict with the current and projected daily cost, daily and 30-day
            savings, ROI as a percentage of cache cost (``inf`` when the
            cache cost is not positive), and the cache cost at which savings
            would be zero.
        """
        baseline = daily_calls * cost_per_call

        # Only cache misses still reach the paid API.
        uncached_calls = daily_calls * (1 - cache_hit_rate)
        with_cache = uncached_calls * cost_per_call + cache_cost_per_day
        saved_per_day = baseline - with_cache

        if cache_cost_per_day > 0:
            roi_pct = saved_per_day / cache_cost_per_day * 100
        else:
            roi_pct = float('inf')

        return {
            'current_daily_cost': baseline,
            'projected_daily_cost': with_cache,
            'daily_savings': saved_per_day,
            'monthly_savings': saved_per_day * 30,
            'roi': roi_pct,
            'break_even_cache_cost': baseline * cache_hit_rate
        }

Token Optimization

Token Optimizer

Complete implementation for optimizing LLM token usage:

class TokenOptimizer:
    """Heuristic analysis of token usage for LLM prompts and contexts.

    Token counts are rough estimates derived from whitespace-separated word
    counts, not real tokenizer output (a real implementation would use a
    tokenizer such as tiktoken).
    """

    @staticmethod
    def _estimate_tokens(text: str) -> float:
        """Approximate the token count as 1.3 tokens per whitespace-separated word."""
        return len(text.split()) * 1.3

    def analyze_prompt_efficiency(
        self,
        prompt: str,
        response: str,
        model: str
    ) -> dict:
        """Estimate token usage for a prompt/response pair and flag waste.

        Args:
            prompt: Prompt text sent to the model.
            response: Response text returned by the model.
            model: Model name; accepted for interface compatibility but not
                used by the current heuristics.

        Returns:
            Dict with estimated ``prompt_tokens``, ``response_tokens`` and
            ``total_tokens`` (truncated to ints), a list of
            ``opportunities`` (each with ``issue``, ``suggestion`` and
            ``estimated_savings``), and ``potential_savings_pct``, the sum of
            the lower bound of every opportunity's estimated savings. An
            empty or whitespace-only prompt yields zero tokens and no
            opportunities.
        """
        words = prompt.split()
        prompt_tokens = self._estimate_tokens(prompt)
        response_tokens = self._estimate_tokens(response)

        opportunities = []

        # Repeated politeness/emphasis phrases indicate wordy instructions.
        if prompt.count("Please") > 2 or prompt.count("make sure") > 1:
            opportunities.append({
                'issue': 'Verbose instructions',
                'suggestion': 'Use concise directives',
                'estimated_savings': '10-15%'
            })

        # Many mentions of "example" suggest more few-shot examples than needed.
        example_count = prompt.lower().count("example")
        if example_count > 3:
            opportunities.append({
                'issue': f'Many examples ({example_count})',
                'suggestion': 'Reduce to 1-2 high-quality examples',
                'estimated_savings': f'{(example_count - 2) * 5}%'
            })

        # A low unique-word ratio suggests repeated context. The check is
        # skipped when the prompt has no words, which previously raised
        # ZeroDivisionError.
        if words:
            unique_ratio = len(set(words)) / len(words)
            if unique_ratio < 0.5:
                opportunities.append({
                    'issue': 'Repetitive content',
                    'suggestion': 'Remove redundant context',
                    'estimated_savings': '15-25%'
                })

        # Savings strings look like "10-15%" or "10%"; take the leading number.
        potential_savings_pct = sum(
            int(item['estimated_savings'].rstrip('%').split('-')[0])
            for item in opportunities
        )

        return {
            'prompt_tokens': int(prompt_tokens),
            'response_tokens': int(response_tokens),
            'total_tokens': int(prompt_tokens + response_tokens),
            'opportunities': opportunities,
            'potential_savings_pct': potential_savings_pct
        }

    def suggest_compression(self, context: str, max_tokens: int) -> dict:
        """Suggest strategies for fitting a context into a token budget.

        Args:
            context: Context text to be sent to the model.
            max_tokens: Token budget the context must fit into.

        Returns:
            ``{'compression_needed': False, 'current_tokens': ...}`` when the
            estimated size already fits (or the context is empty). Otherwise
            a dict with the current and target sizes, the ratio
            ``max_tokens / current_tokens``, and the strategies selected for
            that ratio: above 0.7 both "Remove examples" and "Summarize
            context", above 0.4 "Summarize context", otherwise "RAG approach".
        """
        current_tokens = self._estimate_tokens(context)

        # An empty context has nothing to compress; returning here also avoids
        # dividing by zero when the budget is negative.
        if current_tokens <= max_tokens or current_tokens == 0:
            return {
                'compression_needed': False,
                'current_tokens': int(current_tokens)
            }

        compression_ratio = max_tokens / current_tokens

        strategies = []

        if compression_ratio > 0.7:
            strategies.append({
                'strategy': 'Remove examples',
                'description': 'Keep only the most relevant example'
            })

        if compression_ratio > 0.4:
            strategies.append({
                'strategy': 'Summarize context',
                'description': 'Replace detailed context with summary'
            })

        if compression_ratio <= 0.4:
            strategies.append({
                'strategy': 'RAG approach',
                'description': 'Retrieve only relevant portions at query time'
            })

        return {
            'compression_needed': True,
            'current_tokens': int(current_tokens),
            'target_tokens': max_tokens,
            'compression_ratio': compression_ratio,
            'strategies': strategies
        }

Cost Visibility

Cost Dashboard

Complete implementation for cost visibility dashboards:

class CostDashboard:
    """Cost visibility views for teams.

    The methods currently return fixed sample figures; the data source
    passed to the constructor is stored but not consulted by them.
    """

    def __init__(self, cost_data_source):
        """Keep a reference to the cost data source."""
        self.data_source = cost_data_source

    def get_team_dashboard(self, team: str) -> dict:
        """Return the dashboard payload for ``team``.

        Args:
            team: Team name, echoed back under the ``'team'`` key.

        Returns:
            Dict with the month summary, per-category costs, top cost
            drivers, optimization opportunities, and month-over-month change.
        """
        month_summary = {
            'total_cost': 45000,
            'budget': 50000,
            'utilization': 0.90,
            'trend': 'stable'
        }

        category_costs = {
            'compute_training': 15000,
            'compute_inference': 20000,
            'data_storage': 5000,
            'external_apis': 5000
        }

        driver_rows = (
            ('Weekly model retraining', 8000),
            ('Production inference cluster', 15000),
            ('Embedding API calls', 4000),
        )
        cost_drivers = [
            {'item': label, 'cost': amount} for label, amount in driver_rows
        ]

        spot_training = {
            'opportunity': 'Use spot instances for training',
            'potential_savings': 4800,
            'effort': 'low'
        }
        response_caching = {
            'opportunity': 'Implement response caching',
            'potential_savings': 3000,
            'effort': 'medium'
        }

        monthly_change = {
            'change_pct': 5.2,
            'main_driver': 'Increased inference traffic'
        }

        dashboard = {'team': team}
        dashboard['current_month'] = month_summary
        dashboard['by_category'] = category_costs
        dashboard['top_cost_drivers'] = cost_drivers
        dashboard['optimization_opportunities'] = [spot_training, response_caching]
        dashboard['month_over_month'] = monthly_change
        return dashboard

    def get_anomaly_alerts(self) -> list[dict]:
        """Return the current cost anomaly alerts (a fixed sample alert)."""
        training_spike = {
            'alert': 'Training costs 50% above baseline',
            'team': 'recommendations',
            'action': 'Review training job efficiency'
        }
        return [training_spike]

Cost Governance

Governance Policies and Enforcement

Complete implementation for cost governance:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class CostGovernancePolicy:
    """A single named cost rule: a numeric limit plus descriptive guidance.

    Instances are plain records with no behavior of their own.
    ``CostGovernanceEnforcer`` indexes policies by ``policy_name`` and, for
    training jobs, compares an estimated cost against ``threshold``.
    """
    # Unique label; used as the lookup key by CostGovernanceEnforcer.
    policy_name: str
    # Human-readable explanation of what the policy limits.
    description: str
    # Numeric limit the policy is built around. Units are not encoded here
    # (presumably USD -- confirm against the policy definitions).
    threshold: float
    # Text describing what happens when the threshold is exceeded.
    action: str
    # Text describing how to request an exception to the policy.
    exception_process: str

# Default governance rules. Thresholds are plain numbers; presumably USD
# (the enforcer formats the training-job threshold with a "$") -- confirm.
COST_POLICIES = [
    # Per-job cap. CostGovernanceEnforcer.check_training_job looks this
    # policy up by its exact name, "Training job budget".
    CostGovernancePolicy(
        policy_name="Training job budget",
        description="Maximum cost per training job without approval",
        threshold=10000,
        action="Require manager approval for jobs exceeding threshold",
        exception_process="Submit cost justification via form"
    ),
    # Per-team monthly cap. The 80% / 100% behavior exists only as text in
    # `action`. NOTE(review): confirm where this policy is actually enforced.
    CostGovernancePolicy(
        policy_name="Monthly team budget",
        description="Maximum monthly spend per team",
        threshold=100000,
        action="Alert at 80%, hard stop at 100%",
        exception_process="VP approval for budget increase"
    ),
    # Per-request API cost cap.
    # NOTE(review): confirm where this policy is actually enforced.
    CostGovernancePolicy(
        policy_name="API cost per request",
        description="Maximum API cost per user request",
        threshold=0.01,
        action="Review and optimize high-cost requests",
        exception_process="Document business justification"
    ),
]

class CostGovernanceEnforcer:
    """Apply cost governance policies and track violations and approvals.

    Attributes:
        policies: Policies indexed by their ``policy_name``.
        violations: One record per training-job check that exceeded the
            threshold without a pre-approval.
        approvals: Pre-approvals keyed by ``"training_<job_id>"``. Nothing in
            this class adds entries; callers populate it directly.
    """

    def __init__(self, policies: list[CostGovernancePolicy]):
        """Index ``policies`` by name and start with empty records."""
        self.policies = {p.policy_name: p for p in policies}
        self.violations: list[dict] = []
        self.approvals: dict[str, dict] = {}

    def check_training_job(
        self,
        job_id: str,
        estimated_cost: float,
        requestor: str,
        justification: Optional[str] = None
    ) -> dict:
        """Decide whether a training job may run under the job-budget policy.

        Args:
            job_id: Identifier of the training job.
            estimated_cost: Estimated cost of the job.
            requestor: Who is asking to run the job.
            justification: Optional reason supplied with the request. It is
                stored on the violation record when the job is rejected
                (previously it was accepted but silently discarded).

        Returns:
            Dict with ``approved`` and ``reason``. A pre-approved job also
            carries ``approval``. A rejected job also carries
            ``action_required`` and ``exception_process`` from the policy.
        """
        policy = self.policies.get('Training job budget')
        if policy is None:
            return {'approved': True, 'reason': 'No policy defined'}

        if estimated_cost <= policy.threshold:
            return {
                'approved': True,
                'reason': 'Within automatic approval threshold'
            }

        # Over the threshold: allowed only if an approval was recorded.
        approval_key = f"training_{job_id}"
        if approval_key in self.approvals:
            return {
                'approved': True,
                'reason': 'Pre-approved',
                'approval': self.approvals[approval_key]
            }

        # Not approved: keep an audit record, including the justification
        # so reviewers can see why the job was requested.
        self.violations.append({
            'policy': 'Training job budget',
            'job_id': job_id,
            'estimated_cost': estimated_cost,
            'threshold': policy.threshold,
            'requestor': requestor,
            'timestamp': datetime.now(),
            'justification': justification
        })

        return {
            'approved': False,
            'reason': f'Exceeds threshold (${estimated_cost:.2f} > ${policy.threshold:.2f})',
            'action_required': policy.action,
            'exception_process': policy.exception_process
        }

    def request_budget_exception(
        self,
        policy_name: str,
        requestor: str,
        amount: float,
        justification: str,
        business_impact: str
    ) -> dict:
        """Build a pending exception request for a budget policy.

        The request is returned to the caller and is not stored on the
        enforcer. The ``request_id`` is derived from the current time with
        one-second resolution.

        Args:
            policy_name: Name of the policy the exception applies to.
            requestor: Who is requesting the exception.
            amount: Amount being requested; determines the approver chain.
            justification: Why the exception is needed.
            business_impact: Expected business impact of the exception.

        Returns:
            Dict describing the request, with ``status`` set to ``'pending'``
            and ``required_approvers`` listing who must sign off.
        """
        return {
            'request_id': f"EXC-{datetime.now().strftime('%Y%m%d%H%M%S')}",
            'policy': policy_name,
            'requestor': requestor,
            'amount': amount,
            'justification': justification,
            'business_impact': business_impact,
            'status': 'pending',
            'required_approvers': self._get_approvers(policy_name, amount)
        }

    def _get_approvers(self, policy_name: str, amount: float) -> list[str]:
        """Return the approver chain for ``amount``.

        Below 50000 only the engineering manager is needed, below 100000 the
        director is added, and otherwise the VP of engineering as well.
        ``policy_name`` does not currently influence the result.
        """
        if amount < 50000:
            return ['engineering_manager']
        elif amount < 100000:
            return ['engineering_manager', 'director']
        else:
            return ['engineering_manager', 'director', 'vp_engineering']


class CostReviewProcess:
    """Standard agendas for recurring cost-review meetings."""

    # Agenda contents are kept as immutable tuples; each accessor hands out
    # a fresh list so callers can modify their copy freely.
    _WEEKLY_AGENDA = (
        "1. Review week-over-week cost changes",
        "2. Identify top 3 cost drivers",
        "3. Review budget utilization by team",
        "4. Check for anomalies or unexpected spikes",
        "5. Review optimization opportunities pipeline",
        "6. Update cost forecasts",
        "7. Action items from previous week",
    )

    _MONTHLY_AGENDA = (
        "1. Month-over-month cost trend analysis",
        "2. Budget variance analysis by category",
        "3. Unit economics review (cost per prediction, cost per user)",
        "4. ROI analysis of major AI initiatives",
        "5. Optimization progress report",
        "6. Forecast vs actual comparison",
        "7. Budget adjustments for next month",
        "8. Strategic cost discussions (build vs buy, vendor negotiations)",
    )

    _QUARTERLY_TOPICS = (
        "Total cost of ownership trends",
        "Build vs buy re-evaluation",
        "Vendor contract negotiations",
        "Infrastructure modernization opportunities",
        "Cost benchmarking against industry",
        "Budget planning for next quarter",
    )

    @staticmethod
    def weekly_review_agenda() -> list[str]:
        """Return the numbered agenda items for the weekly cost review."""
        return list(CostReviewProcess._WEEKLY_AGENDA)

    @staticmethod
    def monthly_review_agenda() -> list[str]:
        """Return the numbered agenda items for the monthly cost review."""
        return list(CostReviewProcess._MONTHLY_AGENDA)

    @staticmethod
    def quarterly_review_topics() -> list[str]:
        """Return the discussion topics for the quarterly strategic review."""
        return list(CostReviewProcess._QUARTERLY_TOPICS)

Scaling Cost Models

Scaling Cost Projections

Complete implementation for projecting costs at scale:

class ScalingCostModel:
    """Model how costs scale with growth."""

    def project_costs(
        self,
        current_users: int,
        current_monthly_cost: float,
        cost_structure: dict,
        target_users: int
    ) -> dict:
        """Project monthly cost at a different user count.

        The current cost is split into three shares taken from
        ``cost_structure``: a fixed share that does not change, a linear
        share that scales with the user ratio, and a sub-linear share that
        scales with the user ratio raised to an exponent below one.

        Args:
            current_users: Present number of users; must be positive.
            current_monthly_cost: Present total monthly cost.
            cost_structure: Optional overrides for ``fixed_pct`` (default
                0.2), ``linear_pct`` (default 0.5), ``sublinear_pct``
                (default 0.3) and ``sublinear_exponent`` (default 0.7).
            target_users: User count to project for; must be positive.

        Returns:
            Dict with both user counts, the scale factor, current and
            projected cost, cost per user before and after, and
            ``efficiency_gain``: the percentage drop in cost per user
            (``0.0`` when the current cost per user is zero).

        Raises:
            ValueError: If ``current_users`` or ``target_users`` is not
                positive. Zero would divide by zero, and a negative ratio
                raised to a fractional power yields a complex number.
        """
        if current_users <= 0 or target_users <= 0:
            raise ValueError(
                "current_users and target_users must be positive "
                f"(got {current_users} and {target_users})"
            )

        scale_factor = target_users / current_users

        # Fixed costs don't change with scale.
        fixed = current_monthly_cost * cost_structure.get('fixed_pct', 0.2)

        # Linear costs scale directly with the user ratio.
        linear = (current_monthly_cost *
                  cost_structure.get('linear_pct', 0.5) *
                  scale_factor)

        # Sub-linear costs model economies of scale.
        exponent = cost_structure.get('sublinear_exponent', 0.7)
        sublinear = (current_monthly_cost *
                     cost_structure.get('sublinear_pct', 0.3) *
                     (scale_factor ** exponent))

        projected_cost = fixed + linear + sublinear

        per_user_current = current_monthly_cost / current_users
        per_user_projected = projected_cost / target_users

        # With a zero baseline there is no relative change to report.
        if per_user_current:
            efficiency_gain = (
                (per_user_current - per_user_projected) / per_user_current * 100
            )
        else:
            efficiency_gain = 0.0

        return {
            'current_users': current_users,
            'target_users': target_users,
            'scale_factor': scale_factor,
            'current_cost': current_monthly_cost,
            'projected_cost': projected_cost,
            'cost_per_user_current': per_user_current,
            'cost_per_user_projected': per_user_projected,
            'efficiency_gain': efficiency_gain
        }