MLOps & Evaluation - Complete Code Reference
This file contains full implementations referenced from Chapter 51. The chapter contains conceptual explanations and condensed examples; this file has production-ready code.
Table of Contents
- Evaluation Framework
- LLM-as-Judge Patterns
- A/B Testing Infrastructure
- Model Comparison
- Production Monitoring
Evaluation Framework
Base Evaluator Classes
import inspect
import json
import re
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

import numpy as np
@dataclass
class EvaluationResult:
    """Outcome of scoring a single dataset example.

    Bundles the example's identifier and input with the expected and actual
    outputs, the per-example metric values, and free-form metadata.
    """

    example_id: str
    input: str
    expected: Any
    actual: Any
    # Metric name -> value for this one example.
    metrics: dict[str, float]
    # Arbitrary extra information carried along with the example.
    metadata: dict[str, Any]
@dataclass
class EvaluationReport:
    """Summary of one evaluation run over a dataset.

    Carries the run's identity (run id, timestamp, model version, dataset
    name), the aggregate metrics, and the individual per-example results.
    """

    run_id: str
    timestamp: datetime
    model_version: str
    dataset_name: str
    num_examples: int
    # Metric name -> value aggregated over the whole run.
    aggregate_metrics: dict[str, float]
    per_example_results: list[EvaluationResult]
    # Arbitrary extra information about the run.
    metadata: dict[str, Any]
class Evaluator(ABC):
    """Abstract interface every evaluator implements.

    Subclasses score individual examples with ``evaluate`` and fold the
    per-example outputs into summary metrics with ``aggregate``.
    """

    @abstractmethod
    def evaluate(self, expected: Any, actual: Any) -> dict[str, float]:
        """Score one example by comparing ``actual`` with ``expected``."""

    @abstractmethod
    def aggregate(self, results: list[dict[str, float]]) -> dict[str, float]:
        """Combine the per-example dicts from ``evaluate`` into summary metrics."""
class ClassificationEvaluator(Evaluator):
    """Exact-match classification evaluator with per-label precision/recall."""

    def __init__(self, labels: list[str]):
        """Store the labels for which per-label metrics are reported."""
        self.labels = labels

    def evaluate(self, expected: str, actual: str) -> dict[str, Any]:
        """Score one prediction.

        Returns:
            A dict with ``correct`` (1.0 on exact match, else 0.0) plus the
            expected and predicted labels, which ``aggregate`` needs for its
            per-label counts. The label entries are strings, hence ``Any``.
        """
        return {
            'correct': float(expected == actual),
            'expected_label': expected,
            'predicted_label': actual,
        }

    def aggregate(self, results: list[dict[str, Any]]) -> dict[str, float]:
        """Compute accuracy plus per-label recall and precision.

        Args:
            results: Per-example dicts as produced by ``evaluate``.

        Returns:
            ``accuracy``, ``num_correct`` and ``num_total``, plus
            ``recall_<label>`` / ``precision_<label>`` for every configured
            label that was expected / predicted at least once. With no
            results, ``accuracy`` is reported as 0.0.
        """
        total = len(results)
        correct = sum(r['correct'] for r in results)

        # Count whatever labels actually occur. A label outside
        # ``self.labels`` (for example an unexpected model output) is tallied
        # rather than raising KeyError; it simply gets no per-label metric.
        label_total: dict[str, int] = {}
        label_predicted: dict[str, int] = {}
        label_correct: dict[str, int] = {}
        for r in results:
            expected = r['expected_label']
            predicted = r['predicted_label']
            label_total[expected] = label_total.get(expected, 0) + 1
            label_predicted[predicted] = label_predicted.get(predicted, 0) + 1
            if r['correct']:
                label_correct[expected] = label_correct.get(expected, 0) + 1

        metrics = {
            # Guard the empty-results case instead of dividing by zero.
            'accuracy': correct / total if total else 0.0,
            'num_correct': correct,
            'num_total': total,
        }
        for label in self.labels:
            hits = label_correct.get(label, 0)
            support = label_total.get(label, 0)
            predicted_count = label_predicted.get(label, 0)
            if support > 0:
                metrics[f'recall_{label}'] = hits / support
            if predicted_count > 0:
                metrics[f'precision_{label}'] = hits / predicted_count
        return metrics


# Evaluation Pipeline
class EvaluationPipeline:
    """Runs a model over a dataset, scores it with evaluators, stores a report."""

    def __init__(
        self,
        model,
        evaluators: dict[str, Evaluator],
        results_store
    ):
        """
        Args:
            model: Object whose ``generate(input)`` produces the output to score.
            evaluators: Evaluator name -> evaluator; the name prefixes every
                metric key the evaluator contributes.
            results_store: Object whose ``save(report)`` persists the report.
        """
        self.model = model
        self.evaluators = evaluators
        self.results_store = results_store

    def run(
        self,
        dataset: list[dict],
        model_version: str,
        dataset_name: str
    ) -> EvaluationReport:
        """Run evaluation pipeline on dataset.

        Each example must provide ``id``, ``input`` and ``expected``;
        ``metadata`` is optional. Per-example metric keys are
        ``"<evaluator>_<metric>"``, and aggregate keys follow the same scheme.

        Returns:
            The report, after it has been handed to ``results_store.save``.
        """
        run_id = f"{model_version}_{dataset_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        results = []
        # Keep each evaluator's raw per-example output for aggregation.
        # Rebuilding it from the flattened "<name>_<metric>" keys is lossy:
        # the name can reappear inside a metric key, and one evaluator name
        # can be a prefix of another.
        raw_outputs = {name: [] for name in self.evaluators}

        for example in dataset:
            # Generate model output
            actual = self.model.generate(example['input'])

            # Run all evaluators
            example_metrics = {}
            for name, evaluator in self.evaluators.items():
                metrics = evaluator.evaluate(example['expected'], actual)
                raw_outputs[name].append(metrics)
                example_metrics.update({f"{name}_{k}": v for k, v in metrics.items()})

            results.append(EvaluationResult(
                example_id=example['id'],
                input=example['input'],
                expected=example['expected'],
                actual=actual,
                metrics=example_metrics,
                metadata=example.get('metadata', {})
            ))

        # Aggregate metrics
        aggregate = {}
        for name, evaluator in self.evaluators.items():
            agg = evaluator.aggregate(raw_outputs[name])
            aggregate.update({f"{name}_{k}": v for k, v in agg.items()})

        report = EvaluationReport(
            run_id=run_id,
            timestamp=datetime.now(),
            model_version=model_version,
            dataset_name=dataset_name,
            num_examples=len(dataset),
            aggregate_metrics=aggregate,
            per_example_results=results,
            metadata={'evaluators': list(self.evaluators.keys())}
        )

        # Store results
        self.results_store.save(report)
        return report


# Dataset Management
from typing import Iterator
import random
class EvaluationDataset:
    """Manages evaluation datasets with versioning and sampling."""

    def __init__(self, storage_path: str):
        """
        Args:
            storage_path: Directory that registered datasets are written to.
        """
        self.storage_path = storage_path
        self.datasets = {}

    def register_dataset(
        self,
        name: str,
        examples: list[dict],
        version: str,
        metadata: dict = None
    ):
        """Register a new dataset version and persist it under ``storage_path``."""
        dataset_id = f"{name}_v{version}"
        self.datasets[dataset_id] = {
            'name': name,
            'version': version,
            'examples': examples,
            'metadata': metadata or {},
            'created_at': datetime.now().isoformat()
        }
        self._save_dataset(dataset_id)

    def get_dataset(self, name: str, version: str = 'latest') -> list[dict]:
        """Retrieve dataset by name and version.

        ``'latest'`` selects the highest registered version of exactly this
        dataset name, comparing numeric version components as numbers (so
        ``10`` ranks above ``9``).

        Raises:
            ValueError: ``version='latest'`` and no dataset has this name.
            KeyError: The explicitly requested version is not registered.
        """
        if version == 'latest':
            # Match on the stored name, not on an id prefix: a prefix test
            # for "qa_v" would also catch a dataset named "qa_v2".
            candidates = [
                entry for entry in self.datasets.values()
                if entry.get('name') == name
            ]
            if not candidates:
                raise ValueError(f"No dataset found with name: {name}")
            newest = max(candidates, key=lambda entry: self._version_key(entry['version']))
            return newest['examples']
        return self.datasets[f"{name}_v{version}"]['examples']

    def sample_dataset(
        self,
        name: str,
        n: int,
        stratify_by: str = None,
        seed: int = 42
    ) -> list[dict]:
        """Sample n examples from dataset with optional stratification.

        Returns ``min(n, dataset size)`` examples. With ``stratify_by``, each
        group (examples sharing that key's value, ``'unknown'`` when absent)
        gets a share proportional to its size.

        Raises:
            ValueError: ``n`` is negative.
        """
        if n < 0:
            raise ValueError("n must be non-negative")
        examples = self.get_dataset(name)
        # A private generator gives the same draws as seeding the module-level
        # one, without resetting global random state for the whole process.
        rng = random.Random(seed)
        target = min(n, len(examples))

        if not stratify_by:
            return rng.sample(examples, target)

        groups = {}
        for ex in examples:
            groups.setdefault(ex.get(stratify_by, 'unknown'), []).append(ex)

        quotas = self._allocate_quotas([len(members) for members in groups.values()], target)
        sampled = []
        for members, quota in zip(groups.values(), quotas):
            sampled.extend(rng.sample(members, quota))
        return sampled

    def _save_dataset(self, dataset_id: str) -> None:
        """Write one registered dataset to ``<storage_path>/<dataset_id>.json``.

        Minimal JSON-on-disk persistence; examples and metadata must be
        JSON-serialisable. Override for a different storage backend.
        """
        directory = Path(self.storage_path)
        directory.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(self.datasets[dataset_id], ensure_ascii=False, indent=2)
        (directory / f"{dataset_id}.json").write_text(payload, encoding="utf-8")

    @staticmethod
    def _version_key(version) -> tuple:
        """Return a sort key that orders digit runs in a version numerically."""
        chunks = [chunk for chunk in re.split(r'(\d+)', str(version)) if chunk]
        # Tag each chunk so numbers and text never get compared to each other.
        return tuple(
            (0, int(chunk), '') if chunk.isdecimal() else (1, 0, chunk)
            for chunk in chunks
        )

    @staticmethod
    def _allocate_quotas(sizes: list[int], target: int) -> list[int]:
        """Split ``target`` across groups proportionally (largest remainder).

        Integer arithmetic keeps the split exact: quotas always sum to
        ``target`` and never exceed a group's size while ``target`` is at
        most ``sum(sizes)``.
        """
        total = sum(sizes)
        if total == 0:
            return [0] * len(sizes)
        quotas = [target * size // total for size in sizes]
        leftover = target - sum(quotas)
        by_remainder = sorted(
            range(len(sizes)),
            key=lambda i: (target * sizes[i]) % total,
            reverse=True,
        )
        for i in by_remainder[:leftover]:
            quotas[i] += 1
        return quotas


# LLM-as-Judge Patterns
Basic Judge
from enum import Enum
from pydantic import BaseModel
class Rating(Enum):
    """Five-point quality scale used by the judges; higher is better."""

    EXCELLENT = 5
    GOOD = 4
    ACCEPTABLE = 3
    POOR = 2
    UNACCEPTABLE = 1
class JudgeResult(BaseModel):
    """Structured verdict parsed from a judge model's JSON reply."""

    # Overall score for the response.
    rating: int
    # The judge's explanation for the score.
    reasoning: str
    # Criterion name -> score for that criterion.
    aspects: dict[str, int]
class LLMJudge:
    """Scores a single response against a list of criteria via a judge model."""

    def __init__(self, judge_model, criteria: list[str]):
        """
        Args:
            judge_model: Object whose ``generate(prompt, response_format=...)``
                returns the judge's JSON reply as text.
            criteria: Criteria listed in the judge prompt, one bullet each.
        """
        self.model = judge_model
        self.criteria = criteria

    def evaluate(
        self,
        prompt: str,
        response: str,
        reference: str = None
    ) -> JudgeResult:
        """Ask the judge model to rate ``response`` and parse its JSON verdict.

        Args:
            prompt: The prompt the response was produced for.
            response: The response to rate.
            reference: Optional reference answer included in the judge prompt.
        """
        raw_verdict = self.model.generate(
            self._build_judge_prompt(prompt, response, reference),
            response_format={"type": "json_object"},
        )
        return JudgeResult.model_validate_json(raw_verdict)

    def _build_judge_prompt(self, prompt: str, response: str, reference: str = None) -> str:
        """Assemble the judge prompt: task, response, optional reference, criteria."""
        if reference:
            reference_block = f"\nREFERENCE ANSWER:\n{reference}\n"
        else:
            reference_block = ""
        bullet_list = "\n".join([f"- {item}" for item in self.criteria])
        return f"""You are an expert evaluator. Evaluate the following response.
ORIGINAL PROMPT:
{prompt}
RESPONSE TO EVALUATE:
{response}
{reference_block}
EVALUATION CRITERIA:
{bullet_list}
Rate 1-5 scale: 1=Unacceptable, 2=Poor, 3=Acceptable, 4=Good, 5=Excellent
Return JSON: {{"rating": <1-5>, "reasoning": "<explanation>", "aspects": {{"<criterion>": <1-5>, ...}}}}
Be critical and consistent. Only give 5s for truly excellent responses."""


# Pairwise Comparison Judge
class PairwiseJudge:
    """Compares two responses with a judge model, checking for position bias."""

    # In the swapped run the judge's "A" is the caller's response_b and
    # vice versa; this maps its verdict back to the caller's labels.
    _SWAPPED_LABELS = {'A': 'B', 'B': 'A'}

    def __init__(self, judge_model):
        """
        Args:
            judge_model: Object whose ``generate(prompt, response_format=...)``
                returns the judge's JSON reply as text.
        """
        self.model = judge_model

    def compare(
        self,
        prompt: str,
        response_a: str,
        response_b: str,
        criteria: list[str]
    ) -> dict:
        """Compare two responses, controlling for position bias.

        The judge is asked twice, once per presentation order. The verdict of
        the swapped run is translated back to the caller's labels before the
        two are compared.

        Returns:
            ``{'winner', 'confidence', 'reasoning'}``. When both runs agree
            (including both saying ``'tie'``) the shared winner is returned
            with ``'high'`` confidence. Otherwise, for instance when the
            judge picks whichever response is shown first, the result is a
            ``'tie'`` with ``'low'`` confidence.
        """
        # Run comparison in both orders
        forward = self._compare_once(prompt, response_a, response_b, criteria)
        swapped = self._compare_once(prompt, response_b, response_a, criteria)

        forward_winner = forward['winner']
        swapped_winner = self._SWAPPED_LABELS.get(swapped['winner'], swapped['winner'])

        # Reconcile results
        if forward_winner == swapped_winner:
            return {'winner': forward_winner, 'confidence': 'high', 'reasoning': forward['reasoning']}
        return {'winner': 'tie', 'confidence': 'low', 'reasoning': 'Inconsistent results'}

    def _compare_once(self, prompt: str, response_a: str, response_b: str, criteria: list[str]) -> dict:
        """Ask the judge once, with the responses in the given A/B order."""
        judge_prompt = f"""Compare these two responses.
PROMPT: {prompt}
RESPONSE A:
{response_a}
RESPONSE B:
{response_b}
CRITERIA: {', '.join(criteria)}
Return JSON: {{"winner": "A" or "B" or "tie", "reasoning": "<explanation>"}}"""
        result = self.model.generate(judge_prompt, response_format={"type": "json_object"})
        return json.loads(result)


# Multi-Aspect Judge
class MultiAspectJudge:
    """Evaluate multiple aspects independently."""

    ASPECT_PROMPTS = {
        'correctness': """Evaluate ONLY factual correctness. Rate 1-5.""",
        'helpfulness': """Evaluate ONLY how helpful this is. Rate 1-5.""",
        'safety': """Evaluate ONLY safety (no harmful content). Rate 1-5.""",
        'coherence': """Evaluate ONLY clarity and organization. Rate 1-5.""",
        'groundedness': """Evaluate ONLY whether claims are grounded in context. Rate 1-5."""
    }

    def __init__(self, judge_model):
        """
        Args:
            judge_model: Object whose ``generate(prompt, response_format=...)``
                returns the judge's JSON reply as text.
        """
        self.model = judge_model

    def evaluate(
        self,
        prompt: str,
        response: str,
        context: str = None,
        aspects: list[str] = None
    ) -> dict[str, dict]:
        """Evaluate multiple aspects independently.

        One judge call is made per aspect. Aspects without an entry in
        ``ASPECT_PROMPTS`` are skipped; with ``aspects`` omitted or empty,
        every known aspect is evaluated.

        Returns:
            Aspect name -> parsed judge reply, plus ``'overall'`` holding the
            mean rating, or ``None`` when no aspect was evaluated.
        """
        aspects = aspects or list(self.ASPECT_PROMPTS.keys())
        results = {}
        for aspect in aspects:
            if aspect not in self.ASPECT_PROMPTS:
                continue
            aspect_prompt = f"""{self.ASPECT_PROMPTS[aspect]}
PROMPT: {prompt}
{"CONTEXT: " + context if context else ""}
RESPONSE: {response}
Return JSON: {{"rating": <1-5>, "reasoning": "<brief explanation>"}}"""
            result = self.model.generate(aspect_prompt, response_format={"type": "json_object"})
            results[aspect] = json.loads(result)

        ratings = [r['rating'] for r in results.values()]
        # Every requested aspect may have been unknown; report "no rating"
        # rather than dividing by zero.
        results['overall'] = {'rating': sum(ratings) / len(ratings) if ratings else None}
        return results


# Judge Calibration
class JudgeCalibrator:
    """Calibrate LLM judge against human ratings."""

    def __init__(self, judge: "LLMJudge"):
        """
        Args:
            judge: Judge whose ``evaluate(prompt, response)`` returns an
                object exposing a numeric ``rating``.
        """
        self.judge = judge
        self.calibration_data = []

    def add_calibration_example(
        self,
        prompt: str,
        response: str,
        human_rating: float,
        human_reasoning: str = None
    ):
        """Add human-rated example for calibration."""
        self.calibration_data.append({
            'prompt': prompt,
            'response': response,
            'human_rating': human_rating,
            'human_reasoning': human_reasoning
        })

    def evaluate_calibration(self) -> dict:
        """Evaluate judge agreement with human ratings.

        Runs the judge on every stored example and compares its ratings with
        the human ones.

        Returns:
            ``correlation`` (Pearson; NaN when undefined), ``agreement_rate``
            (share of examples where the two ratings differ by at most 1),
            ``bias`` (judge mean minus human mean), ``human_mean``,
            ``judge_mean`` and ``n_examples``; all plain Python numbers.

        Raises:
            ValueError: No calibration examples have been added.
        """
        if not self.calibration_data:
            raise ValueError("No calibration examples; call add_calibration_example first")

        human_ratings = []
        judge_ratings = []
        for example in self.calibration_data:
            result = self.judge.evaluate(example['prompt'], example['response'])
            human_ratings.append(example['human_rating'])
            judge_ratings.append(result.rating)

        human = np.asarray(human_ratings, dtype=float)
        judged = np.asarray(judge_ratings, dtype=float)

        # Pearson correlation is undefined for fewer than two points or when
        # either series is constant; report NaN directly rather than letting
        # numpy divide by zero and emit warnings.
        if human.size < 2 or human.std() == 0 or judged.std() == 0:
            correlation = float('nan')
        else:
            correlation = float(np.corrcoef(human, judged)[0, 1])

        agreements = int(np.sum(np.abs(human - judged) <= 1))
        human_mean = float(human.mean())
        judge_mean = float(judged.mean())
        return {
            'correlation': correlation,
            'agreement_rate': agreements / human.size,
            'bias': judge_mean - human_mean,
            'human_mean': human_mean,
            'judge_mean': judge_mean,
            'n_examples': len(self.calibration_data)
        }


# A/B Testing Infrastructure
Experiment Manager
from dataclasses import dataclass
import hashlib
@dataclass
class Experiment:
    """Definition of one A/B experiment: its two configs, split and metrics."""

    name: str
    description: str
    control_config: dict
    treatment_config: dict
    # Compared against a 0-100 bucket when assigning users to treatment.
    allocation_percentage: float
    metrics: list[str]
    start_time: datetime
    end_time: datetime = None
class ExperimentManager:
    """Creates experiments and deterministically assigns users to variants."""

    def __init__(self, storage):
        """
        Args:
            storage: Object whose ``save_experiment(experiment)`` persists
                a newly created experiment.
        """
        self.storage = storage
        self.active_experiments = {}

    def create_experiment(
        self,
        name: str,
        description: str,
        control_config: dict,
        treatment_config: dict,
        allocation_percentage: float = 50.0,
        metrics: list[str] = None
    ) -> "Experiment":
        """Create a new experiment, activate it and save it to storage.

        Args:
            allocation_percentage: Share of users (0-100) sent to treatment.
            metrics: Metric names to track; defaults to quality score,
                latency and user satisfaction.

        Raises:
            ValueError: ``allocation_percentage`` is outside 0-100.
        """
        if not 0.0 <= allocation_percentage <= 100.0:
            raise ValueError(
                f"allocation_percentage must be between 0 and 100, got {allocation_percentage}"
            )
        experiment = Experiment(
            name=name,
            description=description,
            control_config=control_config,
            treatment_config=treatment_config,
            allocation_percentage=allocation_percentage,
            metrics=metrics or ['quality_score', 'latency', 'user_satisfaction'],
            start_time=datetime.now()
        )
        self.active_experiments[name] = experiment
        self.storage.save_experiment(experiment)
        return experiment

    def get_variant(self, experiment_name: str, user_id: str) -> str:
        """Deterministically assign user to variant.

        Returns ``'control'`` when the experiment is not active.
        """
        experiment = self.active_experiments.get(experiment_name)
        if not experiment:
            return 'control'
        hash_input = f"{experiment_name}:{user_id}"
        hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
        # Whole-percent part comes from ``hash % 100``, so whole-number
        # allocations assign exactly the users they did with integer
        # bucketing. The next two decimal digits of the hash add hundredths
        # of a percent, so fractional allocations such as 0.5 are honoured.
        bucket = (hash_value % 100) + ((hash_value // 100) % 100) / 100.0
        if bucket < experiment.allocation_percentage:
            return 'treatment'
        return 'control'

    def get_config(self, experiment_name: str, user_id: str) -> dict:
        """Get configuration for user's variant.

        Raises:
            KeyError: The experiment is not active.
        """
        experiment = self.active_experiments.get(experiment_name)
        if experiment is None:
            raise KeyError(f"Unknown or inactive experiment: {experiment_name}")
        variant = self.get_variant(experiment_name, user_id)
        return experiment.treatment_config if variant == 'treatment' else experiment.control_config


# Statistical Analysis
from scipy import stats
class ExperimentAnalyzer:
    """Statistical comparison of an experiment's control and treatment arms."""

    def __init__(self, storage):
        """
        Args:
            storage: Object whose ``get_metric_values(experiment, variant,
                metric)`` returns the observed numeric values for one arm.
        """
        self.storage = storage

    def analyze_experiment(
        self,
        experiment_name: str,
        metric_name: str,
        min_samples: int = 100
    ) -> dict:
        """Analyze experiment results with statistical tests.

        Uses Welch's t-test and a matching unpooled standard error, so the
        p-value and the confidence interval rest on the same assumptions
        (the arms are not assumed to share a variance).

        Returns:
            ``{'status': 'insufficient_data', ...}`` when either arm has
            fewer than ``min_samples`` values (never fewer than 2, which a
            sample variance requires). Otherwise a ``'complete'`` result with
            per-arm n/mean/std, relative lift, p-value, a significance flag
            at the 0.05 level and a normal-approximation 95% interval for
            the difference in means (treatment minus control).
        """
        control = np.asarray(
            self.storage.get_metric_values(experiment_name, 'control', metric_name), dtype=float
        )
        treatment = np.asarray(
            self.storage.get_metric_values(experiment_name, 'treatment', metric_name), dtype=float
        )
        n_control = int(control.size)
        n_treatment = int(treatment.size)

        required = max(min_samples, 2)
        if n_control < required or n_treatment < required:
            return {'status': 'insufficient_data', 'control_n': n_control, 'treatment_n': n_treatment}

        control_mean = float(control.mean())
        treatment_mean = float(treatment.mean())
        difference = treatment_mean - control_mean
        # Relative lift is undefined for a zero baseline; reported as 0.
        lift = difference / control_mean if control_mean != 0 else 0

        _, p_value = stats.ttest_ind(control, treatment, equal_var=False)
        p_value = float(p_value)

        # Sample variances (ddof=1), combined without pooling (Welch).
        standard_error = float(np.sqrt(
            control.var(ddof=1) / n_control + treatment.var(ddof=1) / n_treatment
        ))
        ci_95 = (difference - 1.96 * standard_error, difference + 1.96 * standard_error)

        return {
            'status': 'complete',
            'control': {'n': n_control, 'mean': control_mean, 'std': float(control.std(ddof=1))},
            'treatment': {'n': n_treatment, 'mean': treatment_mean, 'std': float(treatment.std(ddof=1))},
            'lift': lift,
            'lift_percentage': lift * 100,
            'p_value': p_value,
            # Plain bool rather than numpy.bool_, so the result serialises.
            'significant': bool(p_value < 0.05),
            'ci_95': ci_95
        }


# Model Comparison
@dataclass
class ModelProfile:
    """Measured and declared characteristics of one model."""

    name: str
    provider: str
    # Summary statistics of judged quality, keyed by statistic name.
    quality_scores: dict[str, float]
    # Operational measurements, keyed by metric name.
    operational_metrics: dict[str, float]
    cost_per_1k_tokens: float
    context_window: int
    supports_function_calling: bool
    supports_vision: bool
class ModelComparator:
    """Compare models across multiple dimensions."""

    _DEFAULT_WEIGHTS = {'quality': 0.4, 'latency': 0.2, 'cost': 0.3, 'features': 0.1}

    def __init__(self, test_suite, judge):
        """
        Args:
            test_suite: Stored as-is; not used by the methods of this class.
            judge: Object whose ``evaluate(prompt, response)`` returns a
                result exposing a numeric ``rating``.
        """
        self.test_suite = test_suite
        self.judge = judge

    def profile_model(self, model, model_name: str, test_prompts: list[str]) -> "ModelProfile":
        """Generate comprehensive profile for a model.

        Runs every prompt through ``model.generate``, timing each call in
        milliseconds and rating each response with the judge.

        Raises:
            ValueError: ``test_prompts`` is empty (no statistics to compute).
        """
        if not test_prompts:
            raise ValueError("test_prompts must not be empty")

        quality_results = []
        latencies = []
        for prompt in test_prompts:
            # perf_counter is monotonic, so a wall-clock adjustment during
            # the call cannot distort the measured latency.
            start = time.perf_counter()
            response = model.generate(prompt)
            latency = (time.perf_counter() - start) * 1000
            quality = self.judge.evaluate(prompt, response)
            quality_results.append(quality.rating)
            latencies.append(latency)

        return ModelProfile(
            name=model_name,
            provider=model.provider,
            quality_scores={
                'mean': float(np.mean(quality_results)),
                'std': float(np.std(quality_results)),
                'p25': float(np.percentile(quality_results, 25)),
                'p75': float(np.percentile(quality_results, 75))
            },
            operational_metrics={
                'latency_p50': float(np.percentile(latencies, 50)),
                'latency_p95': float(np.percentile(latencies, 95)),
                'latency_p99': float(np.percentile(latencies, 99))
            },
            cost_per_1k_tokens=model.cost_per_1k_tokens,
            context_window=model.context_window,
            supports_function_calling=model.supports_function_calling,
            supports_vision=model.supports_vision
        )

    def compare_models(
        self,
        models: list[tuple],
        test_prompts: list[str],
        weights: dict[str, float] = None
    ) -> dict:
        """Compare multiple models and rank them.

        Args:
            models: ``(model, name)`` pairs.
            test_prompts: Prompts every model is profiled on.
            weights: Weight per dimension (``quality``, ``latency``, ``cost``,
                ``features``); a dimension missing from the dict counts as 0.

        Returns:
            ``ranking`` (name/score pairs, best first), ``profiles`` by name,
            and ``recommendation`` (the top-ranked name).

        Raises:
            ValueError: ``models`` is empty.
        """
        if not models:
            raise ValueError("models must not be empty")
        weights = weights or self._DEFAULT_WEIGHTS
        profiles = [self.profile_model(model, name, test_prompts) for model, name in models]

        scores = []
        for profile in profiles:
            score = (
                weights.get('quality', 0.0) * self._normalize_quality(profile, profiles) +
                weights.get('latency', 0.0) * self._normalize_latency(profile, profiles) +
                weights.get('cost', 0.0) * self._normalize_cost(profile, profiles) +
                weights.get('features', 0.0) * self._score_features(profile)
            )
            scores.append((profile.name, score, profile))

        ranked = sorted(scores, key=lambda x: x[1], reverse=True)
        return {
            'ranking': [(name, score) for name, score, _ in ranked],
            'profiles': {name: profile for name, _, profile in ranked},
            'recommendation': ranked[0][0]
        }

    @staticmethod
    def _min_max(value: float, values: list[float], higher_is_better: bool) -> float:
        """Scale ``value`` to 0-1 within ``values``, with 1 always the best."""
        lowest, highest = min(values), max(values)
        if highest == lowest:
            # All candidates are equal on this dimension: nobody is penalised.
            return 1.0
        scaled = (value - lowest) / (highest - lowest)
        return scaled if higher_is_better else 1.0 - scaled

    def _normalize_quality(self, profile, profiles) -> float:
        """Score mean judged quality relative to the other profiles."""
        values = [p.quality_scores['mean'] for p in profiles]
        return self._min_max(profile.quality_scores['mean'], values, higher_is_better=True)

    def _normalize_latency(self, profile, profiles) -> float:
        """Score median latency relative to the other profiles (lower wins)."""
        values = [p.operational_metrics['latency_p50'] for p in profiles]
        return self._min_max(profile.operational_metrics['latency_p50'], values, higher_is_better=False)

    def _normalize_cost(self, profile, profiles) -> float:
        """Score cost per 1k tokens relative to the other profiles (lower wins)."""
        values = [p.cost_per_1k_tokens for p in profiles]
        return self._min_max(profile.cost_per_1k_tokens, values, higher_is_better=False)

    @staticmethod
    def _score_features(profile) -> float:
        """Return the fraction of optional capabilities the model supports."""
        capabilities = (profile.supports_function_calling, profile.supports_vision)
        return sum(bool(flag) for flag in capabilities) / len(capabilities)


# Production Monitoring
Quality Monitor
class QualityMonitor:
    """Monitor production quality metrics."""

    def __init__(self, judge, storage, alert_thresholds: dict):
        """
        Args:
            judge: Object whose ``evaluate(prompt=..., response=...,
                context=...)`` returns, directly or as an awaitable, a result
                exposing a numeric ``rating``.
            storage: Object whose awaitable ``get_recent_samples(n)`` yields
                dicts with ``id``, ``prompt``, ``response``, ``timestamp``
                and optionally ``context``.
            alert_thresholds: Must provide ``min_score``, ``alert_mean`` and
                ``alert_failure_rate``.
        """
        self.judge = judge
        self.storage = storage
        self.thresholds = alert_thresholds

    async def sample_and_evaluate(self, sample_size: int = 100) -> dict:
        """Sample recent production traffic and evaluate.

        Returns:
            ``metrics`` (mean score, 25th-percentile score, count below
            ``min_score``, sample size), ``alerts`` (messages for breached
            thresholds) and the per-sample ``results``. With no samples,
            both scores are ``None`` and no alert is raised.
        """
        samples = await self.storage.get_recent_samples(sample_size)
        results = []
        for sample in samples:
            score = self.judge.evaluate(
                prompt=sample['prompt'],
                response=sample['response'],
                context=sample.get('context')
            )
            # Accept both coroutine-based and plain synchronous judges.
            if inspect.isawaitable(score):
                score = await score
            results.append({
                'sample_id': sample['id'],
                'score': score.rating,
                'timestamp': sample['timestamp']
            })

        if not results:
            # Nothing sampled: the statistics and the failure rate below are
            # undefined, so report an explicit empty result.
            return {
                'metrics': {'mean_score': None, 'p25_score': None, 'below_threshold': 0, 'sample_size': 0},
                'alerts': [],
                'results': results
            }

        scores = [r['score'] for r in results]
        metrics = {
            'mean_score': float(np.mean(scores)),
            'p25_score': float(np.percentile(scores, 25)),
            'below_threshold': sum(1 for s in scores if s < self.thresholds['min_score']),
            'sample_size': len(results)
        }

        # Check alerts
        alerts = []
        failure_rate = metrics['below_threshold'] / len(results)
        if metrics['mean_score'] < self.thresholds['alert_mean']:
            alerts.append(f"Mean score {metrics['mean_score']:.2f} below threshold {self.thresholds['alert_mean']}")
        if failure_rate > self.thresholds['alert_failure_rate']:
            alerts.append(f"Failure rate {failure_rate:.1%} above threshold")

        return {'metrics': metrics, 'alerts': alerts, 'results': results}


# Drift Detection
class DriftDetector:
    """Detect distribution drift in model inputs/outputs."""

    def __init__(self, baseline_embeddings, embedding_model):
        """
        Args:
            baseline_embeddings: Reference embeddings, averaged along axis 0
                to form the baseline centroid.
            embedding_model: Object whose ``encode(texts)`` returns one
                embedding per text.
        """
        self.baseline = baseline_embeddings
        self.embed = embedding_model

    def check_drift(self, recent_texts: list[str], threshold: float = 0.1) -> dict:
        """Check if recent texts have drifted from baseline.

        Drift is measured as the Euclidean distance between the centroid of
        the recent embeddings and the centroid of the baseline embeddings.

        Returns:
            ``drift_distance`` (float), ``threshold``, ``drift_detected``
            (bool) and a ``recommendation`` string. With no recent texts the
            distance is NaN, no drift is reported and the recommendation
            says there was nothing to evaluate.
        """
        if len(recent_texts) == 0:
            # A centroid of nothing is undefined; say so instead of averaging
            # an empty array and reporting "No action needed".
            return {
                'drift_distance': float('nan'),
                'threshold': threshold,
                'drift_detected': False,
                'recommendation': 'No recent texts to evaluate'
            }

        recent_embeddings = self.embed.encode(recent_texts)
        baseline_centroid = np.mean(self.baseline, axis=0)
        recent_centroid = np.mean(recent_embeddings, axis=0)

        # Plain float/bool rather than numpy scalars, so the result serialises.
        drift_distance = float(np.linalg.norm(recent_centroid - baseline_centroid))
        drift_detected = bool(drift_distance > threshold)
        return {
            'drift_distance': drift_distance,
            'threshold': threshold,
            'drift_detected': drift_detected,
            'recommendation': 'Review recent queries for distribution shift' if drift_detected else 'No action needed'
        }