Reliability Engineering Code Reference

Complete code implementations for Chapter 65: Reliability Engineering.


Table of Contents


Service Level Objectives

AI SLO Framework

Complete implementation for defining AI-specific SLOs:

from dataclasses import dataclass
from enum import Enum
from typing import Optional
from datetime import datetime, timedelta

class SLOCategory(Enum):
    AVAILABILITY = "availability"
    LATENCY = "latency"
    QUALITY = "quality"
    FRESHNESS = "freshness"
    SAFETY = "safety"

@dataclass
class SLO:
    """Service Level Objective definition."""
    name: str
    category: SLOCategory
    description: str
    target: float  # Target value
    threshold: str  # e.g., "99.9%", "< 200ms"
    measurement_window: timedelta
    error_budget: float  # Allowed failures

@dataclass
class SLI:
    """Service Level Indicator (the metric)."""
    name: str
    calculation: str
    data_source: str

class AISLOFramework:
    """Framework for defining AI system SLOs."""

    def __init__(self, service_name: str):
        self.service = service_name
        self.slos: list[SLO] = []
        self.slis: dict[str, SLI] = {}

    def define_availability_slo(
        self,
        target: float = 99.9,
        include_quality_check: bool = True
    ) -> SLO:
        """Define availability SLO for AI service."""
        if include_quality_check:
            description = (
                "Percentage of requests that return a valid prediction "
                "with confidence above minimum threshold"
            )
            calculation = (
                "(successful_predictions - low_confidence_predictions) / "
                "total_requests * 100"
            )
        else:
            description = "Percentage of requests that return without error"
            calculation = "successful_requests / total_requests * 100"

        slo = SLO(
            name=f"{self.service}_availability",
            category=SLOCategory.AVAILABILITY,
            description=description,
            target=target,
            threshold=f">= {target}%",
            measurement_window=timedelta(days=30),
            error_budget=100 - target
        )

        self.slos.append(slo)
        self.slis[slo.name] = SLI(
            name=f"{slo.name}_sli",
            calculation=calculation,
            data_source="request_metrics"
        )

        return slo

    def define_latency_slo(
        self,
        p50_ms: int,
        p99_ms: int,
        include_feature_retrieval: bool = True
    ) -> list[SLO]:
        """Define latency SLOs."""
        slos = []

        component = "end_to_end" if include_feature_retrieval else "inference_only"

        for percentile, target in [("p50", p50_ms), ("p99", p99_ms)]:
            slo = SLO(
                name=f"{self.service}_latency_{percentile}",
                category=SLOCategory.LATENCY,
                description=f"{percentile} latency for {component} request",
                target=target,
                threshold=f"< {target}ms",
                measurement_window=timedelta(days=7),
                error_budget=1.0 if percentile == "p99" else 5.0  # % requests can exceed
            )
            slos.append(slo)
            self.slos.append(slo)

        return slos

    def define_quality_slo(
        self,
        metric_name: str,
        baseline: float,
        allowed_degradation: float = 0.05
    ) -> SLO:
        """Define model quality SLO."""
        target = baseline * (1 - allowed_degradation)

        slo = SLO(
            name=f"{self.service}_quality_{metric_name}",
            category=SLOCategory.QUALITY,
            description=f"Model {metric_name} must remain within {allowed_degradation*100}% of baseline",
            target=target,
            threshold=f">= {target:.3f}",
            measurement_window=timedelta(days=1),
            error_budget=allowed_degradation * 100
        )

        self.slos.append(slo)
        return slo

    def define_freshness_slo(
        self,
        model_max_age_days: int = 7,
        feature_max_age_hours: int = 1
    ) -> list[SLO]:
        """Define freshness SLOs."""
        slos = []

        model_slo = SLO(
            name=f"{self.service}_model_freshness",
            category=SLOCategory.FRESHNESS,
            description="Model age must not exceed threshold",
            target=model_max_age_days,
            threshold=f"< {model_max_age_days} days",
            measurement_window=timedelta(days=1),
            error_budget=0  # No tolerance for stale models
        )

        feature_slo = SLO(
            name=f"{self.service}_feature_freshness",
            category=SLOCategory.FRESHNESS,
            description="Features must not exceed staleness threshold",
            target=feature_max_age_hours,
            threshold=f"< {feature_max_age_hours} hours",
            measurement_window=timedelta(hours=1),
            error_budget=1.0  # 1% of requests can have stale features
        )

        slos = [model_slo, feature_slo]
        self.slos.extend(slos)
        return slos

    def define_safety_slo(
        self,
        max_harmful_rate: float = 0.0001
    ) -> SLO:
        """Define safety SLO for content-generating models."""
        slo = SLO(
            name=f"{self.service}_safety",
            category=SLOCategory.SAFETY,
            description="Rate of outputs flagged as harmful",
            target=max_harmful_rate,
            threshold=f"< {max_harmful_rate*100}%",
            measurement_window=timedelta(hours=1),
            error_budget=0  # Zero tolerance, triggers immediate response
        )

        self.slos.append(slo)
        return slo

Error Budget Tracker

Complete implementation for tracking error budget consumption:

from datetime import datetime, timedelta

class ErrorBudgetTracker:
    """Track error budget consumption."""

    def __init__(self, slos: list[SLO]):
        self.slos = {slo.name: slo for slo in slos}
        self.consumption: dict[str, list[dict]] = {slo.name: [] for slo in slos}

    def record_measurement(
        self,
        slo_name: str,
        actual_value: float,
        timestamp: datetime = None
    ):
        """Record a measurement against an SLO."""
        timestamp = timestamp or datetime.now()
        slo = self.slos[slo_name]

        # Determine if this consumed error budget
        if slo.category == SLOCategory.LATENCY:
            consumed = actual_value > slo.target
        elif slo.category in [SLOCategory.AVAILABILITY, SLOCategory.QUALITY]:
            consumed = actual_value < slo.target
        elif slo.category == SLOCategory.FRESHNESS:
            consumed = actual_value > slo.target
        else:
            consumed = actual_value > slo.target

        self.consumption[slo_name].append({
            'timestamp': timestamp,
            'value': actual_value,
            'target': slo.target,
            'consumed_budget': consumed
        })

    def get_budget_status(self, slo_name: str) -> dict:
        """Get current error budget status."""
        slo = self.slos[slo_name]
        measurements = self.consumption[slo_name]

        # Filter to measurement window
        window_start = datetime.now() - slo.measurement_window
        recent = [m for m in measurements if m['timestamp'] >= window_start]

        if not recent:
            return {'status': 'no_data', 'budget_remaining': slo.error_budget}

        # Calculate consumption
        failures = sum(1 for m in recent if m['consumed_budget'])
        total = len(recent)
        failure_rate = (failures / total) * 100 if total > 0 else 0

        budget_remaining = slo.error_budget - failure_rate

        return {
            'slo': slo_name,
            'error_budget': slo.error_budget,
            'consumed': failure_rate,
            'remaining': max(0, budget_remaining),
            'status': self._budget_status(budget_remaining, slo.error_budget),
            'measurements_in_window': total,
            'failures_in_window': failures
        }

    def _budget_status(self, remaining: float, total: float) -> str:
        """Determine budget status."""
        if remaining <= 0:
            return 'exhausted'
        elif remaining < total * 0.25:
            return 'critical'
        elif remaining < total * 0.5:
            return 'warning'
        else:
            return 'healthy'

Quality Metrics and Real-Time Estimator

Complete implementation for quality metric definitions and real-time estimation:

from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class QualityMetricDefinition:
    """Definition of a quality metric for SLO."""
    name: str
    calculation: Callable
    requires_labels: bool
    real_time_computable: bool
    proxy_metric: Optional[str]  # If labels unavailable in real-time

QUALITY_METRICS = {
    'classification': [
        QualityMetricDefinition(
            name='accuracy',
            calculation=lambda pred, true: (pred == true).mean(),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='confidence'
        ),
        QualityMetricDefinition(
            name='precision',
            calculation=lambda pred, true: precision_score(true, pred),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='positive_rate'
        ),
    ],
    'regression': [
        QualityMetricDefinition(
            name='mae',
            calculation=lambda pred, true: abs(pred - true).mean(),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='prediction_variance'
        ),
    ],
    'ranking': [
        QualityMetricDefinition(
            name='ndcg',
            calculation=lambda pred, true: ndcg_score(true, pred),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='click_through_rate'  # Can be computed in real-time
        ),
    ],
    'generation': [
        QualityMetricDefinition(
            name='human_approval_rate',
            calculation=lambda outputs: (outputs['rating'] >= 4).mean(),
            requires_labels=True,  # Requires human feedback
            real_time_computable=False,
            proxy_metric='perplexity'
        ),
    ],
}

class RealTimeQualityEstimator:
    """Estimate quality in real-time using proxy metrics."""

    def __init__(self, proxy_baseline: dict):
        self.baseline = proxy_baseline
        self.current_values: dict[str, list] = {}

    def record_proxy(self, metric_name: str, value: float):
        """Record proxy metric value."""
        if metric_name not in self.current_values:
            self.current_values[metric_name] = []
        self.current_values[metric_name].append(value)

        # Keep rolling window
        max_size = 10000
        if len(self.current_values[metric_name]) > max_size:
            self.current_values[metric_name] = \
                self.current_values[metric_name][-max_size:]

    def estimate_quality_status(self, metric_name: str) -> dict:
        """Estimate whether quality SLO is likely being met."""
        if metric_name not in self.current_values:
            return {'status': 'no_data'}

        current = self.current_values[metric_name]
        if len(current) < 100:
            return {'status': 'insufficient_data'}

        baseline = self.baseline.get(metric_name, {})
        baseline_mean = baseline.get('mean', 0)
        baseline_std = baseline.get('std', 1)

        current_mean = sum(current) / len(current)

        # Z-score of drift
        drift = abs(current_mean - baseline_mean) / (baseline_std + 1e-6)

        if drift > 3:
            status = 'likely_violated'
        elif drift > 2:
            status = 'potential_issue'
        else:
            status = 'likely_healthy'

        return {
            'status': status,
            'current_mean': current_mean,
            'baseline_mean': baseline_mean,
            'drift_zscore': drift,
            'sample_size': len(current)
        }

Graceful Degradation

Degradation Controller

Complete implementation for graceful degradation control:

from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional
import asyncio

class DegradationLevel(Enum):
    FULL_SERVICE = 0
    CACHED_FEATURES = 1
    SIMPLER_MODEL = 2
    RULE_BASED = 3
    STATIC_FALLBACK = 4

@dataclass
class DegradationStrategy:
    """A degradation strategy definition."""
    level: DegradationLevel
    name: str
    description: str
    handler: Callable
    quality_impact: str
    latency_budget_ms: int

class GracefulDegradationController:
    """Control graceful degradation for AI service."""

    def __init__(self):
        self.strategies: dict[DegradationLevel, DegradationStrategy] = {}
        self.current_level = DegradationLevel.FULL_SERVICE
        self.health_checks: dict[str, Callable] = {}
        self.fallback_data: dict[str, Any] = {}

    def register_strategy(self, strategy: DegradationStrategy):
        """Register a degradation strategy."""
        self.strategies[strategy.level] = strategy

    def register_health_check(self, name: str, check: Callable[[], bool]):
        """Register a health check for a component."""
        self.health_checks[name] = check

    async def execute_with_degradation(
        self,
        request: dict,
        timeout_ms: int = 500
    ) -> dict:
        """Execute request with automatic degradation on failure."""

        # Try each level in order until one succeeds
        for level in DegradationLevel:
            if level not in self.strategies:
                continue

            strategy = self.strategies[level]

            try:
                # Execute with timeout
                result = await asyncio.wait_for(
                    strategy.handler(request),
                    timeout=strategy.latency_budget_ms / 1000
                )

                return {
                    'result': result,
                    'degradation_level': level.name,
                    'quality_impact': strategy.quality_impact
                }

            except asyncio.TimeoutError:
                # Timeout, try next level
                continue
            except Exception as e:
                # Error, try next level
                continue

        # All strategies failed
        return {
            'result': None,
            'error': 'All degradation strategies failed',
            'degradation_level': 'FAILED'
        }

    async def determine_service_level(self) -> DegradationLevel:
        """Determine appropriate service level based on health checks."""

        # Check component health
        health_status = {}
        for name, check in self.health_checks.items():
            try:
                health_status[name] = await asyncio.wait_for(
                    asyncio.to_thread(check),
                    timeout=1.0
                )
            except:
                health_status[name] = False

        # Determine level based on health
        if all(health_status.values()):
            return DegradationLevel.FULL_SERVICE

        if not health_status.get('feature_store', True):
            return DegradationLevel.CACHED_FEATURES

        if not health_status.get('primary_model', True):
            return DegradationLevel.SIMPLER_MODEL

        if not health_status.get('fallback_model', True):
            return DegradationLevel.RULE_BASED

        return DegradationLevel.STATIC_FALLBACK

Circuit Breaker

Complete implementation for circuit breaker pattern:

from datetime import datetime
from typing import Any, Callable

class CircuitBreaker:
    """Circuit breaker for AI model calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests

        self.failures = 0
        self.state = 'closed'  # closed, open, half_open
        self.last_failure_time = None
        self.half_open_successes = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Call function through circuit breaker."""

        if self.state == 'open':
            # Check if recovery timeout has passed
            if self._should_attempt_reset():
                self.state = 'half_open'
                self.half_open_successes = 0
            else:
                raise CircuitOpenError("Circuit is open")

        try:
            result = await func(*args, **kwargs)
            self._record_success()
            return result
        except Exception as e:
            self._record_failure()
            raise

    def _record_success(self):
        """Record successful call."""
        if self.state == 'half_open':
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = 'closed'
                self.failures = 0
        else:
            self.failures = max(0, self.failures - 1)

    def _record_failure(self):
        """Record failed call."""
        self.failures += 1
        self.last_failure_time = datetime.now()

        if self.failures >= self.failure_threshold:
            self.state = 'open'

    def _should_attempt_reset(self) -> bool:
        """Check if we should try to reset the circuit."""
        if self.last_failure_time is None:
            return True

        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.recovery_timeout


class CircuitOpenError(Exception):
    """Raised when circuit breaker is open."""
    pass

Fallback Executor

Complete implementation for fallback strategy execution:

from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class FallbackStrategy:
    """Fallback strategy for AI system."""
    system_type: str
    primary: str
    fallbacks: list[dict]
    example: str

FALLBACK_STRATEGIES = [
    FallbackStrategy(
        system_type="Recommendation",
        primary="Personalized ML model",
        fallbacks=[
            {"level": 1, "strategy": "Collaborative filtering with cached embeddings"},
            {"level": 2, "strategy": "Popularity-based recommendations"},
            {"level": 3, "strategy": "Category-based defaults"},
            {"level": 4, "strategy": "Static editorial picks"},
        ],
        example="Netflix showing 'Top 10 in Your Country' when personalization fails"
    ),
    FallbackStrategy(
        system_type="Search ranking",
        primary="ML ranking model with full features",
        fallbacks=[
            {"level": 1, "strategy": "Simpler model with fewer features"},
            {"level": 2, "strategy": "BM25 text matching"},
            {"level": 3, "strategy": "Recency-based ordering"},
            {"level": 4, "strategy": "Return unranked results"},
        ],
        example="Google showing results with reduced personalization during outages"
    ),
    FallbackStrategy(
        system_type="Content moderation",
        primary="Large classifier with high accuracy",
        fallbacks=[
            {"level": 1, "strategy": "Smaller, faster classifier"},
            {"level": 2, "strategy": "Keyword-based filtering"},
            {"level": 3, "strategy": "Hold for human review"},
            {"level": 4, "strategy": "Block all uncertain content"},
        ],
        example="Platform blocking content during classifier outage rather than allowing harmful content"
    ),
    FallbackStrategy(
        system_type="LLM chat",
        primary="Large LLM with full context",
        fallbacks=[
            {"level": 1, "strategy": "Same model with reduced context"},
            {"level": 2, "strategy": "Smaller/distilled model"},
            {"level": 3, "strategy": "Template-based responses"},
            {"level": 4, "strategy": "'I'm having trouble right now. Please try again.'"},
        ],
        example="ChatGPT showing capacity message during high load"
    ),
    FallbackStrategy(
        system_type="Fraud detection",
        primary="Real-time ML scoring",
        fallbacks=[
            {"level": 1, "strategy": "Batch-computed risk scores"},
            {"level": 2, "strategy": "Rule-based checks"},
            {"level": 3, "strategy": "Velocity limits only"},
            {"level": 4, "strategy": "Manual review queue"},
        ],
        example="Payment processor adding friction during model degradation"
    ),
]

class FallbackExecutor:
    """Execute appropriate fallback based on system type."""

    def __init__(self, system_type: str):
        self.system_type = system_type
        self.fallback_handlers: dict[int, Callable] = {}
        self.metrics: dict[str, int] = {}

    def register_fallback(self, level: int, handler: Callable):
        """Register fallback handler for level."""
        self.fallback_handlers[level] = handler

    async def execute(
        self,
        request: Any,
        starting_level: int = 0
    ) -> dict:
        """Execute starting from given level."""

        for level in range(starting_level, 5):
            if level not in self.fallback_handlers:
                continue

            try:
                handler = self.fallback_handlers[level]
                result = await handler(request)

                # Record metric
                metric_key = f"fallback_level_{level}"
                self.metrics[metric_key] = self.metrics.get(metric_key, 0) + 1

                return {
                    'result': result,
                    'fallback_level': level,
                    'degraded': level > 0
                }
            except Exception as e:
                continue

        # All fallbacks failed
        self.metrics['all_failed'] = self.metrics.get('all_failed', 0) + 1

        return {
            'result': None,
            'error': 'All fallback levels exhausted',
            'fallback_level': -1
        }

Incident Management

AI Incident Signatures

Complete incident signature definitions for AI-specific incidents:

from dataclasses import dataclass
from enum import Enum

class AIIncidentType(Enum):
    MODEL_DEGRADATION = "model_degradation"
    DATA_QUALITY = "data_quality"
    FEATURE_PIPELINE = "feature_pipeline"
    SERVING_LATENCY = "serving_latency"
    SAFETY_VIOLATION = "safety_violation"
    DRIFT_DETECTED = "drift_detected"
    TRAINING_FAILURE = "training_failure"
    FEEDBACK_LOOP = "feedback_loop"

@dataclass
class AIIncidentSignature:
    """Signature of an AI incident type."""
    type: AIIncidentType
    signals: list[str]
    typical_causes: list[str]
    immediate_actions: list[str]
    investigation_steps: list[str]

AI_INCIDENT_SIGNATURES = [
    AIIncidentSignature(
        type=AIIncidentType.MODEL_DEGRADATION,
        signals=[
            "Drop in online metrics (CTR, conversion, etc.)",
            "Increase in user complaints",
            "A/B test shows model underperforming control",
            "Quality proxy metrics trending down"
        ],
        typical_causes=[
            "Data distribution shift",
            "Feature computation change",
            "Upstream data quality issue",
            "Model deployed from wrong experiment"
        ],
        immediate_actions=[
            "Roll back to previous model version",
            "Increase monitoring granularity",
            "Enable additional logging"
        ],
        investigation_steps=[
            "Compare feature distributions: now vs training",
            "Check recent data pipeline changes",
            "Analyze model behavior by segment",
            "Review recent deployments"
        ]
    ),
    AIIncidentSignature(
        type=AIIncidentType.SAFETY_VIOLATION,
        signals=[
            "Outputs flagged by safety classifier",
            "User reports of harmful content",
            "Monitoring alerts on sensitive outputs"
        ],
        typical_causes=[
            "Adversarial inputs exploiting model",
            "Training data contamination",
            "Edge case not covered by safety filters",
            "Prompt injection attack"
        ],
        immediate_actions=[
            "Block affected input patterns",
            "Increase safety filter strictness",
            "Enable human review for flagged outputs",
            "Consider service degradation to safer fallback"
        ],
        investigation_steps=[
            "Collect and analyze violating examples",
            "Identify common patterns in triggers",
            "Check for training data issues",
            "Review safety filter coverage"
        ]
    ),
    AIIncidentSignature(
        type=AIIncidentType.FEATURE_PIPELINE,
        signals=[
            "Feature freshness SLO violation",
            "Missing features at serving time",
            "Sudden change in feature distributions"
        ],
        typical_causes=[
            "Pipeline job failure",
            "Schema change in upstream data",
            "Resource exhaustion",
            "Dependency service outage"
        ],
        immediate_actions=[
            "Switch to cached/stale features",
            "Enable batch features as fallback",
            "Alert data engineering team"
        ],
        investigation_steps=[
            "Check pipeline job logs",
            "Verify upstream data availability",
            "Review recent schema changes",
            "Check resource utilization"
        ]
    ),
    AIIncidentSignature(
        type=AIIncidentType.DRIFT_DETECTED,
        signals=[
            "PSI/JSD alerts on input features",
            "Prediction distribution shift",
            "Sudden change in model confidence"
        ],
        typical_causes=[
            "Seasonal change in user behavior",
            "Product/UI change affecting features",
            "Real world event (holiday, news)",
            "Data collection bug"
        ],
        immediate_actions=[
            "Verify drift is real (not monitoring bug)",
            "Assess impact on model performance",
            "Consider triggering model retraining"
        ],
        investigation_steps=[
            "Identify which features are drifting",
            "Correlate with external events",
            "Check for upstream data issues",
            "Analyze impact on model outputs"
        ]
    ),
]

AI Incident Manager

Complete implementation for managing AI system incidents:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum

class IncidentSeverity(Enum):
    SEV1 = 1  # Critical - major user impact
    SEV2 = 2  # High - significant impact
    SEV3 = 3  # Medium - moderate impact
    SEV4 = 4  # Low - minor impact

class IncidentStatus(Enum):
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    MITIGATING = "mitigating"
    RESOLVED = "resolved"
    POST_MORTEM = "post_mortem"

@dataclass
class AIIncident:
    """An AI system incident."""
    id: str
    type: AIIncidentType
    severity: IncidentSeverity
    status: IncidentStatus
    title: str
    description: str
    detected_at: datetime
    detected_by: str  # Alert name or person
    affected_systems: list[str]
    affected_metrics: dict[str, float]  # metric -> impact
    timeline: list[dict] = field(default_factory=list)
    root_cause: Optional[str] = None
    resolution: Optional[str] = None
    resolved_at: Optional[datetime] = None

class AIIncidentManager:
    """Manage AI system incidents."""

    def __init__(self):
        self.active_incidents: dict[str, AIIncident] = {}
        self.incident_history: list[AIIncident] = []
        self.on_call_rotation: dict[str, str] = {}

    def create_incident(
        self,
        type: AIIncidentType,
        severity: IncidentSeverity,
        title: str,
        description: str,
        detected_by: str,
        affected_systems: list[str]
    ) -> AIIncident:
        """Create new incident."""
        incident_id = f"AI-{datetime.now().strftime('%Y%m%d')}-{len(self.incident_history)+1:04d}"

        incident = AIIncident(
            id=incident_id,
            type=type,
            severity=severity,
            status=IncidentStatus.DETECTED,
            title=title,
            description=description,
            detected_at=datetime.now(),
            detected_by=detected_by,
            affected_systems=affected_systems,
            affected_metrics={},
            timeline=[{
                'timestamp': datetime.now(),
                'status': 'detected',
                'description': f'Incident detected by {detected_by}'
            }]
        )

        self.active_incidents[incident_id] = incident

        # Page on-call if high severity
        if severity in [IncidentSeverity.SEV1, IncidentSeverity.SEV2]:
            self._page_on_call(incident)

        return incident

    def update_status(
        self,
        incident_id: str,
        new_status: IncidentStatus,
        description: str
    ):
        """Update incident status."""
        incident = self.active_incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")

        incident.status = new_status
        incident.timeline.append({
            'timestamp': datetime.now(),
            'status': new_status.value,
            'description': description
        })

        if new_status == IncidentStatus.RESOLVED:
            incident.resolved_at = datetime.now()
            self._move_to_history(incident_id)

    def get_runbook(self, incident_type: AIIncidentType) -> dict:
        """Get incident response runbook."""
        signature = next(
            (s for s in AI_INCIDENT_SIGNATURES if s.type == incident_type),
            None
        )

        if not signature:
            return {'error': 'No runbook for incident type'}

        return {
            'type': incident_type.value,
            'signals': signature.signals,
            'immediate_actions': signature.immediate_actions,
            'investigation_steps': signature.investigation_steps,
            'typical_causes': signature.typical_causes,
            'escalation_path': self._get_escalation_path(incident_type)
        }

    def _page_on_call(self, incident: AIIncident):
        """Page on-call engineer."""
        # Would integrate with PagerDuty, OpsGenie, etc.
        pass

    def _move_to_history(self, incident_id: str):
        """Move resolved incident to history."""
        incident = self.active_incidents.pop(incident_id, None)
        if incident:
            self.incident_history.append(incident)

    def _get_escalation_path(self, incident_type: AIIncidentType) -> list[str]:
        """Get escalation path for incident type."""
        paths = {
            AIIncidentType.MODEL_DEGRADATION: [
                "ML Engineer On-Call",
                "ML Tech Lead",
                "ML Engineering Manager"
            ],
            AIIncidentType.SAFETY_VIOLATION: [
                "ML Engineer On-Call",
                "Trust & Safety Team",
                "VP of Engineering"
            ],
            AIIncidentType.FEATURE_PIPELINE: [
                "Data Engineer On-Call",
                "ML Engineer On-Call",
                "Data Engineering Manager"
            ],
        }
        return paths.get(incident_type, ["Engineering On-Call"])

Post-Incident Review

Complete implementation for post-incident review:

from dataclasses import dataclass

@dataclass
class PostIncidentReview:
    """Post-incident review (blameless post-mortem)."""
    incident_id: str
    incident_summary: str
    timeline: list[dict]
    root_cause_analysis: str
    contributing_factors: list[str]
    what_went_well: list[str]
    what_could_improve: list[str]
    action_items: list[dict]
    prevention_measures: list[str]
    detection_improvements: list[str]

class PostIncidentReviewTemplate:
    """Generate post-incident review template."""

    @staticmethod
    def generate(incident: AIIncident) -> PostIncidentReview:
        """Generate review template from incident."""

        # Standard questions for AI incidents
        questions = {
            AIIncidentType.MODEL_DEGRADATION: {
                'root_cause': [
                    "What changed in the data distribution?",
                    "Were there upstream data changes?",
                    "Was the correct model version deployed?",
                    "Were features computed correctly?"
                ],
                'prevention': [
                    "Add canary deployment for models",
                    "Improve monitoring coverage",
                    "Add data validation gates",
                    "Implement automatic rollback"
                ]
            },
            AIIncidentType.SAFETY_VIOLATION: {
                'root_cause': [
                    "How did harmful content bypass filters?",
                    "Was this a known edge case?",
                    "Was training data contaminated?",
                    "Were safety classifiers up to date?"
                ],
                'prevention': [
                    "Expand safety test coverage",
                    "Add adversarial testing to CI",
                    "Improve real-time safety filtering",
                    "Establish red team process"
                ]
            },
        }

        incident_questions = questions.get(incident.type, {})

        return PostIncidentReview(
            incident_id=incident.id,
            incident_summary=f"{incident.title}: {incident.description}",
            timeline=incident.timeline,
            root_cause_analysis=incident.root_cause or "To be determined",
            contributing_factors=[],  # To be filled
            what_went_well=[],  # To be filled
            what_could_improve=[],  # To be filled
            action_items=[],  # To be filled
            prevention_measures=incident_questions.get('prevention', []),
            detection_improvements=[]  # To be filled
        )

Incident Metrics

Complete implementation for tracking incident metrics:

from datetime import datetime, timedelta

class IncidentMetrics:
    """Track incident metrics for SRE reporting."""

    def __init__(self):
        self.incidents: list[AIIncident] = []

    def add_incident(self, incident: AIIncident):
        """Add incident to metrics."""
        self.incidents.append(incident)

    def calculate_metrics(self, window_days: int = 30) -> dict:
        """Calculate incident metrics."""
        cutoff = datetime.now() - timedelta(days=window_days)
        recent = [i for i in self.incidents if i.detected_at >= cutoff]

        if not recent:
            return {'total_incidents': 0}

        # MTTR (Mean Time to Recovery)
        resolution_times = []
        for i in recent:
            if i.resolved_at:
                delta = (i.resolved_at - i.detected_at).total_seconds() / 3600
                resolution_times.append(delta)

        mttr_hours = (
            sum(resolution_times) / len(resolution_times)
            if resolution_times else None
        )

        # Incidents by type
        by_type = {}
        for i in recent:
            by_type[i.type.value] = by_type.get(i.type.value, 0) + 1

        # Incidents by severity
        by_severity = {}
        for i in recent:
            by_severity[i.severity.name] = by_severity.get(i.severity.name, 0) + 1

        return {
            'total_incidents': len(recent),
            'mttr_hours': mttr_hours,
            'by_type': by_type,
            'by_severity': by_severity,
            'window_days': window_days
        }

Reliable Pipelines

Reliable Training Pipeline

Complete implementation for training pipeline with reliability features:

from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class TrainingPipelineConfig:
    """Configuration for reliable training pipeline."""
    checkpoint_frequency: int  # Steps between checkpoints
    max_retries: int
    timeout_hours: int
    resource_limits: dict
    validation_gates: list[Callable]
    rollback_on_failure: bool

class ReliableTrainingPipeline:
    """Training pipeline with reliability features."""

    def __init__(self, config: TrainingPipelineConfig):
        self.config = config
        self.checkpoints: list[str] = []
        self.current_step = 0

    async def train(
        self,
        model,
        train_data,
        val_data
    ) -> dict:
        """Run training with reliability guarantees."""

        attempt = 0
        last_checkpoint = None

        while attempt < self.config.max_retries:
            try:
                # Resume from checkpoint if available
                if last_checkpoint:
                    model.load_checkpoint(last_checkpoint)

                # Training loop with checkpointing
                for step, batch in enumerate(train_data):
                    self.current_step = step

                    # Train step
                    loss = model.train_step(batch)

                    # Checkpoint periodically
                    if step % self.config.checkpoint_frequency == 0:
                        checkpoint_path = self._save_checkpoint(model, step)
                        last_checkpoint = checkpoint_path
                        self.checkpoints.append(checkpoint_path)

                    # Validation gate
                    if step % (self.config.checkpoint_frequency * 10) == 0:
                        if not self._validate(model, val_data):
                            raise TrainingValidationError(
                                f"Validation failed at step {step}"
                            )

                # Final validation
                final_metrics = self._final_validation(model, val_data)

                return {
                    'status': 'success',
                    'final_metrics': final_metrics,
                    'checkpoints': self.checkpoints,
                    'attempts': attempt + 1
                }

            except Exception as e:
                attempt += 1
                if attempt >= self.config.max_retries:
                    if self.config.rollback_on_failure and last_checkpoint:
                        return {
                            'status': 'failed_with_rollback',
                            'last_checkpoint': last_checkpoint,
                            'error': str(e)
                        }
                    raise

    def _save_checkpoint(self, model, step: int) -> str:
        """Save training checkpoint."""
        path = f"checkpoints/model_step_{step}.pt"
        model.save(path)
        return path

    def _validate(self, model, val_data) -> bool:
        """Run validation gates."""
        for gate in self.config.validation_gates:
            if not gate(model, val_data):
                return False
        return True

    def _final_validation(self, model, val_data) -> dict:
        """Run final validation."""
        return model.evaluate(val_data)


class TrainingValidationError(Exception):
    """Raised when training validation fails."""
    pass

Load Shedding Strategy

Complete implementation for load shedding:

from collections import defaultdict
import time

class LoadSheddingStrategy:
    """Load shedding strategies for AI systems."""

    def __init__(
        self,
        max_qps: int,
        priority_levels: int = 4
    ):
        self.max_qps = max_qps
        self.priority_levels = priority_levels
        self.current_qps = 0
        self.request_counts: dict[int, int] = defaultdict(int)
        self.last_reset = time.time()

    def should_accept(
        self,
        request: dict,
        priority: int = 2
    ) -> dict:
        """Decide whether to accept or shed request."""

        # Reset counter if new second
        now = time.time()
        if now - self.last_reset >= 1.0:
            self.current_qps = 0
            self.request_counts = defaultdict(int)
            self.last_reset = now

        # Calculate load factor
        load_factor = self.current_qps / self.max_qps

        # Priority-based shedding
        # Priority 0 = highest (critical), Priority 3 = lowest (nice-to-have)
        if load_factor >= 0.95:
            # Extreme load: only accept priority 0
            accept = priority == 0
            reason = "extreme_load"
        elif load_factor >= 0.85:
            # High load: accept priority 0-1
            accept = priority <= 1
            reason = "high_load"
        elif load_factor >= 0.70:
            # Moderate load: accept priority 0-2
            accept = priority <= 2
            reason = "moderate_load"
        else:
            # Normal load: accept all
            accept = True
            reason = "normal_load"

        if accept:
            self.current_qps += 1
            self.request_counts[priority] += 1

        return {
            'accept': accept,
            'reason': reason,
            'load_factor': load_factor,
            'priority': priority,
            'current_qps': self.current_qps
        }

    def get_request_priority(self, request: dict) -> int:
        """Determine request priority."""

        # Critical: Safety-related, high-value transactions
        if request.get('is_safety_critical'):
            return 0

        # High: Logged-in users, premium customers
        if request.get('user_tier') == 'premium':
            return 1

        # Normal: Standard requests
        if request.get('user_id'):
            return 2

        # Low: Anonymous, background tasks
        return 3

Adaptive Rate Limiter

Complete implementation for adaptive rate limiting:

class AdaptiveRateLimiter:
    """Rate limiter that adapts to model latency."""

    def __init__(
        self,
        base_limit: int,
        latency_threshold_ms: int = 100
    ):
        self.base_limit = base_limit
        self.latency_threshold = latency_threshold_ms
        self.current_limit = base_limit
        self.latency_samples: list[float] = []

    def record_latency(self, latency_ms: float):
        """Record request latency."""
        self.latency_samples.append(latency_ms)

        # Keep rolling window
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]

        # Adapt limit based on latency
        self._adapt_limit()

    def _adapt_limit(self):
        """Adapt rate limit based on observed latency."""
        if len(self.latency_samples) < 100:
            return

        # Calculate p95 latency
        sorted_samples = sorted(self.latency_samples)
        p95_idx = int(len(sorted_samples) * 0.95)
        p95_latency = sorted_samples[p95_idx]

        # Adjust limit
        if p95_latency > self.latency_threshold * 2:
            # Severely overloaded: reduce limit significantly
            self.current_limit = max(
                self.base_limit * 0.5,
                self.current_limit * 0.8
            )
        elif p95_latency > self.latency_threshold:
            # Overloaded: reduce limit slightly
            self.current_limit = max(
                self.base_limit * 0.7,
                self.current_limit * 0.95
            )
        elif p95_latency < self.latency_threshold * 0.5:
            # Underloaded: increase limit
            self.current_limit = min(
                self.base_limit * 1.5,
                self.current_limit * 1.05
            )

    def get_current_limit(self) -> int:
        """Get current rate limit."""
        return int(self.current_limit)

Reliable Serving Pipeline

Complete implementation for serving pipeline with reliability features:

class ReliableServingPipeline:
    """Serving pipeline with reliability features."""

    def __init__(
        self,
        primary_model,
        fallback_model,
        feature_store,
        circuit_breaker: CircuitBreaker,
        degradation_controller: GracefulDegradationController
    ):
        self.primary = primary_model
        self.fallback = fallback_model
        self.features = feature_store
        self.circuit_breaker = circuit_breaker
        self.degradation = degradation_controller

        # Metrics
        self.request_count = 0
        self.fallback_count = 0
        self.error_count = 0

    async def predict(self, request: dict) -> dict:
        """Make prediction with reliability guarantees."""
        self.request_count += 1

        try:
            # Get features
            features = await self._get_features_with_fallback(request)

            # Predict with circuit breaker
            try:
                prediction = await self.circuit_breaker.call(
                    self.primary.predict,
                    features
                )
                return {
                    'prediction': prediction,
                    'model': 'primary',
                    'degraded': False
                }
            except CircuitOpenError:
                # Primary circuit open, use fallback
                self.fallback_count += 1
                prediction = await self.fallback.predict(features)
                return {
                    'prediction': prediction,
                    'model': 'fallback',
                    'degraded': True
                }

        except Exception as e:
            self.error_count += 1

            # Use degradation controller
            return await self.degradation.execute_with_degradation(request)

    async def _get_features_with_fallback(self, request: dict) -> dict:
        """Get features with fallback to cached values."""
        try:
            return await self.features.get_online_features(
                feature_refs=request.get('feature_refs', []),
                entity_keys=request.get('entity_keys', {})
            )
        except Exception:
            # Fall back to cached/batch features
            return await self.features.get_cached_features(
                entity_keys=request.get('entity_keys', {})
            )

    def get_reliability_metrics(self) -> dict:
        """Get reliability metrics."""
        return {
            'total_requests': self.request_count,
            'fallback_requests': self.fallback_count,
            'error_requests': self.error_count,
            'fallback_rate': self.fallback_count / max(1, self.request_count),
            'error_rate': self.error_count / max(1, self.request_count),
            'circuit_state': self.circuit_breaker.state
        }

Observability

AI Observability Pipeline

Complete implementation for AI system observability:

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import json

@dataclass
class AIObservabilityEvent:
    """An observability event for AI systems."""
    timestamp: datetime
    event_type: str  # request, prediction, feature, model, error
    service: str
    model_version: str
    request_id: str
    latency_ms: float
    features: Optional[dict]
    prediction: Optional[Any]
    confidence: Optional[float]
    metadata: dict

class AIObservabilityPipeline:
    """Observability pipeline for AI systems."""

    def __init__(self, metrics_backend, logging_backend, tracing_backend):
        self.metrics = metrics_backend
        self.logs = logging_backend
        self.traces = tracing_backend
        self.prediction_samples: list[dict] = []

    def record_prediction(
        self,
        request_id: str,
        model_version: str,
        features: dict,
        prediction: Any,
        confidence: float,
        latency_ms: float,
        metadata: dict = None
    ):
        """Record a prediction for observability."""

        # Metrics
        self.metrics.increment('predictions_total', tags={
            'model_version': model_version
        })
        self.metrics.histogram('prediction_latency_ms', latency_ms)
        self.metrics.histogram('prediction_confidence', confidence)

        # Logging (sample to avoid overwhelming)
        if self._should_sample_log():
            self.logs.info(
                "prediction",
                request_id=request_id,
                model_version=model_version,
                prediction=str(prediction)[:100],  # Truncate
                confidence=confidence,
                latency_ms=latency_ms
            )

        # Trace span
        self.traces.add_span_data({
            'prediction': str(prediction)[:100],
            'confidence': confidence,
            'model_version': model_version
        })

        # Sample predictions for analysis
        self._sample_prediction({
            'request_id': request_id,
            'features': features,
            'prediction': prediction,
            'confidence': confidence,
            'timestamp': datetime.now()
        })

    def record_feature_retrieval(
        self,
        request_id: str,
        feature_names: list[str],
        latency_ms: float,
        cache_hit: bool,
        stale_features: list[str]
    ):
        """Record feature retrieval metrics."""
        self.metrics.histogram('feature_retrieval_latency_ms', latency_ms)
        self.metrics.increment(
            'feature_cache_hits' if cache_hit else 'feature_cache_misses'
        )

        if stale_features:
            self.metrics.increment(
                'stale_features',
                value=len(stale_features)
            )
            self.logs.warning(
                "stale_features",
                request_id=request_id,
                stale_features=stale_features
            )

    def record_degradation(
        self,
        request_id: str,
        original_level: int,
        actual_level: int,
        reason: str
    ):
        """Record when service degradation occurs."""
        self.metrics.increment('degradation_events', tags={
            'from_level': str(original_level),
            'to_level': str(actual_level),
            'reason': reason
        })

        self.logs.warning(
            "service_degradation",
            request_id=request_id,
            from_level=original_level,
            to_level=actual_level,
            reason=reason
        )

    def _should_sample_log(self) -> bool:
        """Determine if this request should be logged."""
        import random
        return random.random() < 0.01  # 1% sampling

    def _sample_prediction(self, prediction_data: dict):
        """Sample predictions for offline analysis."""
        self.prediction_samples.append(prediction_data)

        # Keep bounded
        max_samples = 10000
        if len(self.prediction_samples) > max_samples:
            self.prediction_samples = self.prediction_samples[-max_samples:]

Distribution Monitor

Complete implementation for monitoring feature and prediction distributions:

class DistributionMonitor:
    """Monitor feature and prediction distributions."""

    def __init__(self):
        self.baselines: dict[str, dict] = {}
        self.current: dict[str, list] = {}

    def set_baseline(self, name: str, values: list[float]):
        """Set baseline distribution."""
        self.baselines[name] = {
            'mean': sum(values) / len(values),
            'std': self._std(values),
            'min': min(values),
            'max': max(values),
            'count': len(values)
        }

    def record_value(self, name: str, value: float):
        """Record a value for monitoring."""
        if name not in self.current:
            self.current[name] = []
        self.current[name].append(value)

        # Keep bounded
        if len(self.current[name]) > 10000:
            self.current[name] = self.current[name][-10000:]

    def check_distribution(self, name: str) -> dict:
        """Check current distribution against baseline."""
        if name not in self.baselines or name not in self.current:
            return {'status': 'no_data'}

        baseline = self.baselines[name]
        current = self.current[name]

        if len(current) < 100:
            return {'status': 'insufficient_data'}

        current_mean = sum(current) / len(current)
        current_std = self._std(current)

        # Drift metrics
        mean_drift = (
            abs(current_mean - baseline['mean']) /
            (baseline['std'] + 1e-6)
        )

        return {
            'status': 'ok' if mean_drift < 2 else 'drift_detected',
            'baseline_mean': baseline['mean'],
            'current_mean': current_mean,
            'mean_drift_zscore': mean_drift,
            'alert': mean_drift >= 3
        }

    def _std(self, values: list) -> float:
        if not values:
            return 0
        mean = sum(values) / len(values)
        return (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5

Chaos Engineering

Chaos Experiments

Chaos experiment definitions for AI systems:

from dataclasses import dataclass
from enum import Enum

class ChaosExperimentType(Enum):
    MODEL_FAILURE = "model_failure"
    FEATURE_STORE_LATENCY = "feature_store_latency"
    STALE_FEATURES = "stale_features"
    MODEL_LATENCY = "model_latency"
    DATA_QUALITY = "data_quality"
    CAPACITY_OVERLOAD = "capacity_overload"

@dataclass
class ChaosExperiment:
    """A chaos experiment definition."""
    name: str
    type: ChaosExperimentType
    description: str
    blast_radius: str  # limited, moderate, full
    expected_behavior: str
    rollback_procedure: str
    duration_minutes: int

CHAOS_EXPERIMENTS = [
    ChaosExperiment(
        name="feature_store_latency_spike",
        type=ChaosExperimentType.FEATURE_STORE_LATENCY,
        description="Add 500ms latency to feature store calls",
        blast_radius="limited",
        expected_behavior="System should fall back to cached features or simpler model",
        rollback_procedure="Remove latency injection",
        duration_minutes=15
    ),
    ChaosExperiment(
        name="primary_model_failure",
        type=ChaosExperimentType.MODEL_FAILURE,
        description="Return errors from primary model",
        blast_radius="moderate",
        expected_behavior="System should fall back to secondary model",
        rollback_procedure="Restore primary model",
        duration_minutes=10
    ),
    ChaosExperiment(
        name="stale_feature_injection",
        type=ChaosExperimentType.STALE_FEATURES,
        description="Serve features with old timestamps",
        blast_radius="limited",
        expected_behavior="System should detect staleness and alert",
        rollback_procedure="Restore normal feature timestamps",
        duration_minutes=20
    ),
    ChaosExperiment(
        name="capacity_overload",
        type=ChaosExperimentType.CAPACITY_OVERLOAD,
        description="Send 2x normal traffic",
        blast_radius="moderate",
        expected_behavior="System should gracefully shed load and maintain SLOs for priority traffic",
        rollback_procedure="Return to normal traffic levels",
        duration_minutes=15
    ),
]

Chaos Runner

Complete implementation for running chaos experiments:

from datetime import datetime
from typing import Optional

class ChaosRunner:
    """Run chaos experiments on AI systems."""

    def __init__(self, target_service):
        self.target = target_service
        self.active_experiments: dict[str, ChaosExperiment] = {}
        self.results: list[dict] = []

    def start_experiment(
        self,
        experiment: ChaosExperiment,
        parameters: dict = None
    ) -> dict:
        """Start a chaos experiment."""

        # Safety checks
        if not self._is_safe_to_run(experiment):
            return {
                'status': 'blocked',
                'reason': 'Safety checks failed'
            }

        # Start experiment
        self.active_experiments[experiment.name] = experiment

        # Apply chaos
        if experiment.type == ChaosExperimentType.FEATURE_STORE_LATENCY:
            self.target.inject_feature_latency(
                latency_ms=parameters.get('latency_ms', 500)
            )
        elif experiment.type == ChaosExperimentType.MODEL_FAILURE:
            self.target.inject_model_errors(
                error_rate=parameters.get('error_rate', 1.0)
            )
        elif experiment.type == ChaosExperimentType.STALE_FEATURES:
            self.target.inject_stale_features(
                staleness_hours=parameters.get('staleness_hours', 24)
            )

        return {
            'status': 'started',
            'experiment': experiment.name,
            'duration_minutes': experiment.duration_minutes
        }

    def stop_experiment(self, experiment_name: str) -> dict:
        """Stop a running experiment."""
        experiment = self.active_experiments.pop(experiment_name, None)
        if not experiment:
            return {'status': 'not_found'}

        # Roll back
        self._rollback(experiment)

        return {'status': 'stopped', 'experiment': experiment_name}

    def evaluate_experiment(
        self,
        experiment_name: str,
        metrics_during: dict,
        metrics_baseline: dict
    ) -> dict:
        """Evaluate experiment results."""

        experiment = self.active_experiments.get(experiment_name)
        if not experiment:
            return {'status': 'experiment_not_found'}

        # Check if expected behavior occurred
        passed_checks = []
        failed_checks = []

        # Availability check
        if metrics_during.get('error_rate', 0) < 0.01:  # Less than 1% errors
            passed_checks.append("Availability maintained")
        else:
            failed_checks.append(
                f"Availability degraded: {metrics_during.get('error_rate'):.2%} errors"
            )

        # Fallback activation check
        if metrics_during.get('fallback_rate', 0) > 0:
            passed_checks.append("Fallback correctly activated")
        else:
            failed_checks.append("Fallback was not activated")

        # Latency check
        if (metrics_during.get('p99_latency_ms', 0) <
            metrics_baseline.get('p99_latency_ms', 0) * 3):
            passed_checks.append("Latency remained reasonable")
        else:
            failed_checks.append("Latency exceeded 3x baseline")

        result = {
            'experiment': experiment_name,
            'passed': len(failed_checks) == 0,
            'passed_checks': passed_checks,
            'failed_checks': failed_checks,
            'recommendation': self._get_recommendation(failed_checks)
        }

        self.results.append(result)
        return result

    def _is_safe_to_run(self, experiment: ChaosExperiment) -> bool:
        """Check if it's safe to run experiment."""
        # Don't run during peak hours
        hour = datetime.now().hour
        if 9 <= hour <= 17 and experiment.blast_radius != 'limited':
            return False

        # Don't run if another experiment is active
        if self.active_experiments:
            return False

        return True

    def _rollback(self, experiment: ChaosExperiment):
        """Roll back an experiment."""
        if experiment.type == ChaosExperimentType.FEATURE_STORE_LATENCY:
            self.target.remove_feature_latency()
        elif experiment.type == ChaosExperimentType.MODEL_FAILURE:
            self.target.remove_model_errors()
        elif experiment.type == ChaosExperimentType.STALE_FEATURES:
            self.target.remove_stale_features()

    def _get_recommendation(self, failed_checks: list[str]) -> str:
        """Get recommendation based on failures."""
        if not failed_checks:
            return "System handled chaos well. Continue regular testing."

        if "Fallback was not activated" in failed_checks:
            return "Review fallback trigger conditions. Consider lowering thresholds."

        if any("Latency" in check for check in failed_checks):
            return "Review timeout configurations. Consider adding caching layers."

        return "Review failed checks and address gaps in resilience."

Game Day

Complete implementation for game day exercises:

class GameDay:
    """Organize game day exercises for AI reliability."""

    def __init__(self):
        self.scenarios: list[dict] = []
        self.participants: list[str] = []
        self.results: list[dict] = []

    def add_scenario(
        self,
        name: str,
        chaos_experiments: list[ChaosExperiment],
        expected_response: list[str],
        success_criteria: list[str]
    ):
        """Add a game day scenario."""
        self.scenarios.append({
            'name': name,
            'experiments': chaos_experiments,
            'expected_response': expected_response,
            'success_criteria': success_criteria
        })

    def run_tabletop_exercise(self, scenario_name: str) -> dict:
        """Run tabletop exercise (discussion-based, no real chaos)."""
        scenario = next(
            (s for s in self.scenarios if s['name'] == scenario_name),
            None
        )

        if not scenario:
            return {'error': 'Scenario not found'}

        discussion_points = [
            f"Scenario: {scenario['name']}",
            "Discussion Questions:",
            "1. How would we detect this issue?",
            "2. Who would be paged?",
            "3. What would be the immediate response?",
            "4. What information would we need to diagnose?",
            "5. What are the fallback options?",
            "6. How would we communicate to stakeholders?",
            "Expected Response:",
        ] + scenario['expected_response']

        return {
            'type': 'tabletop',
            'scenario': scenario_name,
            'discussion_points': discussion_points
        }