Reliability Engineering Code Reference
Complete code implementations for Chapter 65: Reliability Engineering.
Table of Contents
- Service Level Objectives
- Graceful Degradation
- Incident Management
- Reliable Pipelines
- Observability
- Chaos Engineering
Service Level Objectives
AI SLO Framework
Complete implementation for defining AI-specific SLOs:
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from datetime import datetime, timedelta
class SLOCategory(Enum):
    """Kinds of service level objective that can be defined for an AI service."""

    AVAILABILITY = "availability"
    LATENCY = "latency"
    QUALITY = "quality"
    FRESHNESS = "freshness"
    SAFETY = "safety"
@dataclass
class SLO:
    """A single service level objective.

    Bundles the objective's identity, its numeric target, a human-readable
    threshold string, the window over which it is measured, and the error
    budget allowed within that window.
    """

    name: str
    category: SLOCategory
    description: str
    target: float  # Numeric target value
    threshold: str  # Human-readable form, e.g. "99.9%", "< 200ms"
    measurement_window: timedelta
    error_budget: float  # Allowed failures
@dataclass
class SLI:
    """A service level indicator: the metric measured against an SLO."""

    name: str
    calculation: str  # Formula for the indicator, stored as text
    data_source: str  # Name of the source the metric is read from
class AISLOFramework:
    """Framework for defining AI system SLOs.

    Every ``define_*`` method builds the SLO(s), appends them to ``self.slos``
    and returns them. Only ``define_availability_slo`` also registers an SLI
    in ``self.slis``.
    """

    def __init__(self, service_name: str):
        """Create an empty framework whose SLO names are prefixed with ``service_name``."""
        self.service = service_name
        self.slos: list[SLO] = []
        self.slis: dict[str, SLI] = {}

    @staticmethod
    def _to_percent(fraction: float) -> float:
        """Scale a fraction to a percentage, trimming binary float noise.

        Plain ``0.07 * 100`` evaluates to ``7.000000000000001``; rounding keeps
        that artifact out of descriptions, thresholds and error budgets.
        """
        return round(fraction * 100, 12)

    def define_availability_slo(
        self,
        target: float = 99.9,
        include_quality_check: bool = True
    ) -> SLO:
        """Define availability SLO for AI service.

        Args:
            target: Required availability as a percentage.
            include_quality_check: When true, low-confidence predictions are
                excluded from the successful count in the SLI formula.

        Returns:
            The created SLO, measured over a 30-day window.
        """
        if include_quality_check:
            description = (
                "Percentage of requests that return a valid prediction "
                "with confidence above minimum threshold"
            )
            calculation = (
                "(successful_predictions - low_confidence_predictions) / "
                "total_requests * 100"
            )
        else:
            description = "Percentage of requests that return without error"
            calculation = "successful_requests / total_requests * 100"

        slo = SLO(
            name=f"{self.service}_availability",
            category=SLOCategory.AVAILABILITY,
            description=description,
            target=target,
            threshold=f">= {target}%",
            measurement_window=timedelta(days=30),
            # Rounded because 100 - 99.9 is 0.09999999999999432 in binary floats.
            error_budget=round(100 - target, 12)
        )
        self.slos.append(slo)
        self.slis[slo.name] = SLI(
            name=f"{slo.name}_sli",
            calculation=calculation,
            data_source="request_metrics"
        )
        return slo

    def define_latency_slo(
        self,
        p50_ms: int,
        p99_ms: int,
        include_feature_retrieval: bool = True
    ) -> list[SLO]:
        """Define latency SLOs.

        Args:
            p50_ms: Median latency target in milliseconds.
            p99_ms: 99th percentile latency target in milliseconds.
            include_feature_retrieval: Selects the "end_to_end" wording in the
                description instead of "inference_only".

        Returns:
            Two SLOs (p50 then p99), each measured over a 7-day window.
        """
        slos = []
        component = "end_to_end" if include_feature_retrieval else "inference_only"
        for percentile, target in [("p50", p50_ms), ("p99", p99_ms)]:
            slo = SLO(
                name=f"{self.service}_latency_{percentile}",
                category=SLOCategory.LATENCY,
                description=f"{percentile} latency for {component} request",
                target=target,
                threshold=f"< {target}ms",
                measurement_window=timedelta(days=7),
                error_budget=1.0 if percentile == "p99" else 5.0  # % requests can exceed
            )
            slos.append(slo)
            self.slos.append(slo)
        return slos

    def define_quality_slo(
        self,
        metric_name: str,
        baseline: float,
        allowed_degradation: float = 0.05
    ) -> SLO:
        """Define model quality SLO.

        The target is ``baseline * (1 - allowed_degradation)``, so the metric
        is treated as higher-is-better.

        Args:
            metric_name: Name of the quality metric, used in the SLO name.
            baseline: Baseline value of the metric.
            allowed_degradation: Tolerated relative drop, as a fraction.

        Returns:
            The created SLO, measured over a 1-day window.
        """
        target = baseline * (1 - allowed_degradation)
        degradation_pct = self._to_percent(allowed_degradation)
        slo = SLO(
            name=f"{self.service}_quality_{metric_name}",
            category=SLOCategory.QUALITY,
            description=f"Model {metric_name} must remain within {degradation_pct}% of baseline",
            target=target,
            threshold=f">= {target:.3f}",
            measurement_window=timedelta(days=1),
            error_budget=degradation_pct
        )
        self.slos.append(slo)
        return slo

    def define_freshness_slo(
        self,
        model_max_age_days: int = 7,
        feature_max_age_hours: int = 1
    ) -> list[SLO]:
        """Define freshness SLOs.

        Args:
            model_max_age_days: Maximum allowed model age in days.
            feature_max_age_hours: Maximum allowed feature age in hours.

        Returns:
            ``[model_slo, feature_slo]``.
        """
        model_slo = SLO(
            name=f"{self.service}_model_freshness",
            category=SLOCategory.FRESHNESS,
            description="Model age must not exceed threshold",
            target=model_max_age_days,
            threshold=f"< {model_max_age_days} days",
            measurement_window=timedelta(days=1),
            error_budget=0  # No tolerance for stale models
        )
        feature_slo = SLO(
            name=f"{self.service}_feature_freshness",
            category=SLOCategory.FRESHNESS,
            description="Features must not exceed staleness threshold",
            target=feature_max_age_hours,
            threshold=f"< {feature_max_age_hours} hours",
            measurement_window=timedelta(hours=1),
            error_budget=1.0  # 1% of requests can have stale features
        )
        slos = [model_slo, feature_slo]
        self.slos.extend(slos)
        return slos

    def define_safety_slo(
        self,
        max_harmful_rate: float = 0.0001
    ) -> SLO:
        """Define safety SLO for content-generating models.

        Args:
            max_harmful_rate: Maximum tolerated fraction of harmful outputs.

        Returns:
            The created SLO, measured over a 1-hour window.
        """
        slo = SLO(
            name=f"{self.service}_safety",
            category=SLOCategory.SAFETY,
            description="Rate of outputs flagged as harmful",
            target=max_harmful_rate,
            threshold=f"< {self._to_percent(max_harmful_rate)}%",
            measurement_window=timedelta(hours=1),
            error_budget=0  # Zero tolerance, triggers immediate response
        )
        self.slos.append(slo)
        return slo


# Error Budget Tracker
Complete implementation for tracking error budget consumption:
from datetime import datetime, timedelta
class ErrorBudgetTracker:
    """Track error budget consumption.

    Measurements are stored per SLO name. A budget status is the percentage
    of in-window measurements that violated the target, compared against the
    SLO's ``error_budget``.
    """

    def __init__(self, slos: list[SLO]):
        """Index the given SLOs by name and start with no measurements."""
        self.slos = {slo.name: slo for slo in slos}
        self.consumption: dict[str, list[dict]] = {slo.name: [] for slo in slos}

    def record_measurement(
        self,
        slo_name: str,
        actual_value: float,
        timestamp: Optional[datetime] = None
    ):
        """Record a measurement against an SLO.

        Args:
            slo_name: Name of a registered SLO.
            actual_value: Observed value to compare with the SLO target.
            timestamp: Time of the measurement; defaults to ``datetime.now()``.

        Raises:
            KeyError: If ``slo_name`` is not a registered SLO.
        """
        if timestamp is None:
            timestamp = datetime.now()
        slo = self.slos[slo_name]

        # Availability and quality are higher-is-better; every other category
        # (latency, freshness, safety) consumes budget when above target.
        if slo.category in (SLOCategory.AVAILABILITY, SLOCategory.QUALITY):
            consumed = actual_value < slo.target
        else:
            consumed = actual_value > slo.target

        self.consumption[slo_name].append({
            'timestamp': timestamp,
            'value': actual_value,
            'target': slo.target,
            'consumed_budget': consumed
        })

    def get_budget_status(self, slo_name: str) -> dict:
        """Get current error budget status.

        Args:
            slo_name: Name of a registered SLO.

        Returns:
            A dict with the budget, the consumed and remaining percentages,
            a status string and in-window counts. When no measurement falls in
            the window the status is ``'no_data'``.

        Raises:
            KeyError: If ``slo_name`` is not a registered SLO.
        """
        slo = self.slos[slo_name]
        window_start = datetime.now() - slo.measurement_window
        recent = [
            m for m in self.consumption[slo_name]
            if m['timestamp'] >= window_start
        ]

        if not recent:
            # 'budget_remaining' is the historical key for this case; the other
            # keys mirror the normal response so callers can read both alike.
            return {
                'slo': slo_name,
                'status': 'no_data',
                'budget_remaining': slo.error_budget,
                'error_budget': slo.error_budget,
                'consumed': 0,
                'remaining': slo.error_budget,
                'measurements_in_window': 0,
                'failures_in_window': 0
            }

        failures = sum(1 for m in recent if m['consumed_budget'])
        total = len(recent)
        failure_rate = (failures / total) * 100
        budget_remaining = slo.error_budget - failure_rate

        return {
            'slo': slo_name,
            'error_budget': slo.error_budget,
            'consumed': failure_rate,
            'remaining': max(0, budget_remaining),
            'status': self._budget_status(budget_remaining, slo.error_budget),
            'measurements_in_window': total,
            'failures_in_window': failures
        }

    def _budget_status(self, remaining: float, total: float) -> str:
        """Determine budget status.

        Args:
            remaining: Budget left (may be negative when overspent).
            total: The SLO's full error budget.

        Returns:
            One of ``'exhausted'``, ``'critical'``, ``'warning'``, ``'healthy'``.
        """
        if total <= 0:
            # Zero-tolerance SLO: there is no budget to spend, so it is healthy
            # until the first failure rather than permanently exhausted.
            return 'exhausted' if remaining < 0 else 'healthy'
        if remaining <= 0:
            return 'exhausted'
        if remaining < total * 0.25:
            return 'critical'
        if remaining < total * 0.5:
            return 'warning'
        return 'healthy'


# Quality Metrics and Real-Time Estimator
Complete implementation for quality metric definitions and real-time estimation:
from dataclasses import dataclass
from typing import Callable, Optional
@dataclass
class QualityMetricDefinition:
    """Describes one quality metric that an SLO can be built on."""

    name: str
    calculation: Callable  # Function that computes the metric
    requires_labels: bool
    real_time_computable: bool
    proxy_metric: Optional[str]  # Stand-in metric when labels are unavailable in real time
# Quality metric definitions grouped by model task type.
# NOTE(review): precision_score and ndcg_score are not imported in the visible
# source -- presumably sklearn.metrics; confirm before calling these lambdas.
QUALITY_METRICS = {
    'classification': [
        QualityMetricDefinition(
            name='accuracy',
            calculation=lambda pred, true: (pred == true).mean(),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='confidence',
        ),
        QualityMetricDefinition(
            name='precision',
            calculation=lambda pred, true: precision_score(true, pred),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='positive_rate',
        ),
    ],
    'regression': [
        QualityMetricDefinition(
            name='mae',
            calculation=lambda pred, true: abs(pred - true).mean(),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='prediction_variance',
        ),
    ],
    'ranking': [
        QualityMetricDefinition(
            name='ndcg',
            calculation=lambda pred, true: ndcg_score(true, pred),
            requires_labels=True,
            real_time_computable=False,
            proxy_metric='click_through_rate',  # Can be computed in real-time
        ),
    ],
    'generation': [
        QualityMetricDefinition(
            name='human_approval_rate',
            # Takes a single `outputs` argument, unlike the (pred, true) lambdas above.
            calculation=lambda outputs: (outputs['rating'] >= 4).mean(),
            requires_labels=True,  # Requires human feedback
            real_time_computable=False,
            proxy_metric='perplexity',
        ),
    ],
}
class RealTimeQualityEstimator:
    """Estimates quality in real time from proxy metrics.

    ``proxy_baseline`` maps a metric name to a dict read for its ``'mean'`` and
    ``'std'`` keys. Recorded values are kept per metric in a rolling window.
    """

    def __init__(self, proxy_baseline: dict):
        self.baseline = proxy_baseline
        self.current_values: dict[str, list] = {}

    def record_proxy(self, metric_name: str, value: float):
        """Append a proxy value, keeping only the newest 10000 per metric."""
        window_cap = 10000
        samples = self.current_values.setdefault(metric_name, [])
        samples.append(value)
        if len(samples) > window_cap:
            self.current_values[metric_name] = samples[-window_cap:]

    def estimate_quality_status(self, metric_name: str) -> dict:
        """Judge whether the quality SLO is probably met for ``metric_name``.

        Returns ``{'status': 'no_data'}`` for an unknown metric and
        ``{'status': 'insufficient_data'}`` below 100 samples. Otherwise the
        status comes from how far the current mean sits from the baseline
        mean, in units of the baseline standard deviation.
        """
        samples = self.current_values.get(metric_name)
        if samples is None:
            return {'status': 'no_data'}

        sample_size = len(samples)
        if sample_size < 100:
            return {'status': 'insufficient_data'}

        reference = self.baseline.get(metric_name, {})
        baseline_mean = reference.get('mean', 0)
        baseline_std = reference.get('std', 1)
        current_mean = sum(samples) / sample_size

        # The small epsilon guards against a zero baseline standard deviation.
        drift = abs(current_mean - baseline_mean) / (baseline_std + 1e-6)

        status = (
            'likely_violated' if drift > 3
            else 'potential_issue' if drift > 2
            else 'likely_healthy'
        )

        return {
            'status': status,
            'current_mean': current_mean,
            'baseline_mean': baseline_mean,
            'drift_zscore': drift,
            'sample_size': sample_size
        }


# Graceful Degradation
Degradation Controller
Complete implementation for graceful degradation control:
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional
import asyncio
class DegradationLevel(Enum):
    """Service levels, numbered from full service (0) up to static fallback (4)."""

    FULL_SERVICE = 0
    CACHED_FEATURES = 1
    SIMPLER_MODEL = 2
    RULE_BASED = 3
    STATIC_FALLBACK = 4
@dataclass
class DegradationStrategy:
    """Describes how requests are served at one degradation level."""

    level: DegradationLevel
    name: str
    description: str
    handler: Callable  # Called with the request to produce the result
    quality_impact: str
    latency_budget_ms: int  # Time allowed for the handler, in milliseconds
class GracefulDegradationController:
    """Control graceful degradation for AI service.

    Strategies are registered per ``DegradationLevel`` and tried in enum
    definition order until one returns a result within its latency budget.
    """

    def __init__(self):
        self.strategies: dict[DegradationLevel, DegradationStrategy] = {}
        self.current_level = DegradationLevel.FULL_SERVICE
        self.health_checks: dict[str, Callable] = {}
        self.fallback_data: dict[str, Any] = {}

    def register_strategy(self, strategy: DegradationStrategy):
        """Register a degradation strategy, replacing any at the same level."""
        self.strategies[strategy.level] = strategy

    def register_health_check(self, name: str, check: Callable[[], bool]):
        """Register a health check for a component."""
        self.health_checks[name] = check

    async def execute_with_degradation(
        self,
        request: dict,
        timeout_ms: int = 500
    ) -> dict:
        """Execute request with automatic degradation on failure.

        Args:
            request: Payload passed to each strategy handler.
            timeout_ms: Accepted for interface compatibility. NOTE(review):
                it is not applied; each strategy is bounded by its own
                ``latency_budget_ms`` instead.

        Returns:
            On success, the result together with the level name and quality
            impact of the strategy that served it. When every strategy fails,
            a dict with ``result`` None, ``degradation_level`` ``'FAILED'`` and
            an ``errors`` mapping from level name to the failure reason.
        """
        errors: dict[str, str] = {}

        # Try each level in order until one succeeds
        for level in DegradationLevel:
            strategy = self.strategies.get(level)
            if strategy is None:
                continue
            try:
                result = await asyncio.wait_for(
                    strategy.handler(request),
                    timeout=strategy.latency_budget_ms / 1000
                )
            except asyncio.TimeoutError:
                # Budget exceeded: record it and fall through to the next level
                errors[level.name] = f"timed out after {strategy.latency_budget_ms}ms"
                continue
            except Exception as exc:
                # Keep the reason instead of discarding it, then try the next level
                errors[level.name] = repr(exc)
                continue
            return {
                'result': result,
                'degradation_level': level.name,
                'quality_impact': strategy.quality_impact
            }

        # All strategies failed
        return {
            'result': None,
            'error': 'All degradation strategies failed',
            'degradation_level': 'FAILED',
            'errors': errors
        }

    async def determine_service_level(self) -> DegradationLevel:
        """Determine appropriate service level based on health checks.

        Each registered check runs in a worker thread with a one second
        timeout; a check that raises or times out counts as unhealthy.
        """
        health_status = {}
        for name, check in self.health_checks.items():
            try:
                health_status[name] = await asyncio.wait_for(
                    asyncio.to_thread(check),
                    timeout=1.0
                )
            except Exception:
                # Exception, not a bare except: task cancellation and
                # KeyboardInterrupt must propagate instead of reading as
                # "component unhealthy".
                health_status[name] = False

        # Determine level based on health
        if all(health_status.values()):
            return DegradationLevel.FULL_SERVICE
        if not health_status.get('feature_store', True):
            return DegradationLevel.CACHED_FEATURES
        if not health_status.get('primary_model', True):
            return DegradationLevel.SIMPLER_MODEL
        if not health_status.get('fallback_model', True):
            return DegradationLevel.RULE_BASED
        return DegradationLevel.STATIC_FALLBACK


# Circuit Breaker
Complete implementation for circuit breaker pattern:
from datetime import datetime
from typing import Any, Callable
class CircuitBreaker:
    """Circuit breaker guarding calls to an AI model.

    The breaker is ``'closed'`` (calls pass), ``'open'`` (calls rejected) or
    ``'half_open'`` (calls pass while recovery is being probed).
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_requests: int = 3
    ):
        # Configuration
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # compared against elapsed seconds
        self.half_open_requests = half_open_requests
        # Runtime state
        self.failures = 0
        self.state = 'closed'  # closed, open, half_open
        self.last_failure_time = None
        self.half_open_successes = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` through the breaker.

        Raises ``CircuitOpenError`` while the circuit is open and the recovery
        timeout has not elapsed. Exceptions raised by ``func`` are counted as
        failures and re-raised.
        """
        if self.state == 'open':
            if not self._should_attempt_reset():
                raise CircuitOpenError("Circuit is open")
            # Recovery timeout elapsed: start probing
            self.state = 'half_open'
            self.half_open_successes = 0

        try:
            outcome = await func(*args, **kwargs)
            self._record_success()
            return outcome
        except Exception:
            self._record_failure()
            raise

    def _record_success(self):
        """Account for a successful call."""
        if self.state != 'half_open':
            # Successes pay down the failure count, never below zero
            self.failures = max(0, self.failures - 1)
            return
        self.half_open_successes += 1
        if self.half_open_successes >= self.half_open_requests:
            self.state = 'closed'
            self.failures = 0

    def _record_failure(self):
        """Account for a failed call and open the circuit at the threshold."""
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = 'open'

    def _should_attempt_reset(self) -> bool:
        """Return True once the recovery timeout has passed since the last failure."""
        last_failure = self.last_failure_time
        if last_failure is None:
            return True
        seconds_since = (datetime.now() - last_failure).total_seconds()
        return seconds_since >= self.recovery_timeout
class CircuitOpenError(Exception):
    """Signals that a call was rejected because the circuit breaker is open."""


# Fallback Executor
Complete implementation for fallback strategy execution:
from dataclasses import dataclass
from typing import Any, Callable
@dataclass
class FallbackStrategy:
    """Describes the fallback ladder for one type of AI system."""

    system_type: str
    primary: str  # What serves requests normally
    fallbacks: list[dict]  # Entries with "level" and "strategy" keys
    example: str
# Reference fallback ladders for common AI system types. Each entry lists the
# primary approach followed by four fallback levels.
FALLBACK_STRATEGIES = [
    FallbackStrategy(
        system_type="Recommendation",
        primary="Personalized ML model",
        fallbacks=[
            {"level": 1, "strategy": "Collaborative filtering with cached embeddings"},
            {"level": 2, "strategy": "Popularity-based recommendations"},
            {"level": 3, "strategy": "Category-based defaults"},
            {"level": 4, "strategy": "Static editorial picks"},
        ],
        example="Netflix showing 'Top 10 in Your Country' when personalization fails",
    ),
    FallbackStrategy(
        system_type="Search ranking",
        primary="ML ranking model with full features",
        fallbacks=[
            {"level": 1, "strategy": "Simpler model with fewer features"},
            {"level": 2, "strategy": "BM25 text matching"},
            {"level": 3, "strategy": "Recency-based ordering"},
            {"level": 4, "strategy": "Return unranked results"},
        ],
        example="Google showing results with reduced personalization during outages",
    ),
    FallbackStrategy(
        system_type="Content moderation",
        primary="Large classifier with high accuracy",
        fallbacks=[
            {"level": 1, "strategy": "Smaller, faster classifier"},
            {"level": 2, "strategy": "Keyword-based filtering"},
            {"level": 3, "strategy": "Hold for human review"},
            {"level": 4, "strategy": "Block all uncertain content"},
        ],
        example="Platform blocking content during classifier outage rather than allowing harmful content",
    ),
    FallbackStrategy(
        system_type="LLM chat",
        primary="Large LLM with full context",
        fallbacks=[
            {"level": 1, "strategy": "Same model with reduced context"},
            {"level": 2, "strategy": "Smaller/distilled model"},
            {"level": 3, "strategy": "Template-based responses"},
            {"level": 4, "strategy": "'I'm having trouble right now. Please try again.'"},
        ],
        example="ChatGPT showing capacity message during high load",
    ),
    FallbackStrategy(
        system_type="Fraud detection",
        primary="Real-time ML scoring",
        fallbacks=[
            {"level": 1, "strategy": "Batch-computed risk scores"},
            {"level": 2, "strategy": "Rule-based checks"},
            {"level": 3, "strategy": "Velocity limits only"},
            {"level": 4, "strategy": "Manual review queue"},
        ],
        example="Payment processor adding friction during model degradation",
    ),
]
class FallbackExecutor:
    """Execute appropriate fallback based on system type.

    Handlers are registered by integer level and tried in ascending level
    order. Usage counts are kept in ``self.metrics``.
    """

    def __init__(self, system_type: str):
        self.system_type = system_type
        self.fallback_handlers: dict[int, Callable] = {}
        self.metrics: dict[str, int] = {}

    def register_fallback(self, level: int, handler: Callable):
        """Register fallback handler for level, replacing any existing one."""
        self.fallback_handlers[level] = handler

    def _count(self, metric_key: str) -> None:
        """Increment one counter in ``self.metrics``."""
        self.metrics[metric_key] = self.metrics.get(metric_key, 0) + 1

    async def execute(
        self,
        request: Any,
        starting_level: int = 0
    ) -> dict:
        """Execute starting from given level.

        Every registered level at or above ``starting_level`` is tried in
        ascending order. Levels are taken from the registered handlers rather
        than a fixed 0-4 range, so a handler registered at level 5 or higher
        is no longer skipped.

        Args:
            request: Payload passed to each handler.
            starting_level: Lowest level to try.

        Returns:
            On success: ``result``, the ``fallback_level`` that served it, and
            ``degraded`` (True for any level above 0). When every handler
            fails: ``result`` None, ``fallback_level`` -1, an ``error`` message
            and an ``errors`` mapping from level to the exception's repr.
        """
        errors: dict[int, str] = {}
        candidate_levels = sorted(
            level for level in self.fallback_handlers if level >= starting_level
        )

        for level in candidate_levels:
            handler = self.fallback_handlers[level]
            try:
                result = await handler(request)
            except Exception as exc:
                # Record the failure instead of dropping it, then try the next level
                self._count(f"fallback_level_{level}_failed")
                errors[level] = repr(exc)
                continue

            self._count(f"fallback_level_{level}")
            return {
                'result': result,
                'fallback_level': level,
                'degraded': level > 0
            }

        # All fallbacks failed
        self._count('all_failed')
        return {
            'result': None,
            'error': 'All fallback levels exhausted',
            'fallback_level': -1,
            'errors': errors
        }


# Incident Management
AI Incident Signatures
Complete incident signature definitions for AI-specific incidents:
from dataclasses import dataclass
from enum import Enum
class AIIncidentType(Enum):
    """Categories of incident specific to AI systems."""

    MODEL_DEGRADATION = "model_degradation"
    DATA_QUALITY = "data_quality"
    FEATURE_PIPELINE = "feature_pipeline"
    SERVING_LATENCY = "serving_latency"
    SAFETY_VIOLATION = "safety_violation"
    DRIFT_DETECTED = "drift_detected"
    TRAINING_FAILURE = "training_failure"
    FEEDBACK_LOOP = "feedback_loop"
@dataclass
class AIIncidentSignature:
    """Runbook-style description of one AI incident type."""

    type: AIIncidentType
    signals: list[str]  # Observations that point to this incident type
    typical_causes: list[str]
    immediate_actions: list[str]
    investigation_steps: list[str]
# Known incident signatures. Only four of the AIIncidentType members have an
# entry here: model degradation, safety violation, feature pipeline and drift.
AI_INCIDENT_SIGNATURES = [
    AIIncidentSignature(
        type=AIIncidentType.MODEL_DEGRADATION,
        signals=[
            "Drop in online metrics (CTR, conversion, etc.)",
            "Increase in user complaints",
            "A/B test shows model underperforming control",
            "Quality proxy metrics trending down",
        ],
        typical_causes=[
            "Data distribution shift",
            "Feature computation change",
            "Upstream data quality issue",
            "Model deployed from wrong experiment",
        ],
        immediate_actions=[
            "Roll back to previous model version",
            "Increase monitoring granularity",
            "Enable additional logging",
        ],
        investigation_steps=[
            "Compare feature distributions: now vs training",
            "Check recent data pipeline changes",
            "Analyze model behavior by segment",
            "Review recent deployments",
        ],
    ),
    AIIncidentSignature(
        type=AIIncidentType.SAFETY_VIOLATION,
        signals=[
            "Outputs flagged by safety classifier",
            "User reports of harmful content",
            "Monitoring alerts on sensitive outputs",
        ],
        typical_causes=[
            "Adversarial inputs exploiting model",
            "Training data contamination",
            "Edge case not covered by safety filters",
            "Prompt injection attack",
        ],
        immediate_actions=[
            "Block affected input patterns",
            "Increase safety filter strictness",
            "Enable human review for flagged outputs",
            "Consider service degradation to safer fallback",
        ],
        investigation_steps=[
            "Collect and analyze violating examples",
            "Identify common patterns in triggers",
            "Check for training data issues",
            "Review safety filter coverage",
        ],
    ),
    AIIncidentSignature(
        type=AIIncidentType.FEATURE_PIPELINE,
        signals=[
            "Feature freshness SLO violation",
            "Missing features at serving time",
            "Sudden change in feature distributions",
        ],
        typical_causes=[
            "Pipeline job failure",
            "Schema change in upstream data",
            "Resource exhaustion",
            "Dependency service outage",
        ],
        immediate_actions=[
            "Switch to cached/stale features",
            "Enable batch features as fallback",
            "Alert data engineering team",
        ],
        investigation_steps=[
            "Check pipeline job logs",
            "Verify upstream data availability",
            "Review recent schema changes",
            "Check resource utilization",
        ],
    ),
    AIIncidentSignature(
        type=AIIncidentType.DRIFT_DETECTED,
        signals=[
            "PSI/JSD alerts on input features",
            "Prediction distribution shift",
            "Sudden change in model confidence",
        ],
        typical_causes=[
            "Seasonal change in user behavior",
            "Product/UI change affecting features",
            "Real world event (holiday, news)",
            "Data collection bug",
        ],
        immediate_actions=[
            "Verify drift is real (not monitoring bug)",
            "Assess impact on model performance",
            "Consider triggering model retraining",
        ],
        investigation_steps=[
            "Identify which features are drifting",
            "Correlate with external events",
            "Check for upstream data issues",
            "Analyze impact on model outputs",
        ],
    ),
]


# AI Incident Manager
Complete implementation for managing AI system incidents:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
class IncidentSeverity(Enum):
    """Incident severity; a lower number means a more severe incident."""

    SEV1 = 1  # Critical - major user impact
    SEV2 = 2  # High - significant impact
    SEV3 = 3  # Medium - moderate impact
    SEV4 = 4  # Low - minor impact
class IncidentStatus(Enum):
    """Lifecycle stages of an incident, from detection to post-mortem."""

    DETECTED = "detected"
    INVESTIGATING = "investigating"
    MITIGATING = "mitigating"
    RESOLVED = "resolved"
    POST_MORTEM = "post_mortem"
@dataclass
class AIIncident:
    """Record of one AI system incident and how it progressed."""

    # Identity and classification
    id: str
    type: AIIncidentType
    severity: IncidentSeverity
    status: IncidentStatus
    title: str
    description: str

    # Detection
    detected_at: datetime
    detected_by: str  # Alert name or person

    # Impact
    affected_systems: list[str]
    affected_metrics: dict[str, float]  # metric -> impact

    # Progress and outcome
    timeline: list[dict] = field(default_factory=list)
    root_cause: Optional[str] = None
    resolution: Optional[str] = None
    resolved_at: Optional[datetime] = None
class AIIncidentManager:
    """Manage AI system incidents.

    Open incidents live in ``active_incidents`` keyed by ID; resolving an
    incident moves it to ``incident_history``.
    """

    def __init__(self):
        self.active_incidents: dict[str, AIIncident] = {}
        self.incident_history: list[AIIncident] = []
        self.on_call_rotation: dict[str, str] = {}

    def create_incident(
        self,
        type: AIIncidentType,
        severity: IncidentSeverity,
        title: str,
        description: str,
        detected_by: str,
        affected_systems: list[str]
    ) -> AIIncident:
        """Create new incident.

        The incident is stored as active and, for SEV1/SEV2, the on-call is
        paged.

        Args:
            type: Kind of AI incident.
            severity: Incident severity.
            title: Short title.
            description: Longer description.
            detected_by: Alert name or person that detected the incident.
            affected_systems: Names of impacted systems.

        Returns:
            The newly created incident.
        """
        now = datetime.now()
        # Count active incidents as well as history: numbering from history
        # alone gave every incident opened before the first resolution the
        # same ID, so a new incident overwrote the previous active one.
        sequence = len(self.incident_history) + len(self.active_incidents) + 1
        incident_id = f"AI-{now.strftime('%Y%m%d')}-{sequence:04d}"

        incident = AIIncident(
            id=incident_id,
            type=type,
            severity=severity,
            status=IncidentStatus.DETECTED,
            title=title,
            description=description,
            detected_at=now,
            detected_by=detected_by,
            affected_systems=affected_systems,
            affected_metrics={},
            timeline=[{
                'timestamp': now,
                'status': 'detected',
                'description': f'Incident detected by {detected_by}'
            }]
        )
        self.active_incidents[incident_id] = incident

        # Page on-call if high severity
        if severity in [IncidentSeverity.SEV1, IncidentSeverity.SEV2]:
            self._page_on_call(incident)

        return incident

    def update_status(
        self,
        incident_id: str,
        new_status: IncidentStatus,
        description: str
    ):
        """Update incident status.

        Resolved incidents are looked up in the history too, so they can still
        move on to ``POST_MORTEM``.

        Args:
            incident_id: ID of an active or already resolved incident.
            new_status: Status to set.
            description: Timeline note for this transition.

        Raises:
            ValueError: If no incident with ``incident_id`` exists.
        """
        incident = self.active_incidents.get(incident_id)
        if incident is None:
            incident = next(
                (past for past in self.incident_history if past.id == incident_id),
                None
            )
        if incident is None:
            raise ValueError(f"Incident {incident_id} not found")

        now = datetime.now()
        incident.status = new_status
        incident.timeline.append({
            'timestamp': now,
            'status': new_status.value,
            'description': description
        })

        if new_status == IncidentStatus.RESOLVED:
            # Keep the first resolution time if the status is set again later
            if incident.resolved_at is None:
                incident.resolved_at = now
            self._move_to_history(incident_id)

    def get_runbook(self, incident_type: AIIncidentType) -> dict:
        """Get incident response runbook.

        Returns:
            The matching signature's fields plus an escalation path, or
            ``{'error': 'No runbook for incident type'}`` when no signature is
            defined for ``incident_type``.
        """
        signature = next(
            (s for s in AI_INCIDENT_SIGNATURES if s.type == incident_type),
            None
        )
        if not signature:
            return {'error': 'No runbook for incident type'}

        return {
            'type': incident_type.value,
            'signals': signature.signals,
            'immediate_actions': signature.immediate_actions,
            'investigation_steps': signature.investigation_steps,
            'typical_causes': signature.typical_causes,
            'escalation_path': self._get_escalation_path(incident_type)
        }

    def _page_on_call(self, incident: AIIncident):
        """Page on-call engineer."""
        # Would integrate with PagerDuty, OpsGenie, etc.
        pass

    def _move_to_history(self, incident_id: str):
        """Move resolved incident to history; no-op if it is not active."""
        incident = self.active_incidents.pop(incident_id, None)
        if incident:
            self.incident_history.append(incident)

    def _get_escalation_path(self, incident_type: AIIncidentType) -> list[str]:
        """Get escalation path for incident type, with a generic default."""
        paths = {
            AIIncidentType.MODEL_DEGRADATION: [
                "ML Engineer On-Call",
                "ML Tech Lead",
                "ML Engineering Manager"
            ],
            AIIncidentType.SAFETY_VIOLATION: [
                "ML Engineer On-Call",
                "Trust & Safety Team",
                "VP of Engineering"
            ],
            AIIncidentType.FEATURE_PIPELINE: [
                "Data Engineer On-Call",
                "ML Engineer On-Call",
                "Data Engineering Manager"
            ],
        }
        return paths.get(incident_type, ["Engineering On-Call"])


# Post-Incident Review
Complete implementation for post-incident review:
from dataclasses import dataclass
@dataclass
class PostIncidentReview:
    """Blameless post-mortem document for a single incident."""

    # What happened
    incident_id: str
    incident_summary: str
    timeline: list[dict]

    # Why it happened
    root_cause_analysis: str
    contributing_factors: list[str]

    # Retrospective
    what_went_well: list[str]
    what_could_improve: list[str]

    # Follow-up
    action_items: list[dict]
    prevention_measures: list[str]
    detection_improvements: list[str]
class PostIncidentReviewTemplate:
    """Builds a pre-filled post-incident review from an incident record."""

    @staticmethod
    def generate(incident: AIIncident) -> PostIncidentReview:
        """Return a review skeleton for ``incident``.

        Identity, summary, timeline and root cause come from the incident.
        Prevention measures are pre-populated for incident types that have
        standard prompts; every other list is left empty for reviewers.
        """
        # Standard prompts per incident type. Only the 'prevention' lists are
        # copied into the review; the 'root_cause' questions are not used by
        # this method.
        prompts_by_type = {
            AIIncidentType.MODEL_DEGRADATION: {
                'root_cause': [
                    "What changed in the data distribution?",
                    "Were there upstream data changes?",
                    "Was the correct model version deployed?",
                    "Were features computed correctly?",
                ],
                'prevention': [
                    "Add canary deployment for models",
                    "Improve monitoring coverage",
                    "Add data validation gates",
                    "Implement automatic rollback",
                ],
            },
            AIIncidentType.SAFETY_VIOLATION: {
                'root_cause': [
                    "How did harmful content bypass filters?",
                    "Was this a known edge case?",
                    "Was training data contaminated?",
                    "Were safety classifiers up to date?",
                ],
                'prevention': [
                    "Expand safety test coverage",
                    "Add adversarial testing to CI",
                    "Improve real-time safety filtering",
                    "Establish red team process",
                ],
            },
        }

        prompts = prompts_by_type.get(incident.type, {})
        summary = f"{incident.title}: {incident.description}"
        root_cause = incident.root_cause or "To be determined"

        return PostIncidentReview(
            incident_id=incident.id,
            incident_summary=summary,
            timeline=incident.timeline,
            root_cause_analysis=root_cause,
            contributing_factors=[],  # To be filled
            what_went_well=[],  # To be filled
            what_could_improve=[],  # To be filled
            action_items=[],  # To be filled
            prevention_measures=prompts.get('prevention', []),
            detection_improvements=[],  # To be filled
        )


# Incident Metrics
Complete implementation for tracking incident metrics:
from datetime import datetime, timedelta
class IncidentMetrics:
    """Aggregates incident statistics for SRE reporting."""

    def __init__(self):
        self.incidents: list[AIIncident] = []

    def add_incident(self, incident: AIIncident):
        """Include ``incident`` in future metric calculations."""
        self.incidents.append(incident)

    def calculate_metrics(self, window_days: int = 30) -> dict:
        """Summarise incidents detected within the last ``window_days`` days.

        Returns ``{'total_incidents': 0}`` when the window is empty. Otherwise
        returns the total, the mean time to recovery in hours (None when no
        incident in the window is resolved) and counts by type and severity.
        """
        window_start = datetime.now() - timedelta(days=window_days)
        in_window = [inc for inc in self.incidents if inc.detected_at >= window_start]
        if not in_window:
            return {'total_incidents': 0}

        # MTTR (Mean Time to Recovery), over resolved incidents only
        hours_to_resolve = [
            (inc.resolved_at - inc.detected_at).total_seconds() / 3600
            for inc in in_window
            if inc.resolved_at
        ]
        if hours_to_resolve:
            mttr_hours = sum(hours_to_resolve) / len(hours_to_resolve)
        else:
            mttr_hours = None

        # Tally by type and by severity in one pass
        type_counts = {}
        severity_counts = {}
        for inc in in_window:
            type_key = inc.type.value
            severity_key = inc.severity.name
            type_counts[type_key] = type_counts.get(type_key, 0) + 1
            severity_counts[severity_key] = severity_counts.get(severity_key, 0) + 1

        return {
            'total_incidents': len(in_window),
            'mttr_hours': mttr_hours,
            'by_type': type_counts,
            'by_severity': severity_counts,
            'window_days': window_days
        }


# Reliable Pipelines
Reliable Training Pipeline
Complete implementation for training pipeline with reliability features:
from dataclasses import dataclass
from typing import Callable, Optional
@dataclass
class TrainingPipelineConfig:
    """Settings that control a reliable training run."""

    checkpoint_frequency: int  # Steps between checkpoints
    max_retries: int  # Upper bound on training attempts
    timeout_hours: int
    resource_limits: dict
    validation_gates: list[Callable]  # Each is called as gate(model, val_data)
    rollback_on_failure: bool
class ReliableTrainingPipeline:
    """Training pipeline with reliability features.

    Training is retried up to ``config.max_retries`` times, checkpoints are
    saved periodically, and validation gates are run every tenth checkpoint
    interval.
    """

    def __init__(self, config: TrainingPipelineConfig):
        self.config = config
        self.checkpoints: list[str] = []
        self.current_step = 0

    async def train(
        self,
        model,
        train_data,
        val_data
    ) -> dict:
        """Run training with reliability guarantees.

        Args:
            model: Object providing ``train_step``, ``save``,
                ``load_checkpoint`` and ``evaluate``.
            train_data: Iterable of batches; it is iterated again on each retry.
            val_data: Data passed to the validation gates and final evaluation.

        Returns:
            A ``'success'`` dict with final metrics, checkpoints and attempt
            count, or a ``'failed_with_rollback'`` dict when the last attempt
            fails, rollback is enabled and a checkpoint exists.

        Raises:
            ValueError: If ``config.max_retries`` is below 1. Previously the
                retry loop was skipped and the method returned None.
            Exception: The last attempt's error, when no rollback applies.
        """
        if self.config.max_retries < 1:
            raise ValueError(
                f"max_retries must be at least 1, got {self.config.max_retries}"
            )

        attempt = 0
        last_checkpoint = None

        while attempt < self.config.max_retries:
            try:
                # Resume from checkpoint if available
                if last_checkpoint:
                    model.load_checkpoint(last_checkpoint)

                # NOTE(review): after a retry the loop restarts at step 0 even
                # though a checkpoint was restored, so earlier batches are
                # trained on again -- confirm whether steps up to the
                # checkpoint should be skipped.
                for step, batch in enumerate(train_data):
                    self.current_step = step

                    # Train step
                    model.train_step(batch)

                    # Checkpoint periodically
                    if step % self.config.checkpoint_frequency == 0:
                        checkpoint_path = self._save_checkpoint(model, step)
                        last_checkpoint = checkpoint_path
                        self.checkpoints.append(checkpoint_path)

                    # Validation gate
                    if step % (self.config.checkpoint_frequency * 10) == 0:
                        if not self._validate(model, val_data):
                            raise TrainingValidationError(
                                f"Validation failed at step {step}"
                            )

                # Final validation
                final_metrics = self._final_validation(model, val_data)

                return {
                    'status': 'success',
                    'final_metrics': final_metrics,
                    'checkpoints': self.checkpoints,
                    'attempts': attempt + 1
                }

            except Exception as e:
                attempt += 1
                if attempt >= self.config.max_retries:
                    if self.config.rollback_on_failure and last_checkpoint:
                        return {
                            'status': 'failed_with_rollback',
                            'last_checkpoint': last_checkpoint,
                            'error': str(e)
                        }
                    raise

    def _save_checkpoint(self, model, step: int) -> str:
        """Save training checkpoint and return its path."""
        path = f"checkpoints/model_step_{step}.pt"
        model.save(path)
        return path

    def _validate(self, model, val_data) -> bool:
        """Run validation gates; False as soon as one gate fails."""
        for gate in self.config.validation_gates:
            if not gate(model, val_data):
                return False
        return True

    def _final_validation(self, model, val_data) -> dict:
        """Run final validation via ``model.evaluate``."""
        return model.evaluate(val_data)
class TrainingValidationError(Exception):
    """Signals that a validation gate rejected the model during training."""


# Load Shedding Strategy
Complete implementation for load shedding:
from collections import defaultdict
import time
class LoadSheddingStrategy:
    """Load shedding strategies for AI systems.

    Accepted requests are counted within one-second windows. As the count
    approaches ``max_qps``, lower-priority requests are shed first.
    """

    def __init__(
        self,
        max_qps: int,
        priority_levels: int = 4
    ):
        self.max_qps = max_qps
        self.priority_levels = priority_levels
        self.current_qps = 0
        self.request_counts: dict[int, int] = defaultdict(int)
        self.last_reset = time.time()

    def should_accept(
        self,
        request: dict,
        priority: int = 2
    ) -> dict:
        """Decide whether to accept or shed request.

        Args:
            request: The incoming request (not inspected here).
            priority: 0 = highest (critical) through 3 = lowest (nice-to-have).

        Returns:
            A dict with ``accept``, the load ``reason``, the ``load_factor``
            seen before this request was counted, the ``priority`` and the
            updated ``current_qps``.
        """
        # Reset counter if new second
        now = time.time()
        if now - self.last_reset >= 1.0:
            self.current_qps = 0
            self.request_counts = defaultdict(int)
            self.last_reset = now

        # A non-positive capacity is treated as fully loaded rather than
        # raising ZeroDivisionError on the request path.
        if self.max_qps > 0:
            load_factor = self.current_qps / self.max_qps
        else:
            load_factor = float("inf")

        # Priority-based shedding
        if load_factor >= 0.95:
            # Extreme load: only accept priority 0
            accept = priority == 0
            reason = "extreme_load"
        elif load_factor >= 0.85:
            # High load: accept priority 0-1
            accept = priority <= 1
            reason = "high_load"
        elif load_factor >= 0.70:
            # Moderate load: accept priority 0-2
            accept = priority <= 2
            reason = "moderate_load"
        else:
            # Normal load: accept all
            accept = True
            reason = "normal_load"

        if accept:
            self.current_qps += 1
            self.request_counts[priority] += 1

        return {
            'accept': accept,
            'reason': reason,
            'load_factor': load_factor,
            'priority': priority,
            'current_qps': self.current_qps
        }

    def get_request_priority(self, request: dict) -> int:
        """Determine request priority from the request's fields.

        Returns 0 for safety-critical requests, 1 for the premium user tier,
        2 for requests carrying a user ID, and 3 otherwise.
        """
        # Critical: Safety-related, high-value transactions
        if request.get('is_safety_critical'):
            return 0
        # High: Logged-in users, premium customers
        if request.get('user_tier') == 'premium':
            return 1
        # Normal: Standard requests
        if request.get('user_id'):
            return 2
        # Low: Anonymous, background tasks
        return 3


# Adaptive Rate Limiter
Complete implementation for adaptive rate limiting:
class AdaptiveRateLimiter:
    """Rate limiter that adapts to model latency.

    Keeps a rolling window of the most recent 1000 latency samples and,
    once at least 100 are available, compares their p95 against
    ``latency_threshold`` to move ``current_limit`` within
    0.5x-1.5x of ``base_limit``.
    """

    def __init__(
        self,
        base_limit: int,
        latency_threshold_ms: int = 100
    ):
        """Start at ``base_limit`` with an empty latency window."""
        self.base_limit = base_limit
        self.latency_threshold = latency_threshold_ms
        self.current_limit = base_limit
        self.latency_samples: list[float] = []

    def record_latency(self, latency_ms: float):
        """Record one request latency and re-evaluate the limit."""
        self.latency_samples.append(latency_ms)
        # Keep rolling window
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
        # Adapt limit based on latency
        self._adapt_limit()

    def _adapt_limit(self):
        """Adapt the rate limit based on the observed p95 latency.

        Bands (p95 relative to the threshold):
            > 2x      reduce by 20%, floor at 0.5 * base_limit
            > 1x      reduce by 5%, floor at 0.7 * base_limit
            < 0.5x    increase by 5%, cap at 1.5 * base_limit
            otherwise leave the limit unchanged
        """
        if len(self.latency_samples) < 100:
            return
        # Calculate p95 latency
        sorted_samples = sorted(self.latency_samples)
        p95_latency = sorted_samples[int(len(sorted_samples) * 0.95)]
        # Adjust limit
        if p95_latency > self.latency_threshold * 2:
            # Severely overloaded: reduce limit significantly
            self._reduce_limit(floor_factor=0.5, step_factor=0.8)
        elif p95_latency > self.latency_threshold:
            # Overloaded: reduce limit slightly
            self._reduce_limit(floor_factor=0.7, step_factor=0.95)
        elif p95_latency < self.latency_threshold * 0.5:
            # Underloaded: increase limit
            self.current_limit = min(
                self.base_limit * 1.5,
                self.current_limit * 1.05
            )

    def _reduce_limit(self, floor_factor: float, step_factor: float):
        """Scale the limit down by ``step_factor``, bounded by a floor.

        The result is clamped to the current limit so that a reduction can
        never raise it. Without the clamp, a limit already at
        0.5 * base_limit would jump up to the 0.7 * base_limit floor while
        latency is still above the threshold.
        """
        reduced = max(
            self.base_limit * floor_factor,
            self.current_limit * step_factor
        )
        self.current_limit = min(self.current_limit, reduced)

    def get_current_limit(self) -> int:
        """Get current rate limit, truncated to an integer."""
        return int(self.current_limit)


# Reliable Serving Pipeline
Complete implementation for serving pipeline with reliability features:
class ReliableServingPipeline:
    """Serve predictions from a primary model with layered fallbacks.

    A request goes through three layers of protection:
      1. feature lookup falls back to cached features on any error;
      2. the primary model is called through a circuit breaker, and the
         fallback model is used when the circuit is open;
      3. any other exception is handed to the degradation controller.
    """

    def __init__(
        self,
        primary_model,
        fallback_model,
        feature_store,
        circuit_breaker: CircuitBreaker,
        degradation_controller: GracefulDegradationController
    ):
        """Store collaborators and zero the reliability counters."""
        self.primary, self.fallback = primary_model, fallback_model
        self.features = feature_store
        self.circuit_breaker = circuit_breaker
        self.degradation = degradation_controller
        # Metrics
        self.request_count = self.fallback_count = self.error_count = 0

    async def predict(self, request: dict) -> dict:
        """Return a prediction dict, degrading instead of raising.

        The result carries ``prediction``, ``model`` (``'primary'`` or
        ``'fallback'``) and ``degraded``. When anything other than an open
        circuit goes wrong, the degradation controller's result is
        returned instead.
        """
        self.request_count += 1
        try:
            feature_values = await self._get_features_with_fallback(request)
            try:
                primary_output = await self.circuit_breaker.call(
                    self.primary.predict,
                    feature_values
                )
            except CircuitOpenError:
                # Primary circuit open, use fallback
                self.fallback_count += 1
                fallback_output = await self.fallback.predict(feature_values)
                return {
                    'prediction': fallback_output,
                    'model': 'fallback',
                    'degraded': True
                }
            return {
                'prediction': primary_output,
                'model': 'primary',
                'degraded': False
            }
        except Exception:
            # Count the failure, then let the degradation controller answer.
            self.error_count += 1
            return await self.degradation.execute_with_degradation(request)

    async def _get_features_with_fallback(self, request: dict) -> dict:
        """Fetch online features, or cached features if that fails."""
        store = self.features
        try:
            return await store.get_online_features(
                feature_refs=request.get('feature_refs', []),
                entity_keys=request.get('entity_keys', {})
            )
        except Exception:
            # Fall back to cached/batch features
            return await store.get_cached_features(
                entity_keys=request.get('entity_keys', {})
            )

    def get_reliability_metrics(self) -> dict:
        """Return counters, derived rates and the circuit breaker state."""
        # Avoid dividing by zero before the first request arrives.
        denominator = max(1, self.request_count)
        return {
            'total_requests': self.request_count,
            'fallback_requests': self.fallback_count,
            'error_requests': self.error_count,
            'fallback_rate': self.fallback_count / denominator,
            'error_rate': self.error_count / denominator,
            'circuit_state': self.circuit_breaker.state
        }


# Observability
AI Observability Pipeline
Complete implementation for AI system observability:
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import json
@dataclass
class AIObservabilityEvent:
    """An observability event for AI systems.

    Plain data record: every field is required (none has a default), and
    the three ``Optional`` fields may be passed as ``None``.
    """
    timestamp: datetime
    event_type: str  # request, prediction, feature, model, error
    service: str
    model_version: str
    request_id: str
    latency_ms: float
    features: Optional[dict]  # may be None
    prediction: Optional[Any]  # may be None
    confidence: Optional[float]  # may be None
    metadata: dict
class AIObservabilityPipeline:
    """Fan AI-serving telemetry out to metrics, logging and tracing backends.

    Also keeps the most recent 10000 prediction records in memory
    (``prediction_samples``) for offline analysis.
    """

    def __init__(self, metrics_backend, logging_backend, tracing_backend):
        """Store the three backends and start with no retained samples."""
        self.metrics = metrics_backend
        self.logs = logging_backend
        self.traces = tracing_backend
        self.prediction_samples: list[dict] = []

    def record_prediction(
        self,
        request_id: str,
        model_version: str,
        features: dict,
        prediction: Any,
        confidence: float,
        latency_ms: float,
        metadata: dict = None
    ):
        """Emit metrics, a sampled log line, span data and a retained sample.

        ``metadata`` is accepted but not used by this method.
        """
        # Metrics
        version_tags = {'model_version': model_version}
        self.metrics.increment('predictions_total', tags=version_tags)
        for metric_name, observed in (
            ('prediction_latency_ms', latency_ms),
            ('prediction_confidence', confidence),
        ):
            self.metrics.histogram(metric_name, observed)

        # Logging (sample to avoid overwhelming)
        if self._should_sample_log():
            log_fields = {
                'request_id': request_id,
                'model_version': model_version,
                'prediction': str(prediction)[:100],  # Truncate
                'confidence': confidence,
                'latency_ms': latency_ms,
            }
            self.logs.info("prediction", **log_fields)

        # Trace span
        span_data = {
            'prediction': str(prediction)[:100],
            'confidence': confidence,
            'model_version': model_version
        }
        self.traces.add_span_data(span_data)

        # Retain the prediction for analysis
        retained = {
            'request_id': request_id,
            'features': features,
            'prediction': prediction,
            'confidence': confidence,
            'timestamp': datetime.now()
        }
        self._sample_prediction(retained)

    def record_feature_retrieval(
        self,
        request_id: str,
        feature_names: list[str],
        latency_ms: float,
        cache_hit: bool,
        stale_features: list[str]
    ):
        """Record latency, cache hit/miss and any stale features."""
        self.metrics.histogram('feature_retrieval_latency_ms', latency_ms)
        cache_counter = (
            'feature_cache_hits' if cache_hit else 'feature_cache_misses'
        )
        self.metrics.increment(cache_counter)
        if not stale_features:
            return
        # Stale features get both a counter bump and a warning log.
        self.metrics.increment('stale_features', value=len(stale_features))
        self.logs.warning(
            "stale_features",
            request_id=request_id,
            stale_features=stale_features
        )

    def record_degradation(
        self,
        request_id: str,
        original_level: int,
        actual_level: int,
        reason: str
    ):
        """Record a service degradation as a tagged counter and a warning."""
        degradation_tags = {
            'from_level': str(original_level),
            'to_level': str(actual_level),
            'reason': reason
        }
        self.metrics.increment('degradation_events', tags=degradation_tags)
        self.logs.warning(
            "service_degradation",
            request_id=request_id,
            from_level=original_level,
            to_level=actual_level,
            reason=reason
        )

    def _should_sample_log(self) -> bool:
        """Return True for roughly 1% of calls."""
        import random
        sample_rate = 0.01  # 1% sampling
        return random.random() < sample_rate

    def _sample_prediction(self, prediction_data: dict):
        """Append a prediction record, keeping only the newest 10000."""
        self.prediction_samples.append(prediction_data)
        # Keep bounded
        max_samples = 10000
        overflow = len(self.prediction_samples) - max_samples
        if overflow > 0:
            self.prediction_samples = self.prediction_samples[overflow:]


# Distribution Monitor
Complete implementation for monitoring feature and prediction distributions:
class DistributionMonitor:
    """Monitor feature and prediction distributions.

    A baseline (mean, population std, min, max, count) is stored per name.
    Live values are collected per name, keeping the newest 10000, and
    ``check_distribution`` reports how far the live mean has moved from the
    baseline mean, measured in baseline standard deviations.
    """

    def __init__(self):
        """Start with no baselines and no recorded values."""
        self.baselines: dict[str, dict] = {}
        self.current: dict[str, list] = {}

    def set_baseline(self, name: str, values: list[float]):
        """Set the baseline distribution for ``name``.

        Raises:
            ValueError: If ``values`` is empty; a mean cannot be computed
                and no baseline is stored.
        """
        if len(values) == 0:
            raise ValueError(
                f"cannot set baseline {name!r} from an empty list of values"
            )
        self.baselines[name] = {
            'mean': sum(values) / len(values),
            'std': self._std(values),
            'min': min(values),
            'max': max(values),
            'count': len(values)
        }

    def record_value(self, name: str, value: float):
        """Record a live value for ``name``, keeping the newest 10000."""
        samples = self.current.setdefault(name, [])
        samples.append(value)
        # Keep bounded
        if len(samples) > 10000:
            self.current[name] = samples[-10000:]

    def check_distribution(self, name: str) -> dict:
        """Check the current distribution of ``name`` against its baseline.

        Returns:
            ``{'status': 'no_data'}`` when there is no baseline or no live
            values, ``{'status': 'insufficient_data'}`` with fewer than 100
            live values, otherwise a dict with ``status`` (``'ok'`` below a
            drift of 2, else ``'drift_detected'``), ``baseline_mean``,
            ``current_mean``, ``mean_drift_zscore`` and ``alert`` (True at a
            drift of 3 or more).
        """
        if name not in self.baselines or name not in self.current:
            return {'status': 'no_data'}
        baseline = self.baselines[name]
        current = self.current[name]
        if len(current) < 100:
            return {'status': 'insufficient_data'}
        current_mean = sum(current) / len(current)
        # Drift metrics. The epsilon keeps a constant baseline (std == 0)
        # from dividing by zero.
        mean_drift = (
            abs(current_mean - baseline['mean']) /
            (baseline['std'] + 1e-6)
        )
        return {
            'status': 'ok' if mean_drift < 2 else 'drift_detected',
            'baseline_mean': baseline['mean'],
            'current_mean': current_mean,
            'mean_drift_zscore': mean_drift,
            'alert': mean_drift >= 3
        }

    def _std(self, values: list) -> float:
        """Return the population standard deviation (0 for no values)."""
        if not values:
            return 0.0
        mean = sum(values) / len(values)
        return (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5


# Chaos Engineering
Chaos Experiments
Chaos experiment definitions for AI systems:
from dataclasses import dataclass
from enum import Enum
class ChaosExperimentType(Enum):
    """Kinds of fault a chaos experiment can target in an AI system."""
    MODEL_FAILURE = "model_failure"
    FEATURE_STORE_LATENCY = "feature_store_latency"
    STALE_FEATURES = "stale_features"
    MODEL_LATENCY = "model_latency"
    DATA_QUALITY = "data_quality"
    CAPACITY_OVERLOAD = "capacity_overload"
@dataclass
class ChaosExperiment:
    """A chaos experiment definition.

    Plain data record; every field is required.
    """
    name: str
    type: ChaosExperimentType
    description: str
    blast_radius: str  # limited, moderate, full
    expected_behavior: str
    rollback_procedure: str
    duration_minutes: int
# Predefined experiment catalogue: one entry each for feature-store latency,
# primary-model failure, stale features and capacity overload.
CHAOS_EXPERIMENTS = [
    ChaosExperiment(
        name="feature_store_latency_spike",
        type=ChaosExperimentType.FEATURE_STORE_LATENCY,
        description="Add 500ms latency to feature store calls",
        blast_radius="limited",
        expected_behavior="System should fall back to cached features or simpler model",
        rollback_procedure="Remove latency injection",
        duration_minutes=15
    ),
    ChaosExperiment(
        name="primary_model_failure",
        type=ChaosExperimentType.MODEL_FAILURE,
        description="Return errors from primary model",
        blast_radius="moderate",
        expected_behavior="System should fall back to secondary model",
        rollback_procedure="Restore primary model",
        duration_minutes=10
    ),
    ChaosExperiment(
        name="stale_feature_injection",
        type=ChaosExperimentType.STALE_FEATURES,
        description="Serve features with old timestamps",
        blast_radius="limited",
        expected_behavior="System should detect staleness and alert",
        rollback_procedure="Restore normal feature timestamps",
        duration_minutes=20
    ),
    ChaosExperiment(
        name="capacity_overload",
        type=ChaosExperimentType.CAPACITY_OVERLOAD,
        description="Send 2x normal traffic",
        blast_radius="moderate",
        expected_behavior="System should gracefully shed load and maintain SLOs for priority traffic",
        rollback_procedure="Return to normal traffic levels",
        duration_minutes=15
    ),
]


# Chaos Runner
Complete implementation for running chaos experiments:
from datetime import datetime
from typing import Optional
class ChaosRunner:
    """Run chaos experiments on AI systems.

    The runner applies fault injection to ``target_service``, tracks which
    experiments are active, rolls them back on stop, and scores an
    experiment from before/during metrics.

    Only ``FEATURE_STORE_LATENCY``, ``MODEL_FAILURE`` and ``STALE_FEATURES``
    have injection and rollback branches. Experiments of any other type are
    tracked as active but make no call on the target.
    """

    def __init__(self, target_service):
        """Bind the runner to the service that exposes the injection hooks."""
        self.target = target_service
        self.active_experiments: dict[str, ChaosExperiment] = {}
        self.results: list[dict] = []

    def start_experiment(
        self,
        experiment: ChaosExperiment,
        parameters: dict = None
    ) -> dict:
        """Start a chaos experiment.

        Args:
            experiment: The experiment to start.
            parameters: Optional overrides for the injection
                (``latency_ms``, ``error_rate``, ``staleness_hours``).
                ``None`` means "use the defaults".

        Returns:
            ``{'status': 'blocked', ...}`` when the safety checks fail,
            otherwise ``{'status': 'started', ...}`` with the experiment
            name and its duration.
        """
        # The default is None; normalise it so the .get() lookups below
        # work when the caller passes no overrides.
        params = parameters if parameters is not None else {}

        # Safety checks
        if not self._is_safe_to_run(experiment):
            return {
                'status': 'blocked',
                'reason': 'Safety checks failed'
            }

        # Start experiment
        self.active_experiments[experiment.name] = experiment

        # Apply chaos
        if experiment.type == ChaosExperimentType.FEATURE_STORE_LATENCY:
            self.target.inject_feature_latency(
                latency_ms=params.get('latency_ms', 500)
            )
        elif experiment.type == ChaosExperimentType.MODEL_FAILURE:
            self.target.inject_model_errors(
                error_rate=params.get('error_rate', 1.0)
            )
        elif experiment.type == ChaosExperimentType.STALE_FEATURES:
            self.target.inject_stale_features(
                staleness_hours=params.get('staleness_hours', 24)
            )

        return {
            'status': 'started',
            'experiment': experiment.name,
            'duration_minutes': experiment.duration_minutes
        }

    def stop_experiment(self, experiment_name: str) -> dict:
        """Stop a running experiment and roll back its injection."""
        experiment = self.active_experiments.pop(experiment_name, None)
        if not experiment:
            return {'status': 'not_found'}
        # Roll back
        self._rollback(experiment)
        return {'status': 'stopped', 'experiment': experiment_name}

    def evaluate_experiment(
        self,
        experiment_name: str,
        metrics_during: dict,
        metrics_baseline: dict
    ) -> dict:
        """Evaluate experiment results.

        The experiment must still be active (evaluate before stopping it).
        Three checks are applied to ``metrics_during``: error rate below 1%,
        a non-zero fallback rate, and p99 latency below 3x the baseline p99.
        The result is appended to ``self.results`` and returned.
        """
        experiment = self.active_experiments.get(experiment_name)
        if not experiment:
            return {'status': 'experiment_not_found'}

        # Check if expected behavior occurred
        passed_checks = []
        failed_checks = []

        # Availability check
        if metrics_during.get('error_rate', 0) < 0.01:  # Less than 1% errors
            passed_checks.append("Availability maintained")
        else:
            failed_checks.append(
                f"Availability degraded: {metrics_during.get('error_rate'):.2%} errors"
            )

        # Fallback activation check
        if metrics_during.get('fallback_rate', 0) > 0:
            passed_checks.append("Fallback correctly activated")
        else:
            failed_checks.append("Fallback was not activated")

        # Latency check
        if (metrics_during.get('p99_latency_ms', 0) <
                metrics_baseline.get('p99_latency_ms', 0) * 3):
            passed_checks.append("Latency remained reasonable")
        else:
            failed_checks.append("Latency exceeded 3x baseline")

        result = {
            'experiment': experiment_name,
            'passed': len(failed_checks) == 0,
            'passed_checks': passed_checks,
            'failed_checks': failed_checks,
            'recommendation': self._get_recommendation(failed_checks)
        }
        self.results.append(result)
        return result

    def _is_safe_to_run(self, experiment: ChaosExperiment) -> bool:
        """Check if it's safe to run the experiment right now."""
        # Don't run during peak hours (local hours 9 through 17 inclusive)
        # unless the blast radius is 'limited'.
        hour = datetime.now().hour
        if 9 <= hour <= 17 and experiment.blast_radius != 'limited':
            return False
        # Don't run if another experiment is active
        if self.active_experiments:
            return False
        return True

    def _rollback(self, experiment: ChaosExperiment):
        """Undo the injection that matches the experiment's type."""
        if experiment.type == ChaosExperimentType.FEATURE_STORE_LATENCY:
            self.target.remove_feature_latency()
        elif experiment.type == ChaosExperimentType.MODEL_FAILURE:
            self.target.remove_model_errors()
        elif experiment.type == ChaosExperimentType.STALE_FEATURES:
            self.target.remove_stale_features()

    def _get_recommendation(self, failed_checks: list[str]) -> str:
        """Get a recommendation based on which checks failed.

        A missing fallback takes precedence over a latency failure.
        """
        if not failed_checks:
            return "System handled chaos well. Continue regular testing."
        if "Fallback was not activated" in failed_checks:
            return "Review fallback trigger conditions. Consider lowering thresholds."
        if any("Latency" in check for check in failed_checks):
            return "Review timeout configurations. Consider adding caching layers."
        return "Review failed checks and address gaps in resilience."


# Game Day
Complete implementation for game day exercises:
class GameDay:
"""Organize game day exercises for AI reliability."""
def __init__(self):
self.scenarios: list[dict] = []
self.participants: list[str] = []
self.results: list[dict] = []
def add_scenario(
    self,
    name: str,
    chaos_experiments: list[ChaosExperiment],
    expected_response: list[str],
    success_criteria: list[str]
):
    """Register a game day scenario.

    The scenario is stored as a dict with the keys ``name``,
    ``experiments``, ``expected_response`` and ``success_criteria`` and
    appended to ``self.scenarios``.
    """
    scenario = dict(
        name=name,
        experiments=chaos_experiments,
        expected_response=expected_response,
        success_criteria=success_criteria,
    )
    self.scenarios.append(scenario)
def run_tabletop_exercise(self, scenario_name: str) -> dict:
"""Run tabletop exercise (discussion-based, no real chaos)."""
scenario = next(
(s for s in self.scenarios if s['name'] == scenario_name),
None
)
if not scenario:
return {'error': 'Scenario not found'}
discussion_points = [
f"Scenario: {scenario['name']}",
"Discussion Questions:",
"1. How would we detect this issue?",
"2. Who would be paged?",
"3. What would be the immediate response?",
"4. What information would we need to diagnose?",
"5. What are the fallback options?",
"6. How would we communicate to stakeholders?",
"Expected Response:",
] + scenario['expected_response']
return {
'type': 'tabletop',
'scenario': scenario_name,
'discussion_points': discussion_points
}