MLOps & Evaluation - Complete Code Reference

This file contains the full implementations referenced in Chapter 51. The chapter provides conceptual explanations and condensed examples; this file has the complete, production-ready code.

Table of Contents

  1. Evaluation Framework
  2. LLM-as-Judge Patterns
  3. A/B Testing Infrastructure
  4. Model Comparison
  5. Production Monitoring

Evaluation Framework

Base Evaluator Classes

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
from datetime import datetime
import json
import numpy as np

@dataclass
class EvaluationResult:
    """Outcome of evaluating a single dataset example."""

    example_id: str  # EvaluationPipeline copies this from the example's 'id' field
    input: str  # the input passed to model.generate()
    expected: Any  # reference answer from the dataset
    actual: Any  # raw model output
    # Keys are "<evaluator_name>_<metric>" when built by EvaluationPipeline. Despite the
    # annotation, values are not always floats (ClassificationEvaluator stores labels).
    metrics: dict[str, float]
    metadata: dict[str, Any]  # the example's optional 'metadata' dict ({} when absent)

@dataclass
class EvaluationReport:
    """Summary of one evaluation run (built and saved by EvaluationPipeline.run())."""

    # EvaluationPipeline formats this as "<model_version>_<dataset_name>_<YYYYmmdd_HHMMSS>".
    run_id: str
    timestamp: datetime  # EvaluationPipeline sets this via datetime.now() (naive local time)
    model_version: str
    dataset_name: str
    num_examples: int
    # Keys are "<evaluator_name>_<metric>", built from each Evaluator.aggregate() result.
    aggregate_metrics: dict[str, float]
    per_example_results: list[EvaluationResult]
    metadata: dict[str, Any]  # EvaluationPipeline stores {'evaluators': [<evaluator names>]}

class Evaluator(ABC):
    """Interface for scorers that grade single examples and summarize a whole run.

    Concrete subclasses implement ``evaluate`` for one (expected, actual) pair and
    ``aggregate`` to fold the list of per-example dicts into run-level metrics.
    """

    @abstractmethod
    def evaluate(self, expected: Any, actual: Any) -> dict[str, float]:
        """Return a mapping of metric name to value for one example."""

    @abstractmethod
    def aggregate(self, results: list[dict[str, float]]) -> dict[str, float]:
        """Combine a list of ``evaluate`` outputs into summary metrics."""

class ClassificationEvaluator(Evaluator):
    """Exact-match evaluator for single-label classification.

    ``evaluate`` records correctness plus both labels for one example; ``aggregate``
    turns those records into accuracy and per-label precision/recall.

    Args:
        labels: Expected label set. Labels that show up in the data but are not in
            this list are still counted instead of raising ``KeyError``.
    """

    def __init__(self, labels: list[str]):
        self.labels = labels

    def evaluate(self, expected: str, actual: str) -> dict[str, Any]:
        """Score one prediction by exact match.

        Returns:
            ``{'correct': 1.0 or 0.0, 'expected_label': expected,
            'predicted_label': actual}``. The label entries are passed through
            unchanged (they are not floats) so ``aggregate`` can compute
            per-label metrics.
        """
        return {
            'correct': float(expected == actual),
            'expected_label': expected,
            'predicted_label': actual
        }

    def aggregate(self, results: list[dict[str, Any]]) -> dict[str, float]:
        """Combine per-example outputs of ``evaluate`` into run-level metrics.

        Returns:
            ``accuracy`` (0.0 when ``results`` is empty), ``num_correct``,
            ``num_total``, plus ``recall_<label>`` for every label with at least
            one true occurrence and ``precision_<label>`` for every label with at
            least one prediction.
        """
        total = len(results)
        correct = sum(r['correct'] for r in results)

        # Seed counters with the configured labels (keeps their output order first);
        # labels not in self.labels are added on the fly rather than raising KeyError.
        label_correct = dict.fromkeys(self.labels, 0)
        label_total = dict.fromkeys(self.labels, 0)
        label_predicted = dict.fromkeys(self.labels, 0)

        for r in results:
            expected = r['expected_label']
            predicted = r['predicted_label']
            label_total[expected] = label_total.get(expected, 0) + 1
            label_predicted[predicted] = label_predicted.get(predicted, 0) + 1
            if r['correct']:
                label_correct[expected] = label_correct.get(expected, 0) + 1

        metrics = {
            # An empty run has no defined accuracy; report 0.0 instead of dividing by zero.
            'accuracy': correct / total if total else 0.0,
            'num_correct': correct,
            'num_total': total
        }

        # Configured labels first, then any other labels encountered in the data.
        for label in dict.fromkeys([*label_total, *label_predicted]):
            if label_total.get(label, 0) > 0:
                metrics[f'recall_{label}'] = label_correct.get(label, 0) / label_total[label]
            if label_predicted.get(label, 0) > 0:
                metrics[f'precision_{label}'] = label_correct.get(label, 0) / label_predicted[label]

        return metrics

Evaluation Pipeline

class EvaluationPipeline:
    """Generate model outputs for a dataset, score them, and store a report.

    Args:
        model: Object exposing ``generate(input)``.
        evaluators: Evaluator instances keyed by name. Every metric an evaluator
            returns is reported under the key ``"<name>_<metric>"``.
        results_store: Object exposing ``save(report)``; called once per run.
    """

    def __init__(
        self,
        model,
        evaluators: dict[str, Evaluator],
        results_store
    ):
        self.model = model
        self.evaluators = evaluators
        self.results_store = results_store

    def run(
        self,
        dataset: list[dict],
        model_version: str,
        dataset_name: str
    ) -> EvaluationReport:
        """Evaluate every example in ``dataset`` and persist the resulting report.

        Args:
            dataset: Examples with ``'id'``, ``'input'``, ``'expected'`` and an
                optional ``'metadata'`` dict.
            model_version: Recorded on the report and embedded in the run id.
            dataset_name: Recorded on the report and embedded in the run id.

        Returns:
            The ``EvaluationReport`` after it has been passed to
            ``results_store.save``.
        """
        run_id = f"{model_version}_{dataset_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        results = []
        # Unprefixed metric dicts per evaluator. Aggregating from these avoids
        # reverse-engineering them from prefixed keys, which broke when one
        # evaluator name was a prefix of another ("cls" vs "cls_v2") and when
        # str.replace also stripped the prefix from inside metric names.
        raw_metrics = {name: [] for name in self.evaluators}

        for example in dataset:
            # Generate model output
            actual = self.model.generate(example['input'])

            # Run all evaluators
            example_metrics = {}
            for name, evaluator in self.evaluators.items():
                metrics = evaluator.evaluate(example['expected'], actual)
                raw_metrics[name].append(metrics)
                example_metrics.update({f"{name}_{k}": v for k, v in metrics.items()})

            results.append(EvaluationResult(
                example_id=example['id'],
                input=example['input'],
                expected=example['expected'],
                actual=actual,
                metrics=example_metrics,
                metadata=example.get('metadata', {})
            ))

        # Aggregate metrics per evaluator, then re-apply the evaluator prefix.
        aggregate = {}
        for name, evaluator in self.evaluators.items():
            agg = evaluator.aggregate(raw_metrics[name])
            aggregate.update({f"{name}_{k}": v for k, v in agg.items()})

        report = EvaluationReport(
            run_id=run_id,
            timestamp=datetime.now(),
            model_version=model_version,
            dataset_name=dataset_name,
            num_examples=len(results),
            aggregate_metrics=aggregate,
            per_example_results=results,
            metadata={'evaluators': list(self.evaluators.keys())}
        )

        # Store results
        self.results_store.save(report)

        return report

Dataset Management

from typing import Iterator
import random

class EvaluationDataset:
    """Manages evaluation datasets with versioning and sampling.

    Datasets are held in memory in ``self.datasets``, keyed ``"<name>_v<version>"``.
    Each registered version is also written as JSON under ``storage_path``.

    Args:
        storage_path: Directory for the JSON copies; created on first write.
    """

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.datasets = {}

    def register_dataset(
        self,
        name: str,
        examples: list[dict],
        version: str,
        metadata: dict = None
    ):
        """Register (or overwrite) one dataset version and persist it as JSON.

        Raises:
            TypeError: if ``examples`` or ``metadata`` are not JSON-serializable.
        """
        dataset_id = f"{name}_v{version}"
        self.datasets[dataset_id] = {
            'name': name,
            'version': version,
            'examples': examples,
            'metadata': metadata or {},
            'created_at': datetime.now().isoformat()
        }
        self._save_dataset(dataset_id)

    def _save_dataset(self, dataset_id: str) -> None:
        """Write one registered version to ``<storage_path>/<dataset_id>.json``."""
        from pathlib import Path

        directory = Path(self.storage_path)
        directory.mkdir(parents=True, exist_ok=True)
        (directory / f"{dataset_id}.json").write_text(
            json.dumps(self.datasets[dataset_id], ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    @staticmethod
    def _version_key(version) -> tuple:
        """Natural sort key: dot-separated numeric parts compare as ints ('10' > '9')."""
        return tuple(
            (0, int(part), '') if part.isdecimal() else (1, 0, part)
            for part in str(version).split('.')
        )

    def get_dataset(self, name: str, version: str = 'latest') -> list[dict]:
        """Retrieve a dataset's examples by name and version.

        ``'latest'`` selects the highest version registered under exactly ``name``
        (natural ordering, so version 10 beats version 9).

        Raises:
            ValueError: if no dataset named ``name`` exists (``'latest'`` only).
            KeyError: if the explicitly requested version does not exist.
        """
        if version == 'latest':
            # Match on the stored name, not a key prefix: "qa_v" would also match "qa_valid_v1".
            versions = [entry['version'] for entry in self.datasets.values() if entry['name'] == name]
            if not versions:
                raise ValueError(f"No dataset found with name: {name}")
            version = max(versions, key=self._version_key)

        dataset_id = f"{name}_v{version}"
        if dataset_id not in self.datasets:
            raise KeyError(f"No dataset {name!r} with version {version!r}")
        return self.datasets[dataset_id]['examples']

    def sample_dataset(
        self,
        name: str,
        n: int,
        stratify_by: str = None,
        seed: int = 42,
        version: str = 'latest'
    ) -> list[dict]:
        """Sample up to ``n`` examples, optionally stratified by an example field.

        With ``stratify_by``, each group contributes roughly its proportional share
        (at least one example), and the combined list is truncated to ``n``.
        Sampling uses a private ``random.Random(seed)``, so the global RNG state is
        left untouched while results stay identical to seeding the module RNG.
        """
        examples = self.get_dataset(name, version)
        rng = random.Random(seed)

        if not stratify_by:
            return rng.sample(examples, min(n, len(examples)))

        groups = {}
        for ex in examples:
            key = ex.get(stratify_by, 'unknown')
            groups.setdefault(key, []).append(ex)

        sampled = []
        for group_examples in groups.values():
            group_n = max(1, int(n * len(group_examples) / len(examples)))
            sampled.extend(rng.sample(group_examples, min(group_n, len(group_examples))))

        return sampled[:n]

LLM-as-Judge Patterns

Basic Judge

from enum import Enum
from pydantic import BaseModel

class Rating(Enum):
    """Named levels of the 1-5 quality scale described in the LLM judge prompt.

    NOTE(review): JudgeResult.rating is a plain int; use Rating(value) when the named
    level is needed.
    """

    EXCELLENT = 5
    GOOD = 4
    ACCEPTABLE = 3
    POOR = 2
    UNACCEPTABLE = 1

class JudgeResult(BaseModel):
    """Parsed JSON verdict from LLMJudge.

    The judge prompt asks for 1-5 scores, but the fields below are not
    range-validated here.
    """

    rating: int  # overall score, requested on the 1-5 scale
    reasoning: str  # judge's free-text explanation
    aspects: dict[str, int]  # requested as criterion -> 1-5 score

class LLMJudge:
    """Grade one response on a 1-5 scale by prompting an LLM as the evaluator.

    Args:
        judge_model: Object exposing ``generate(prompt, response_format=...)`` that
            returns the judge's JSON reply as a string.
        criteria: Criteria listed, one bullet each, in the judge prompt.
    """

    # Filled with str.format(); doubled braces render as literal JSON braces.
    _PROMPT_TEMPLATE = """You are an expert evaluator. Evaluate the following response.

ORIGINAL PROMPT:
{prompt}

RESPONSE TO EVALUATE:
{response}
{reference_section}
EVALUATION CRITERIA:
{criteria_text}

Rate 1-5 scale: 1=Unacceptable, 2=Poor, 3=Acceptable, 4=Good, 5=Excellent

Return JSON: {{"rating": <1-5>, "reasoning": "<explanation>", "aspects": {{"<criterion>": <1-5>, ...}}}}

Be critical and consistent. Only give 5s for truly excellent responses."""

    def __init__(self, judge_model, criteria: list[str]):
        self.model = judge_model
        self.criteria = criteria

    def evaluate(
        self,
        prompt: str,
        response: str,
        reference: str = None
    ) -> JudgeResult:
        """Ask the judge model to grade ``response`` and parse its JSON verdict.

        Args:
            prompt: The original prompt that ``response`` answers.
            response: The response being graded.
            reference: Optional reference answer; left out of the judge prompt
                when falsy.

        Returns:
            A ``JudgeResult`` parsed from the model's JSON output.
        """
        raw_json = self.model.generate(
            self._build_judge_prompt(prompt, response, reference),
            response_format={"type": "json_object"},
        )
        return JudgeResult.model_validate_json(raw_json)

    def _build_judge_prompt(self, prompt: str, response: str, reference: str = None) -> str:
        """Render the judge prompt with criteria bullets and an optional reference section."""
        bullet_lines = [f"- {criterion}" for criterion in self.criteria]
        if reference:
            reference_block = f"\nREFERENCE ANSWER:\n{reference}\n"
        else:
            reference_block = ""
        return self._PROMPT_TEMPLATE.format(
            prompt=prompt,
            response=response,
            reference_section=reference_block,
            criteria_text="\n".join(bullet_lines),
        )

Pairwise Comparison Judge

class PairwiseJudge:
    """Pick the better of two responses with an LLM judge, controlling for position bias.

    Args:
        judge_model: Object exposing ``generate(prompt, response_format=...)`` that
            returns a JSON string with ``winner`` ("A", "B" or "tie") and
            ``reasoning`` fields.
    """

    # In the second (swapped) pass the judge's "A" is response_b and vice versa;
    # this maps a swapped verdict back onto the original labels.
    _SWAP = {'A': 'B', 'B': 'A', 'tie': 'tie'}

    def __init__(self, judge_model):
        self.model = judge_model

    def compare(
        self,
        prompt: str,
        response_a: str,
        response_b: str,
        criteria: list[str]
    ) -> dict:
        """Compare two responses, controlling for position bias.

        The judge runs once in each order. The verdict is high-confidence only when
        both runs agree after undoing the swap. Any disagreement yields a
        low-confidence tie, including a judge that always picks the same position
        or returns an unrecognized winner.

        Returns:
            ``{'winner': 'A' | 'B' | 'tie', 'confidence': 'high' | 'low',
            'reasoning': str}``
        """
        # Run comparison in both orders
        result_ab = self._compare_once(prompt, response_a, response_b, criteria)
        result_ba = self._compare_once(prompt, response_b, response_a, criteria)

        winner_ab = self._normalize_winner(result_ab.get('winner'))
        # Translate the swapped run's verdict back to the original A/B labels.
        winner_ba = self._SWAP.get(self._normalize_winner(result_ba.get('winner')))

        if winner_ab is not None and winner_ab == winner_ba:
            return {'winner': winner_ab, 'confidence': 'high', 'reasoning': result_ab.get('reasoning', '')}
        return {'winner': 'tie', 'confidence': 'low', 'reasoning': 'Inconsistent results'}

    @staticmethod
    def _normalize_winner(raw):
        """Map a raw ``winner`` field to 'A', 'B', 'tie', or None if unrecognized."""
        label = str(raw).strip().upper()
        if label in ('A', 'B'):
            return label
        if label == 'TIE':
            return 'tie'
        return None

    def _compare_once(self, prompt: str, response_a: str, response_b: str, criteria: list[str]) -> dict:
        """Ask the judge once, with ``response_a`` shown first, and return its parsed JSON."""
        judge_prompt = f"""Compare these two responses.

PROMPT: {prompt}

RESPONSE A:
{response_a}

RESPONSE B:
{response_b}

CRITERIA: {', '.join(criteria)}

Return JSON: {{"winner": "A" or "B" or "tie", "reasoning": "<explanation>"}}"""

        result = self.model.generate(judge_prompt, response_format={"type": "json_object"})
        return json.loads(result)

Multi-Aspect Judge

class MultiAspectJudge:
    """Evaluate multiple aspects independently, with one judge call per aspect.

    Args:
        judge_model: Object exposing ``generate(prompt, response_format=...)`` that
            returns a JSON string with ``rating`` and ``reasoning`` fields.
    """

    ASPECT_PROMPTS = {
        'correctness': """Evaluate ONLY factual correctness. Rate 1-5.""",
        'helpfulness': """Evaluate ONLY how helpful this is. Rate 1-5.""",
        'safety': """Evaluate ONLY safety (no harmful content). Rate 1-5.""",
        'coherence': """Evaluate ONLY clarity and organization. Rate 1-5.""",
        'groundedness': """Evaluate ONLY whether claims are grounded in context. Rate 1-5."""
    }

    def __init__(self, judge_model):
        self.model = judge_model

    def evaluate(
        self,
        prompt: str,
        response: str,
        context: str = None,
        aspects: list[str] = None
    ) -> dict[str, dict]:
        """Evaluate multiple aspects independently.

        Args:
            prompt: Prompt the response answers.
            response: Response being judged.
            context: Optional grounding context, included in each aspect prompt
                when truthy.
            aspects: Aspect names to evaluate; defaults to all of
                ``ASPECT_PROMPTS``. Unknown names are skipped.

        Returns:
            Parsed judge JSON per evaluated aspect, plus ``'overall'`` holding the
            mean rating, or ``{'rating': None}`` when no known aspect was evaluated.
        """
        aspects = aspects or list(self.ASPECT_PROMPTS.keys())
        results = {}

        for aspect in aspects:
            instruction = self.ASPECT_PROMPTS.get(aspect)
            if instruction is None:
                continue

            aspect_prompt = f"""{instruction}

PROMPT: {prompt}
{"CONTEXT: " + context if context else ""}
RESPONSE: {response}

Return JSON: {{"rating": <1-5>, "reasoning": "<brief explanation>"}}"""

            result = self.model.generate(aspect_prompt, response_format={"type": "json_object"})
            results[aspect] = json.loads(result)

        # float() tolerates judges that emit the rating as a JSON string such as "4".
        ratings = [float(r['rating']) for r in results.values()]
        # Guard the all-unknown-aspects case, which previously divided by zero.
        results['overall'] = {'rating': sum(ratings) / len(ratings) if ratings else None}

        return results

Judge Calibration

class JudgeCalibrator:
    """Calibrate an LLM judge against human ratings.

    Args:
        judge: Judge whose ``evaluate(prompt, response)`` result exposes a numeric
            ``rating``.
    """

    def __init__(self, judge: LLMJudge):
        self.judge = judge
        self.calibration_data = []

    def add_calibration_example(
        self,
        prompt: str,
        response: str,
        human_rating: float,
        human_reasoning: str = None
    ):
        """Add one human-rated (prompt, response) pair for calibration."""
        self.calibration_data.append({
            'prompt': prompt,
            'response': response,
            'human_rating': human_rating,
            'human_reasoning': human_reasoning
        })

    def evaluate_calibration(self) -> dict:
        """Re-rate every calibration example with the judge and compare it to the humans.

        Returns:
            dict with:
              * ``correlation``: Pearson correlation; NaN when undefined, i.e.
                fewer than two examples or either rating series constant.
              * ``agreement_rate``: fraction of examples within 1 point.
              * ``bias``: judge mean minus human mean.
              * ``human_mean``, ``judge_mean`` and ``n_examples``.

        Raises:
            ValueError: if no calibration examples have been added.
        """
        if not self.calibration_data:
            raise ValueError("No calibration examples added; call add_calibration_example() first")

        human_ratings = []
        judge_ratings = []
        for example in self.calibration_data:
            result = self.judge.evaluate(example['prompt'], example['response'])
            human_ratings.append(example['human_rating'])
            judge_ratings.append(result.rating)

        human = np.asarray(human_ratings, dtype=float)
        judged = np.asarray(judge_ratings, dtype=float)

        # np.corrcoef yields NaN plus a RuntimeWarning for zero-variance input;
        # report NaN explicitly instead.
        if human.size > 1 and human.std() > 0 and judged.std() > 0:
            correlation = float(np.corrcoef(human, judged)[0, 1])
        else:
            correlation = float('nan')

        human_mean = float(human.mean())
        judge_mean = float(judged.mean())

        return {
            'correlation': correlation,
            'agreement_rate': float(np.mean(np.abs(human - judged) <= 1)),
            'bias': judge_mean - human_mean,
            'human_mean': human_mean,
            'judge_mean': judge_mean,
            'n_examples': len(self.calibration_data)
        }

A/B Testing Infrastructure

Experiment Manager

from dataclasses import dataclass
import hashlib

@dataclass
class Experiment:
    """Definition of a two-arm (control vs. treatment) A/B experiment."""

    # ExperimentManager keys experiments by this name and hashes it with the user id
    # to assign variants.
    name: str
    description: str
    control_config: dict  # configuration returned for users in the control arm
    treatment_config: dict  # configuration returned for users in the treatment arm
    # Percent of users (0-100) routed to treatment: ExperimentManager.get_variant
    # compares it against a hash bucket in range(100).
    allocation_percentage: float
    metrics: list[str]  # names of metrics to track for this experiment
    start_time: datetime
    end_time: datetime = None  # optional; defaults to None despite the annotation

class ExperimentManager:
    """Create A/B experiments and route each user to a stable variant.

    Args:
        storage: Object exposing ``save_experiment(experiment)``; every newly
            created experiment is persisted through it.
    """

    def __init__(self, storage):
        self.storage = storage
        self.active_experiments = {}

    def create_experiment(
        self,
        name: str,
        description: str,
        control_config: dict,
        treatment_config: dict,
        allocation_percentage: float = 50.0,
        metrics: list[str] = None
    ) -> Experiment:
        """Register, persist, and return a new experiment starting now.

        ``allocation_percentage`` is the share of users (0-100) sent to treatment.
        ``metrics`` falls back to quality_score, latency and user_satisfaction when
        empty. An active experiment with the same name is replaced.
        """
        tracked_metrics = metrics if metrics else ['quality_score', 'latency', 'user_satisfaction']
        new_experiment = Experiment(
            name=name,
            description=description,
            control_config=control_config,
            treatment_config=treatment_config,
            allocation_percentage=allocation_percentage,
            metrics=tracked_metrics,
            start_time=datetime.now(),
        )
        self.active_experiments[name] = new_experiment
        self.storage.save_experiment(new_experiment)
        return new_experiment

    def get_variant(self, experiment_name: str, user_id: str) -> str:
        """Return 'treatment' or 'control' for this user, deterministically.

        The MD5 digest of ``"<experiment_name>:<user_id>"`` picks one of 100
        buckets. Buckets below the allocation percentage get treatment. Unknown
        experiments always yield 'control'.
        """
        experiment = self.active_experiments.get(experiment_name)
        if not experiment:
            return 'control'

        digest = hashlib.md5(f"{experiment_name}:{user_id}".encode()).hexdigest()
        bucket = int(digest, 16) % 100
        return 'treatment' if bucket < experiment.allocation_percentage else 'control'

    def get_config(self, experiment_name: str, user_id: str) -> dict:
        """Return the configuration dict for the user's variant.

        Raises KeyError when ``experiment_name`` is not an active experiment.
        """
        variant = self.get_variant(experiment_name, user_id)
        experiment = self.active_experiments[experiment_name]
        if variant == 'treatment':
            return experiment.treatment_config
        return experiment.control_config

Statistical Analysis

from scipy import stats

class ExperimentAnalyzer:
    """Compare control vs. treatment values of one metric with Welch's t-test.

    Args:
        storage: Object exposing ``get_metric_values(experiment, variant, metric)``
            that returns a sequence of numbers.
    """

    def __init__(self, storage):
        self.storage = storage

    def analyze_experiment(
        self,
        experiment_name: str,
        metric_name: str,
        min_samples: int = 100
    ) -> dict:
        """Analyze experiment results with statistical tests.

        Returns ``{'status': 'insufficient_data', ...}`` when either arm has fewer
        than ``min_samples`` values, or fewer than 2 (the minimum for a variance).
        Otherwise returns ``status='complete'`` with per-arm n/mean/sample-std,
        relative ``lift`` (0 when the control mean is 0), a two-sided Welch
        ``p_value``, ``significant`` (p < 0.05) and ``ci_95``. ``ci_95`` is a
        normal-approximation 95% interval for treatment_mean - control_mean.
        """
        control_data = self.storage.get_metric_values(experiment_name, 'control', metric_name)
        treatment_data = self.storage.get_metric_values(experiment_name, 'treatment', metric_name)

        required = max(min_samples, 2)
        if len(control_data) < required or len(treatment_data) < required:
            return {'status': 'insufficient_data', 'control_n': len(control_data), 'treatment_n': len(treatment_data)}

        control = np.asarray(control_data, dtype=float)
        treatment = np.asarray(treatment_data, dtype=float)

        control_mean = control.mean()
        treatment_mean = treatment.mean()
        lift = (treatment_mean - control_mean) / control_mean if control_mean != 0 else 0

        # Welch's t-test (unequal variances) so the p-value agrees with the unpooled
        # standard error used for the interval below; Student's test assumed equal
        # variances and could contradict the CI.
        _, p_value = stats.ttest_ind(control, treatment, equal_var=False)

        diff = treatment_mean - control_mean
        # Unpooled (Welch) standard error with sample variances (ddof=1).
        welch_se = np.sqrt(control.var(ddof=1) / control.size + treatment.var(ddof=1) / treatment.size)
        ci_95 = (diff - 1.96 * welch_se, diff + 1.96 * welch_se)

        return {
            'status': 'complete',
            'control': {'n': len(control), 'mean': control_mean, 'std': control.std(ddof=1)},
            'treatment': {'n': len(treatment), 'mean': treatment_mean, 'std': treatment.std(ddof=1)},
            'lift': lift,
            'lift_percentage': lift * 100,
            'p_value': float(p_value),
            'significant': bool(p_value < 0.05),
            'ci_95': ci_95
        }

Model Comparison

@dataclass
class ModelProfile:
    """Quality, latency, cost and capability summary for one model."""

    name: str
    provider: str
    # ModelComparator.profile_model fills 'mean', 'std', 'p25', 'p75' of judge ratings.
    quality_scores: dict[str, float]
    # ModelComparator.profile_model fills 'latency_p50'/'latency_p95'/'latency_p99',
    # measured in milliseconds.
    operational_metrics: dict[str, float]
    cost_per_1k_tokens: float  # copied from the model object; currency not specified here
    context_window: int  # presumably measured in tokens — TODO confirm
    supports_function_calling: bool
    supports_vision: bool

class ModelComparator:
    """Compare models across multiple dimensions.

    Args:
        test_suite: Stored on the instance; not used by the methods below.
        judge: Object whose ``evaluate(prompt, response)`` result exposes a numeric
            ``rating`` (e.g. ``LLMJudge``).
    """

    # Default ranking weights; ``compare_models`` lets callers override any subset.
    DEFAULT_WEIGHTS = {'quality': 0.4, 'latency': 0.2, 'cost': 0.3, 'features': 0.1}

    def __init__(self, test_suite, judge):
        self.test_suite = test_suite
        self.judge = judge

    def profile_model(self, model, model_name: str, test_prompts: list[str]) -> ModelProfile:
        """Generate comprehensive profile for a model.

        Times ``model.generate`` on every prompt (wall-clock ms via a monotonic
        clock), scores each response with the judge, and reads the static
        attributes (provider, cost, context window, feature flags) off ``model``.

        Raises:
            ValueError: if ``test_prompts`` is empty (percentiles are undefined).
        """
        import time

        if not test_prompts:
            raise ValueError("test_prompts must contain at least one prompt")

        quality_results = []
        latencies = []

        for prompt in test_prompts:
            # perf_counter is monotonic and high-resolution, unlike time.time().
            start = time.perf_counter()
            response = model.generate(prompt)
            latency = (time.perf_counter() - start) * 1000  # milliseconds

            quality = self.judge.evaluate(prompt, response)
            quality_results.append(quality.rating)
            latencies.append(latency)

        return ModelProfile(
            name=model_name,
            provider=model.provider,
            quality_scores={
                'mean': np.mean(quality_results),
                'std': np.std(quality_results),
                'p25': np.percentile(quality_results, 25),
                'p75': np.percentile(quality_results, 75)
            },
            operational_metrics={
                'latency_p50': np.percentile(latencies, 50),
                'latency_p95': np.percentile(latencies, 95),
                'latency_p99': np.percentile(latencies, 99)
            },
            cost_per_1k_tokens=model.cost_per_1k_tokens,
            context_window=model.context_window,
            supports_function_calling=model.supports_function_calling,
            supports_vision=model.supports_vision
        )

    def compare_models(
        self,
        models: list[tuple],
        test_prompts: list[str],
        weights: dict[str, float] = None
    ) -> dict:
        """Compare multiple models and rank them by a weighted score.

        Args:
            models: ``(model, name)`` pairs.
            test_prompts: Prompts used to profile every model.
            weights: Any subset of ``quality``/``latency``/``cost``/``features``;
                missing keys fall back to ``DEFAULT_WEIGHTS``.

        Returns:
            ``ranking`` as a list of (name, score) pairs, best first; ``profiles`` by
            name; and ``recommendation``, the top-ranked name.

        Raises:
            ValueError: if ``models`` is empty.
        """
        if not models:
            raise ValueError("models must contain at least one (model, name) pair")

        weights = {**self.DEFAULT_WEIGHTS, **(weights or {})}

        profiles = [self.profile_model(model, name, test_prompts) for model, name in models]

        scores = []
        for profile in profiles:
            score = (
                weights['quality'] * self._normalize_quality(profile, profiles) +
                weights['latency'] * self._normalize_latency(profile, profiles) +
                weights['cost'] * self._normalize_cost(profile, profiles) +
                weights['features'] * self._score_features(profile)
            )
            scores.append((profile.name, score, profile))

        ranked = sorted(scores, key=lambda x: x[1], reverse=True)

        return {
            'ranking': [(name, score) for name, score, _ in ranked],
            'profiles': {name: profile for name, _, profile in ranked},
            'recommendation': ranked[0][0]
        }

    @staticmethod
    def _min_max(value: float, values: list[float], higher_is_better: bool) -> float:
        """Scale ``value`` into [0, 1] relative to ``values``; all-equal values score 1.0."""
        low, high = min(values), max(values)
        if high == low:
            return 1.0
        scaled = (value - low) / (high - low)
        return scaled if higher_is_better else 1.0 - scaled

    def _normalize_quality(self, profile: ModelProfile, profiles: list[ModelProfile]) -> float:
        """Mean judge rating, min-max scaled across profiles (higher is better)."""
        return self._min_max(
            profile.quality_scores['mean'],
            [p.quality_scores['mean'] for p in profiles],
            higher_is_better=True,
        )

    def _normalize_latency(self, profile: ModelProfile, profiles: list[ModelProfile]) -> float:
        """Median latency, min-max scaled across profiles (lower is better)."""
        return self._min_max(
            profile.operational_metrics['latency_p50'],
            [p.operational_metrics['latency_p50'] for p in profiles],
            higher_is_better=False,
        )

    def _normalize_cost(self, profile: ModelProfile, profiles: list[ModelProfile]) -> float:
        """Cost per 1k tokens, min-max scaled across profiles (lower is better)."""
        return self._min_max(
            profile.cost_per_1k_tokens,
            [p.cost_per_1k_tokens for p in profiles],
            higher_is_better=False,
        )

    @staticmethod
    def _score_features(profile: ModelProfile) -> float:
        """Fraction of optional capabilities (function calling, vision) supported."""
        features = (profile.supports_function_calling, profile.supports_vision)
        return sum(bool(f) for f in features) / len(features)

Production Monitoring

Quality Monitor

class QualityMonitor:
    """Monitor production quality metrics.

    Args:
        judge: Object with an async ``evaluate(prompt=..., response=..., context=...)``
            whose result exposes a numeric ``rating``.
        storage: Object with an async ``get_recent_samples(n)`` returning dicts with
            ``id``, ``prompt``, ``response``, ``timestamp`` and optional ``context``.
        alert_thresholds: Must contain:
            * ``min_score``: per-sample failure cutoff.
            * ``alert_mean``: minimum acceptable mean score.
            * ``alert_failure_rate``: maximum acceptable fraction of samples below
              ``min_score``.
    """

    def __init__(self, judge, storage, alert_thresholds: dict):
        self.judge = judge
        self.storage = storage
        self.thresholds = alert_thresholds

    async def sample_and_evaluate(self, sample_size: int = 100) -> dict:
        """Sample recent production traffic, score it, and check alert thresholds.

        Returns:
            ``{'metrics': ..., 'alerts': [str, ...], 'results': [...]}``. When storage
            returns no samples, the metrics report ``sample_size`` 0 with
            ``mean_score``/``p25_score`` set to None, and no alerts fire.
        """
        samples = await self.storage.get_recent_samples(sample_size)

        results = []
        for sample in samples:
            score = await self.judge.evaluate(
                prompt=sample['prompt'],
                response=sample['response'],
                context=sample.get('context')
            )
            results.append({
                'sample_id': sample['id'],
                'score': score.rating,
                'timestamp': sample['timestamp']
            })

        if not results:
            # Nothing to score (e.g. no traffic in the window); avoid mean/percentile
            # of an empty list and the division by zero in the failure rate.
            return {
                'metrics': {'mean_score': None, 'p25_score': None, 'below_threshold': 0, 'sample_size': 0},
                'alerts': [],
                'results': []
            }

        scores = [r['score'] for r in results]
        below = sum(1 for s in scores if s < self.thresholds['min_score'])
        metrics = {
            'mean_score': np.mean(scores),
            'p25_score': np.percentile(scores, 25),
            'below_threshold': below,
            'sample_size': len(results)
        }
        failure_rate = below / len(results)

        # Check alerts
        alerts = []
        if metrics['mean_score'] < self.thresholds['alert_mean']:
            alerts.append(f"Mean score {metrics['mean_score']:.2f} below threshold {self.thresholds['alert_mean']}")
        if failure_rate > self.thresholds['alert_failure_rate']:
            alerts.append(
                f"Failure rate {failure_rate:.1%} above threshold {self.thresholds['alert_failure_rate']:.1%}"
            )

        return {'metrics': metrics, 'alerts': alerts, 'results': results}

Drift Detection

class DriftDetector:
    """Detect distribution drift in model inputs/outputs via embedding centroids.

    Args:
        baseline_embeddings: Array-like of reference embeddings; its rows are
            averaged along axis 0 to form the baseline centroid.
        embedding_model: Object whose ``encode(texts)`` returns embeddings in the
            same space as the baseline.
    """

    def __init__(self, baseline_embeddings, embedding_model):
        self.baseline = baseline_embeddings
        self.embed = embedding_model

    def check_drift(self, recent_texts: list[str], threshold: float = 0.1) -> dict:
        """Check whether recent texts have drifted from the baseline.

        Returns:
            dict with:
              * ``drift_distance``: Euclidean distance between the centroids, as a
                float.
              * ``threshold``.
              * ``drift_detected``: bool, true when the distance exceeds
                ``threshold``.
              * ``recommendation``: a human-readable string.

        Raises:
            ValueError: if ``recent_texts`` is empty. Its centroid would be NaN, and
                the comparison would silently report "no drift".
        """
        if len(recent_texts) == 0:
            raise ValueError("recent_texts must contain at least one text")

        recent_embeddings = np.asarray(self.embed.encode(recent_texts), dtype=float)

        baseline_centroid = np.asarray(self.baseline, dtype=float).mean(axis=0)
        recent_centroid = recent_embeddings.mean(axis=0)

        # Plain Python float/bool so the result serializes cleanly (e.g. to JSON).
        drift_distance = float(np.linalg.norm(recent_centroid - baseline_centroid))
        drift_detected = drift_distance > threshold

        return {
            'drift_distance': drift_distance,
            'threshold': threshold,
            'drift_detected': drift_detected,
            'recommendation': 'Review recent queries for distribution shift' if drift_detected else 'No action needed'
        }