Data Architecture Code Reference

Complete code implementations for Chapter 24: Data Architecture for AI.


Feature Store Architecture

flowchart TB
    subgraph Sources["DATA SOURCES"]
        direction LR
        Events["Event Stream<br/>(Kafka)"]
        Batch["Batch Data<br/>(Data Warehouse)"]
        RT["Real-time APIs"]
    end

    subgraph Compute["FEATURE COMPUTATION"]
        direction TB
        Stream["Streaming Engine<br/>(Flink/Spark)"]
        BatchProc["Batch Processing<br/>(Spark/dbt)"]

        subgraph Registry["Feature Registry"]
            Defs["Feature Definitions"]
            Lineage["Lineage Tracking"]
            Governance["Governance Rules"]
        end
    end

    subgraph Storage["FEATURE STORAGE"]
        direction LR

        subgraph Online["Online Store"]
            Redis["Redis/DynamoDB<br/>(Low Latency)"]
        end

        subgraph Offline["Offline Store"]
            Delta["Delta Lake/Parquet<br/>(Historical)"]
        end

        subgraph Vector["Vector Store"]
            Embeddings["Embeddings<br/>(Pinecone/Milvus)"]
        end
    end

    subgraph Serving["SERVING LAYER"]
        direction LR
        OnlineAPI["Online API<br/>< 10ms p99"]
        OfflineAPI["Batch API<br/>Point-in-Time Joins"]
    end

    subgraph Consumers["CONSUMERS"]
        direction LR
        Training["Model Training"]
        Inference["Real-time Inference"]
        Analytics["Analytics/BI"]
    end

    Events --> Stream
    Batch --> BatchProc
    RT --> Stream

    Stream --> Online
    Stream --> Vector
    BatchProc --> Offline
    BatchProc --> Online

    Registry -.->|"Defines"| Stream
    Registry -.->|"Defines"| BatchProc

    Online --> OnlineAPI
    Offline --> OfflineAPI
    Vector --> OnlineAPI

    OnlineAPI --> Inference
    OfflineAPI --> Training
    OfflineAPI --> Analytics

    style Sources fill:#e3f2fd
    style Compute fill:#fff3e0
    style Storage fill:#e8f5e9
    style Serving fill:#f3e5f5

Data Pipeline Architecture

flowchart LR
    subgraph Lambda["LAMBDA ARCHITECTURE"]
        direction TB
        L_Batch["Batch Layer<br/>(Accuracy)"]
        L_Speed["Speed Layer<br/>(Low Latency)"]
        L_Serving["Serving Layer<br/>(Merge Views)"]

        L_Batch --> L_Serving
        L_Speed --> L_Serving
    end

    subgraph Kappa["KAPPA ARCHITECTURE"]
        direction TB
        K_Stream["Stream Processing<br/>(Single Path)"]
        K_Store["Serving Store"]
        K_Replay["Replay for<br/>Corrections"]

        K_Stream --> K_Store
        K_Replay -.-> K_Stream
    end

    Input["Raw Events"] --> Lambda
    Input --> Kappa

    style Lambda fill:#e3f2fd
    style Kappa fill:#e8f5e9

Training-Serving Consistency

flowchart TB
    subgraph Definition["SINGLE DEFINITION"]
        FeatureDef["Feature Definition<br/>(Code + Config)"]
    end

    subgraph Training["TRAINING PATH"]
        T_Historical["Historical<br/>Data"]
        T_PIT["Point-in-Time<br/>Join"]
        T_Features["Training<br/>Features"]
        T_Model["Model<br/>Training"]

        T_Historical --> T_PIT
        T_PIT --> T_Features
        T_Features --> T_Model
    end

    subgraph Serving["SERVING PATH"]
        S_Request["Inference<br/>Request"]
        S_Lookup["Feature<br/>Lookup"]
        S_Features["Serving<br/>Features"]
        S_Predict["Model<br/>Prediction"]

        S_Request --> S_Lookup
        S_Lookup --> S_Features
        S_Features --> S_Predict
    end

    FeatureDef --> T_PIT
    FeatureDef --> S_Lookup

    subgraph Monitoring["SKEW DETECTION"]
        Compare["Distribution<br/>Comparison"]
        Alert["Drift<br/>Alerts"]
        Compare --> Alert
    end

    T_Features -.-> Compare
    S_Features -.-> Compare

    style Definition fill:#fff3e0
    style Training fill:#e3f2fd
    style Serving fill:#e8f5e9
    style Monitoring fill:#ffebee

Table of Contents


Data Pipeline Architecture

Architecture Selector

Complete implementation for choosing between Lambda and Kappa architectures:

class ArchitectureSelector:
    """Weigh project requirements to pick Lambda or Kappa.

    Every rule in ``_RULES`` adds points to one architecture when the
    matching requirement is present. The architecture with the higher
    total wins; a tie (including an empty requirements dict) yields Kappa.
    """

    # Marker for rules that fire on any truthy requirement value.
    _ANY_TRUTHY = object()

    # (requirement key, expected value, favoured architecture, points, reason)
    # Order matters: it determines the order of the returned reasons.
    _RULES = (
        ('complex_batch_processing', _ANY_TRUTHY, 'lambda', 2,
         "Complex batch processing better in dedicated batch layer"),
        ('maintain_single_codebase', _ANY_TRUTHY, 'kappa', 2,
         "Single codebase simpler to maintain"),
        ('historical_accuracy_critical', _ANY_TRUTHY, 'lambda', 1,
         "Batch layer ensures accurate historical computation"),
        ('reprocessing_frequent', _ANY_TRUTHY, 'kappa', 2,
         "Replay-based reprocessing is core strength"),
        ('latency_requirements', 'sub_second', 'kappa', 1,
         "Pure streaming often achieves lower latency"),
        ('team_experience', 'batch_heavy', 'lambda', 1,
         "Team can leverage existing batch expertise"),
    )

    _TRADEOFFS = {
        'lambda': (
            "Pro: Best-of-both-worlds accuracy and speed",
            "Con: Two codebases to maintain",
            "Con: More complex deployment and operations",
            "Con: Potential batch/speed layer inconsistencies",
        ),
        'kappa': (
            "Pro: Single codebase and processing paradigm",
            "Pro: Simpler mental model and debugging",
            "Con: Batch-like operations less natural",
            "Con: Replay for corrections can be expensive",
        ),
    }

    def recommend(self, requirements: dict) -> dict:
        """Score both architectures and return the preferred one.

        Args:
            requirements: Mapping of requirement names to values. Flag-style
                keys count when truthy; ``latency_requirements`` and
                ``team_experience`` count only for one specific value.

        Returns:
            Dict with ``recommendation`` ('lambda' or 'kappa'), ``scores``
            for both, the ``reasons`` collected for the winner, and the
            winner's ``tradeoffs``.
        """
        totals = {'lambda': 0, 'kappa': 0}
        reasons = {'lambda': [], 'kappa': []}

        for key, expected, architecture, points, reason in self._RULES:
            actual = requirements.get(key)
            if expected is self._ANY_TRUTHY:
                fired = bool(actual)
            else:
                fired = actual == expected
            if fired:
                totals[architecture] += points
                reasons[architecture].append(reason)

        # Strictly-greater comparison: ties fall through to Kappa.
        winner = 'lambda' if totals['lambda'] > totals['kappa'] else 'kappa'

        return {
            'recommendation': winner,
            'scores': totals,
            'reasons': reasons[winner],
            'tradeoffs': self._describe_tradeoffs(winner)
        }

    def _describe_tradeoffs(self, recommendation: str) -> list[str]:
        """Return the pro/con list for 'lambda'; anything else gets Kappa's."""
        side = 'lambda' if recommendation == 'lambda' else 'kappa'
        return list(self._TRADEOFFS[side])

Schema Registry

Complete implementation for a central schema registry with compatibility checking:

from dataclasses import dataclass
from enum import Enum
from typing import Optional
import json

class SchemaCompatibility(Enum):
    """Compatibility mode a schema declares for evolution between versions."""
    BACKWARD = "backward"  # New schema can read old data
    FORWARD = "forward"    # Old schema can read new data
    FULL = "full"          # Both backward and forward
    NONE = "none"          # No compatibility guarantee

@dataclass
class SchemaField:
    """A single named, typed field in a data schema."""
    name: str  # Field name; used as the property key when rendered to JSON Schema
    type: str  # Logical type name such as 'string', 'int', 'float', 'bool', 'array', 'object'
    nullable: bool = True  # Non-nullable fields are the ones listed as required
    default: Optional[str] = None  # Default value; None means no default is declared
    description: str = ""  # Free-text description of the field

@dataclass
class DataSchema:
    """A named, versioned list of fields plus its declared compatibility mode."""
    name: str
    version: int
    fields: list[SchemaField]
    compatibility: SchemaCompatibility

    def to_json_schema(self) -> dict:
        """Render this schema as a draft-07 JSON Schema object.

        Each field becomes a property carrying its mapped type and
        description, plus a ``default`` entry when one is declared.
        Non-nullable fields are listed under ``required``.
        """
        props = {}
        mandatory = []

        for spec in self.fields:
            entry = {
                'type': self._map_type(spec.type),
                'description': spec.description
            }
            if spec.default is not None:
                entry['default'] = spec.default
            props[spec.name] = entry

            if not spec.nullable:
                mandatory.append(spec.name)

        return {
            'type': 'object',
            'properties': props,
            'required': mandatory,
            '$schema': 'http://json-schema.org/draft-07/schema#'
        }

    def _map_type(self, dtype: str) -> str:
        """Translate a logical type name to its JSON Schema type name.

        Unrecognised names fall back to 'string'.
        """
        # These names are spelled the same in JSON Schema.
        if dtype in ('string', 'array', 'object'):
            return dtype
        renamed = {
            'int': 'integer',
            'float': 'number',
            'bool': 'boolean'
        }
        return renamed.get(dtype, 'string')

class SchemaRegistry:
    """Central schema registry for data contracts.

    Keeps every accepted version of each named schema in registration order
    and gates each new version on a compatibility check against the most
    recently registered version of the same name.
    """

    def __init__(self):
        # schema name -> accepted versions, oldest first / latest last
        self.schemas: dict[str, list[DataSchema]] = {}

    def register(self, schema: DataSchema) -> dict:
        """Register a new schema version.

        The first schema under a name is always accepted. Later versions are
        checked against the latest accepted version using the compatibility
        mode declared on the *incoming* schema.

        Returns:
            ``{'status': 'registered', 'version': ...}`` on success, or
            ``{'status': 'rejected', 'reason': [violations]}`` otherwise.
        """
        name = schema.name

        if name not in self.schemas:
            self.schemas[name] = [schema]
            return {'status': 'registered', 'version': schema.version}

        # Check compatibility with previous version
        previous = self.schemas[name][-1]
        compatibility_check = self._check_compatibility(previous, schema)

        if not compatibility_check['compatible']:
            return {
                'status': 'rejected',
                'reason': compatibility_check['violations']
            }

        self.schemas[name].append(schema)
        return {'status': 'registered', 'version': schema.version}

    @staticmethod
    def _is_optional(field) -> bool:
        """A reader can fill in a missing field if it is nullable or has a default."""
        return field.nullable or field.default is not None

    def _check_compatibility(
        self,
        old: DataSchema,
        new: DataSchema
    ) -> dict:
        """Check whether ``new`` may follow ``old`` under ``new.compatibility``.

        Only field additions and removals are examined; type changes on a
        field present in both versions are not checked here.

        Returns:
            ``{'compatible': bool, 'violations': [str, ...]}``.
        """
        violations = []

        old_fields = {f.name: f for f in old.fields}
        new_fields = {f.name: f for f in new.fields}
        mode = new.compatibility

        if mode in (SchemaCompatibility.FORWARD, SchemaCompatibility.FULL):
            # Old schema must be able to read new data. New records no longer
            # carry removed fields, so the old reader needs a fallback for
            # each of them: the removed field must have been nullable or had
            # a default. Added fields are simply ignored by the old reader.
            for name, old_field in old_fields.items():
                if name not in new_fields and not self._is_optional(old_field):
                    violations.append(
                        f"Field '{name}' removed without default"
                    )

        if mode in (SchemaCompatibility.BACKWARD, SchemaCompatibility.FULL):
            # New schema must be able to read old data. Old records lack any
            # field added in the new version, so each added field must be
            # nullable or have a default. Removed fields are harmless because
            # the new reader ignores them.
            for name, new_field in new_fields.items():
                if name not in old_fields and not self._is_optional(new_field):
                    violations.append(
                        f"New field '{name}' is not nullable and has no default"
                    )

        return {
            'compatible': len(violations) == 0,
            'violations': violations
        }

    def get_latest(self, name: str) -> Optional[DataSchema]:
        """Return the most recently registered version, or None if unknown."""
        versions = self.schemas.get(name)
        if versions:
            return versions[-1]
        return None

    def get_version(self, name: str, version: int) -> Optional[DataSchema]:
        """Return the first registered schema with this version number, or None."""
        for schema in self.schemas.get(name, []):
            if schema.version == version:
                return schema
        return None

Contract Enforcer

Complete implementation for enforcing data contracts:

from dataclasses import dataclass
from datetime import datetime

@dataclass
class SLA:
    """Service Level Agreement targets attached to a data contract."""
    freshness_seconds: int  # Max age of data
    availability_percent: float  # e.g., 99.9
    completeness_percent: float  # Minimum row count vs expected

@dataclass
class DataContract:
    """Contract between data producer and consumer."""
    name: str  # Unique contract name
    producer: str  # Identifier of the producer the contract belongs to
    consumers: list[str]  # Presumably identifiers of downstream consumers -- confirm
    schema: DataSchema  # Schema that data under this contract is expected to follow
    sla: SLA  # Freshness / availability / completeness targets
    quality_checks: list[str]  # Names of the quality checks that apply to this data
    owner: str  # Presumably the accountable person or team -- confirm
    documentation_url: str  # Link to further documentation for the contract

class ContractEnforcer:
    """Gatekeeper that checks writes and SLA status against known contracts.

    Contracts are indexed by name. Quality-check failures are appended to
    ``self.violations`` so they can be picked up for alerting.
    """

    def __init__(self, contracts: list[DataContract]):
        self.contracts = {contract.name: contract for contract in contracts}
        self.violations: list[dict] = []

    def validate_write(
        self,
        contract_name: str,
        data,
        producer_id: str
    ) -> dict:
        """Decide whether ``producer_id`` may write ``data`` under a contract.

        Checks run in order: contract exists, producer matches the contract,
        schema validation, then quality checks. The first failure is returned
        as a dict with ``valid`` False; only quality failures are recorded as
        violations.
        """
        contract = self.contracts.get(contract_name)
        if not contract:
            return {'valid': False, 'error': 'Unknown contract'}

        # Only the producer named in the contract may write.
        if producer_id != contract.producer:
            return {'valid': False, 'error': 'Unauthorized producer'}

        schema_outcome = self._validate_schema(data, contract.schema)
        if not schema_outcome['valid']:
            return schema_outcome

        quality_outcome = self._run_quality_checks(data, contract)
        if quality_outcome['passed']:
            return {'valid': True}

        self._record_violation(contract_name, 'quality', quality_outcome)
        return {
            'valid': False,
            'error': 'Quality checks failed',
            'details': quality_outcome
        }

    def _validate_schema(self, data, schema: DataSchema) -> dict:
        """Validate data against schema (placeholder: always reports valid)."""
        # A real implementation would use JSON Schema validation here.
        return {'valid': True}

    def _run_quality_checks(
        self,
        data,
        contract: DataContract
    ) -> dict:
        """Run the contract's quality checks (placeholder: each one passes)."""
        # A real implementation would dispatch each name to an actual check.
        outcomes = [
            {'check': check_name, 'passed': True}
            for check_name in contract.quality_checks
        ]

        return {
            'passed': all(outcome['passed'] for outcome in outcomes),
            'results': outcomes
        }

    def _record_violation(
        self,
        contract: str,
        violation_type: str,
        details: dict
    ):
        """Append a timestamped violation entry to ``self.violations``."""
        entry = {
            'contract': contract,
            'type': violation_type,
            'details': details,
            'timestamp': datetime.now()
        }
        self.violations.append(entry)

    def check_sla_compliance(self, contract_name: str) -> dict:
        """Report SLA status for a contract (placeholder: always compliant).

        Returns ``{'error': 'Unknown contract'}`` when the name is not known.
        """
        if not self.contracts.get(contract_name):
            return {'error': 'Unknown contract'}

        # A real implementation would query a metrics system here.
        report = {'contract': contract_name}
        report['freshness_met'] = True
        report['availability_met'] = True
        report['completeness_met'] = True
        report['overall_compliant'] = True
        return report

Feature Stores

Feature Store Interface

Complete implementation for a feature store with online/offline serving:

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional

@dataclass
class Feature:
    """Metadata describing a single feature."""
    name: str  # Feature name
    entity: str  # user, item, session, etc.
    value_type: str  # int, float, string, embedding
    description: str  # Free-text description
    owner: str  # Presumably the accountable person or team -- confirm
    created: datetime  # When the feature was defined
    computation: str  # SQL, Python, streaming

@dataclass
class FeatureView:
    """Collection of features with shared entity and timing."""
    name: str  # View name
    entities: list[str]  # Names of the entities the view is keyed on
    features: list[Feature]  # Features grouped under this view
    ttl: Optional[int]  # Time-to-live in seconds
    online: bool  # Available for real-time serving
    offline: bool  # Available for training

class FeatureStore:
    """Feature store facade over an online store, an offline store and a registry.

    Feature references are strings of the form ``"view_name:feature_name"``.
    """

    def __init__(self, online_store, offline_store, registry):
        self.online = online_store
        self.offline = offline_store
        self.registry = registry

    # Online serving
    async def get_online_features(
        self,
        feature_refs: list[str],
        entity_keys: dict[str, Any]
    ) -> dict[str, Any]:
        """Get features for real-time inference.

        Args:
            feature_refs: References of the form ``"view:feature"``.
            entity_keys: Entity key values passed through to the online store.

        Returns:
            Dict mapping each reference to the value returned by the online
            store's ``get``.

        Raises:
            ValueError: If any reference does not contain exactly one ``:``.
                All references are validated before the first store call, so
                a malformed batch causes no I/O.
        """
        parsed = []
        for ref in feature_refs:
            view_name, sep, feature_name = ref.partition(':')
            if not sep or ':' in feature_name:
                raise ValueError(
                    f"Invalid feature reference {ref!r}; "
                    "expected 'view_name:feature_name'"
                )
            parsed.append((ref, view_name, feature_name))

        features = {}
        for ref, view_name, feature_name in parsed:
            features[ref] = await self.online.get(
                view=view_name,
                feature=feature_name,
                keys=entity_keys
            )

        return features

    # Offline training
    def get_historical_features(
        self,
        feature_refs: list[str],
        entity_df,  # DataFrame with entity keys and event_timestamp
    ):
        """Get point-in-time correct features for training.

        Delegates to the offline store's ``point_in_time_join``, passing the
        per-view TTLs looked up from the registry.
        """
        return self.offline.point_in_time_join(
            feature_refs=feature_refs,
            entity_df=entity_df,
            ttl=self._get_ttl(feature_refs)
        )

    def _get_ttl(self, feature_refs: list[str]) -> dict:
        """Return ``{view_name: ttl}`` for every view named in ``feature_refs``.

        Each view is looked up in the registry once, even when several
        references point at the same view.
        """
        ttls = {}
        for ref in feature_refs:
            view_name = ref.split(':')[0]
            if view_name in ttls:
                continue
            ttls[view_name] = self.registry.get_view(view_name).ttl
        return ttls

Feature Store Selector Tool

Complete implementation for selecting the right feature store platform:

from dataclasses import dataclass

@dataclass
class FeatureStorePlatform:
    """Descriptive profile of one feature store platform, used for comparison."""
    name: str  # Platform display name
    type: str  # open_source, managed, embedded
    strengths: list[str]  # Short bullet points in the platform's favour
    limitations: list[str]  # Short bullet points against it
    best_for: str  # One-line summary of the ideal adopter
    real_time_support: str  # How streaming / real-time features are supported
    llm_integration: str  # Notes on embedding / LLM-related capabilities

# Static catalogue of the feature store platforms being compared. Each entry
# is a FeatureStorePlatform profile; the `name` values are unique.
FEATURE_STORE_PLATFORMS = [
    FeatureStorePlatform(
        name="Feast",
        type="open_source",
        strengths=[
            "Cloud-agnostic and self-hosted",
            "Active community and ecosystem",
            "Flexible online/offline store backends",
            "Strong Python SDK"
        ],
        limitations=[
            "Requires infrastructure management",
            "Less built-in streaming support",
            "Feature computation is external"
        ],
        best_for="Teams wanting control and flexibility",
        real_time_support="Via external streaming (Spark, Flink)",
        llm_integration="Embedding storage, RAG features"
    ),
    FeatureStorePlatform(
        name="Tecton",
        type="managed",
        strengths=[
            "Enterprise-grade managed service",
            "Built-in streaming feature computation",
            "Strong governance and monitoring",
            "Declarative feature definitions"
        ],
        limitations=[
            "Higher cost for smaller teams",
            "Vendor lock-in considerations",
            "Less customization flexibility"
        ],
        best_for="Large enterprises with complex requirements",
        real_time_support="Native streaming with exactly-once",
        llm_integration="Vector embeddings, prompt caching"
    ),
    FeatureStorePlatform(
        name="Databricks Feature Store",
        type="embedded",
        strengths=[
            "Tight Unity Catalog integration",
            "Seamless Spark/Delta Lake workflow",
            "Built-in MLflow integration",
            "Automatic feature lineage"
        ],
        limitations=[
            "Requires Databricks ecosystem",
            "Less flexible for non-Spark workloads"
        ],
        best_for="Teams already on Databricks",
        real_time_support="Via Spark Structured Streaming",
        llm_integration="Embedding computation in Spark"
    ),
    FeatureStorePlatform(
        name="SageMaker Feature Store",
        type="managed",
        strengths=[
            "Deep AWS integration",
            "Online/offline with single API",
            "Built-in security and compliance",
            "Pay-per-use pricing"
        ],
        limitations=[
            "AWS-only",
            "Less flexible transformation options",
            "Learning curve for non-SageMaker users"
        ],
        best_for="AWS-native ML teams",
        real_time_support="Kinesis integration",
        llm_integration="Bedrock feature serving"
    ),
    FeatureStorePlatform(
        name="Vertex AI Feature Store",
        type="managed",
        strengths=[
            "GCP-native with BigQuery integration",
            "Strong embedding and vector support",
            "Built-in feature monitoring",
            "Bigtable online serving"
        ],
        limitations=[
            "GCP-only",
            "Pricing can be complex",
            "Less batch-oriented flexibility"
        ],
        best_for="GCP-native teams, especially with BigQuery",
        real_time_support="Dataflow streaming",
        llm_integration="Native vector search, Vertex AI"
    ),
]

class FeatureStoreSelectorTool:
    """Score the catalogued feature store platforms against team requirements."""

    def recommend(self, requirements: dict) -> dict:
        """Pick the highest-scoring platform for the given requirements.

        Every platform in ``FEATURE_STORE_PLATFORMS`` starts at zero and gains
        points for each matching requirement. On a tie the platform listed
        first in the catalogue wins.

        Returns:
            Dict with the winning ``recommendation``, all ``scores`` and a
            one-line ``rationale`` for the winner.
        """
        scores = dict.fromkeys(
            (platform.name for platform in FEATURE_STORE_PLATFORMS), 0
        )

        def bump(platform_name, points):
            scores[platform_name] += points

        # Cloud affinity: native store for AWS/GCP, Feast for multi/on-prem.
        cloud = requirements.get('cloud')
        if cloud == 'aws':
            bump('SageMaker Feature Store', 3)
        elif cloud == 'gcp':
            bump('Vertex AI Feature Store', 3)
        elif cloud in ('multi', 'on_prem'):
            bump('Feast', 3)

        if requirements.get('databricks_existing'):
            bump('Databricks Feature Store', 4)

        if requirements.get('streaming_critical'):
            bump('Tecton', 3)
            bump('Databricks Feature Store', 2)

        if requirements.get('budget') == 'limited':
            bump('Feast', 3)

        if requirements.get('enterprise_governance'):
            bump('Tecton', 3)
            bump('SageMaker Feature Store', 2)

        if requirements.get('llm_heavy'):
            bump('Vertex AI Feature Store', 2)
            bump('Tecton', 1)

        # max() keeps the first maximal key, i.e. catalogue order breaks ties.
        best = max(scores, key=scores.get)

        return {
            'recommendation': best,
            'scores': scores,
            'rationale': self._get_rationale(best, requirements)
        }

    def _get_rationale(self, platform: str, requirements: dict) -> str:
        """Return the canned one-line rationale for ``platform``.

        ``requirements`` is accepted but not consulted. Unknown platform
        names yield an empty string.
        """
        canned = {
            'Feast': "Open-source flexibility and cloud-agnostic deployment",
            'Tecton': "Enterprise features and native streaming capabilities",
            'Databricks Feature Store': "Existing Databricks investment and Spark workflows",
            'SageMaker Feature Store': "AWS-native with deep service integration",
            'Vertex AI Feature Store': "GCP-native with strong vector/embedding support"
        }
        if platform in canned:
            return canned[platform]
        return ""

Feature Engineerer

Complete implementation for feature engineering utilities:

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Optional

class FeaturePattern(Enum):
    """Categories of feature engineering pattern."""
    AGGREGATION = "aggregation"       # Count, sum, avg over windows
    EMBEDDING = "embedding"           # Dense vector representations
    CROSS_FEATURE = "cross_feature"   # Interactions between features
    TEMPORAL = "temporal"             # Time-based features
    BEHAVIORAL = "behavioral"         # User/entity behavior patterns
    CONTEXTUAL = "contextual"         # Environmental/situational

class FeatureEngineerer:
    """Feature engineering utilities: windowed aggregates, cyclical time
    encoding and embedding similarity."""

    # Aggregation names accepted by compute_windowed_aggregation.
    _AGGREGATIONS = ('count', 'sum', 'avg', 'max', 'min')

    def compute_windowed_aggregation(
        self,
        events,
        entity_key: str,
        value_col: str,
        aggregation: str,
        window_seconds: int,
        current_time: datetime
    ) -> dict:
        """Compute a windowed aggregation feature.

        Args:
            events: Iterable of dicts, each with a ``'timestamp'`` entry and
                optionally a ``value_col`` entry.
            entity_key: Key under which the result is returned.
            value_col: Name of the event entry to aggregate.
            aggregation: One of 'count', 'sum', 'avg', 'max', 'min'.
            window_seconds: Window length; the window is the half-open
                interval ``[current_time - window_seconds, current_time)``.
            current_time: Exclusive upper bound of the window.

        Returns:
            ``{entity_key: value}``. The value is None when no event falls in
            the window, and for avg/max/min when no windowed event carries
            ``value_col``.

        Raises:
            ValueError: If ``aggregation`` is not a supported name. This is
                checked up front so a typo is reported even when the window
                happens to be empty.
        """
        if aggregation not in self._AGGREGATIONS:
            raise ValueError(f"Unknown aggregation: {aggregation}")

        # Filter events within window
        window_start = current_time - timedelta(seconds=window_seconds)
        windowed = [
            e for e in events
            if window_start <= e['timestamp'] < current_time
        ]

        if not windowed:
            return {entity_key: None}

        # Events lacking the value column are ignored rather than rejected.
        values = [e[value_col] for e in windowed if value_col in e]

        if aggregation == 'count':
            return {entity_key: len(values)}
        if aggregation == 'sum':
            return {entity_key: sum(values)}
        if not values:
            return {entity_key: None}
        if aggregation == 'avg':
            return {entity_key: sum(values) / len(values)}
        if aggregation == 'max':
            return {entity_key: max(values)}
        return {entity_key: min(values)}

    def encode_cyclical_time(
        self,
        timestamp: datetime,
        period: str
    ) -> tuple[float, float]:
        """Encode a cyclical time component as a (sin, cos) pair.

        Args:
            timestamp: Point in time to encode.
            period: 'hour_of_day' (``hour`` over 24), 'day_of_week'
                (``weekday()`` over 7) or 'month_of_year' (``month`` over 12).

        Returns:
            ``(sin, cos)`` of the angle ``2 * pi * value / period_length``.

        Raises:
            ValueError: If ``period`` is not one of the names above.
        """
        import math

        if period == 'hour_of_day':
            value = timestamp.hour
            max_val = 24
        elif period == 'day_of_week':
            value = timestamp.weekday()
            max_val = 7
        elif period == 'month_of_year':
            value = timestamp.month
            max_val = 12
        else:
            raise ValueError(f"Unknown period: {period}")

        # Sin/cos encoding preserves cyclical nature
        angle = 2 * math.pi * value / max_val
        return (math.sin(angle), math.cos(angle))

    def compute_embedding_similarity(
        self,
        embedding_a: list[float],
        embedding_b: list[float]
    ) -> float:
        """Compute cosine similarity between two embeddings.

        Returns:
            The cosine of the angle between the vectors, or 0.0 when either
            vector has zero norm.

        Raises:
            ValueError: If the embeddings differ in length. Without this
                check ``zip`` would silently truncate the dot product while
                the norms still covered the full vectors.
        """
        if len(embedding_a) != len(embedding_b):
            raise ValueError(
                "Embedding dimensions differ: "
                f"{len(embedding_a)} != {len(embedding_b)}"
            )

        dot_product = sum(a * b for a, b in zip(embedding_a, embedding_b))
        norm_a = sum(a * a for a in embedding_a) ** 0.5
        norm_b = sum(b * b for b in embedding_b) ** 0.5

        if norm_a == 0 or norm_b == 0:
            return 0.0

        return dot_product / (norm_a * norm_b)

Embedding Feature Store

Complete implementation for storing and retrieving embedding features:

from dataclasses import dataclass
from typing import Optional
import struct

@dataclass
class EmbeddingFeature:
    """Metadata describing one embedding feature."""
    name: str  # Feature name
    entity: str  # Entity the embedding describes
    dimension: int  # Number of components in the vector
    model: str  # Model used to generate embedding
    update_frequency: str  # How often the embedding is refreshed (free-form)
    storage_format: str  # float32, float16, int8

class EmbeddingFeatureStore:
    """Specialized storage for embedding features.

    Each embedding is written twice: as a float16 blob in the online store
    (single-entity lookup) and at full precision in the vector index
    (similarity search). Values read back through ``get_embedding`` are
    therefore rounded to float16 precision.
    """

    def __init__(
        self,
        online_store,  # Redis, DynamoDB, etc.
        offline_store,  # Parquet, Delta Lake
        vector_index  # FAISS, Pinecone, etc.
    ):
        self.online = online_store
        self.offline = offline_store
        self.vector_index = vector_index

    async def store_embedding(
        self,
        feature_name: str,
        entity_key: str,
        embedding: list[float],
        metadata: Optional[dict] = None
    ):
        """Store an embedding for online lookup and similarity search.

        The online store receives the compressed blob under the key
        ``"{feature_name}:{entity_key}"``; the vector index receives the
        uncompressed vector. The two writes are sequential and not atomic.
        """
        # Compress embedding for storage efficiency
        compressed = self._compress_embedding(embedding)

        # Store in online store for single-entity lookup
        await self.online.set(
            key=f"{feature_name}:{entity_key}",
            value=compressed,
            metadata=metadata
        )

        # Update vector index for similarity search
        await self.vector_index.upsert(
            collection=feature_name,
            id=entity_key,
            vector=embedding,
            metadata=metadata
        )

    async def get_embedding(
        self,
        feature_name: str,
        entity_key: str
    ) -> Optional[list[float]]:
        """Retrieve the stored embedding for an entity, or None if absent."""
        compressed = await self.online.get(f"{feature_name}:{entity_key}")
        if compressed:
            return self._decompress_embedding(compressed)
        return None

    async def find_similar(
        self,
        feature_name: str,
        query_embedding: list[float],
        k: int = 10,
        filter_metadata: Optional[dict] = None
    ) -> list[dict]:
        """Find the k most similar entities via the vector index.

        The arguments are forwarded to the index's ``search`` and its result
        is returned unchanged.
        """
        return await self.vector_index.search(
            collection=feature_name,
            vector=query_embedding,
            k=k,
            filter=filter_metadata
        )

    def _compress_embedding(self, embedding: list[float]) -> bytes:
        """Pack an embedding as float16 (2 bytes per component).

        Explicit little-endian layout ('<') keeps the blob readable on any
        host; on little-endian machines it is byte-identical to the native
        layout. ``struct`` raises OverflowError for magnitudes beyond the
        float16 range.
        """
        return struct.pack(f'<{len(embedding)}e', *embedding)

    def _decompress_embedding(self, compressed: bytes) -> list[float]:
        """Unpack a float16 blob produced by ``_compress_embedding``."""
        n = len(compressed) // 2  # float16 is 2 bytes
        return list(struct.unpack(f'<{n}e', compressed))

    async def batch_compute_embeddings(
        self,
        feature_name: str,
        texts: list[str],
        model,  # Embedding model
        entity_keys: list[str]
    ):
        """Batch compute and store embeddings.

        Args:
            feature_name: Feature (and vector index collection) to write to.
            texts: Inputs passed to ``model.encode`` in one call.
            model: Object whose ``encode(texts, batch_size=32)`` yields one
                vector per text, each exposing ``tolist()``.
            entity_keys: Key for each text, in the same order.

        Returns:
            ``{'processed': <number of embeddings stored>}``.

        Raises:
            ValueError: If ``texts`` and ``entity_keys`` differ in length.
                Pairing them with ``zip`` would otherwise drop the surplus
                silently and misreport the processed count.
        """
        if len(texts) != len(entity_keys):
            raise ValueError(
                f"texts ({len(texts)}) and entity_keys ({len(entity_keys)}) "
                "must have the same length"
            )

        # Compute embeddings in batch for efficiency
        embeddings = model.encode(texts, batch_size=32)

        # Store all embeddings
        for entity_key, embedding in zip(entity_keys, embeddings):
            await self.store_embedding(
                feature_name=feature_name,
                entity_key=entity_key,
                embedding=embedding.tolist()
            )

        return {'processed': len(texts)}

Training-Serving Skew

Skew Detector

Complete implementation for detecting training-serving skew:

class SkewDetector:
    """Detect training-serving skew by comparing feature means.

    ``training_stats`` maps feature name to a dict with ``'mean'`` and
    ``'std'`` recorded at training time.
    """

    def __init__(self, training_stats: dict):
        self.training_stats = training_stats

    async def check_serving_distribution(
        self,
        feature_name: str,
        serving_values: list
    ) -> dict:
        """Check if the serving mean has drifted from the training mean.

        The drift measure is ``|serving_mean - training_mean| / training_std``.
        Features without recorded stats are compared against mean 0 and
        std 1.

        When the training std is not positive (a constant feature), any
        difference in means is reported as an infinite z-score instead of
        being masked as 0; identical means still give 0.

        Args:
            feature_name: Feature to look up in ``training_stats``.
            serving_values: Non-empty list of numeric serving values.

        Returns:
            Dict with the feature name, both means, the z-score, an
            ``is_skewed`` flag (z-score above 3) and a recommendation string.

        Raises:
            ValueError: If ``serving_values`` is empty.
        """
        if len(serving_values) == 0:
            raise ValueError(
                f"No serving values provided for feature '{feature_name}'"
            )

        training = self.training_stats.get(feature_name, {})

        # Calculate serving statistics
        serving_mean = sum(serving_values) / len(serving_values)

        # Compare to training
        training_mean = training.get('mean', 0)
        training_std = training.get('std', 1)

        # Z-score of difference
        if training_std > 0:
            mean_diff_z = abs(serving_mean - training_mean) / training_std
        elif serving_mean == training_mean:
            mean_diff_z = 0
        else:
            # Zero training spread: any shift is unbounded in std units.
            mean_diff_z = float('inf')

        return {
            'feature': feature_name,
            'training_mean': training_mean,
            'serving_mean': serving_mean,
            'mean_difference_z_score': mean_diff_z,
            'is_skewed': mean_diff_z > 3,  # More than 3 std deviations
            'recommendation': self._recommend(mean_diff_z)
        }

    def _std(self, values: list) -> float:
        """Population standard deviation of a non-empty list of numbers."""
        mean = sum(values) / len(values)
        return (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5

    def _recommend(self, z_score: float) -> str:
        """Map a z-score to an action: <1 none, <2 monitor, <3 investigate, else alert."""
        if z_score < 1:
            return "No action needed"
        elif z_score < 2:
            return "Monitor - slight drift"
        elif z_score < 3:
            return "Investigate - moderate drift"
        else:
            return "Alert - significant skew detected"

Unified Feature Computation

Complete implementation for single source of truth feature computation:

from dataclasses import dataclass
from typing import Callable, Any

@dataclass
class FeatureDefinition:
    """Unified feature definition used for both training and serving."""
    name: str  # Feature name
    entity: str  # Entity the feature belongs to
    dtype: str  # Declared data type of the feature value
    transform: Callable[[Any], Any]  # Function that computes the feature value from its input
    dependencies: list[str]  # Presumably names of inputs/features this one depends on -- confirm
    description: str  # Free-text description

class UnifiedFeatureComputation:
    """Single source of truth for feature computation.

    Each feature is registered once as a ``FeatureDefinition``. The batch
    (training) path and the single-record (serving) path both apply that
    definition's ``transform``, so they share one implementation.
    """

    def __init__(self):
        # Registered definitions keyed by feature name.
        self.definitions: dict[str, FeatureDefinition] = {}

    def register(self, definition: FeatureDefinition):
        """Register a feature definition.

        A definition with the same name replaces the previous one.
        """
        self.definitions[definition.name] = definition

    def compute_for_training(
        self,
        feature_name: str,
        batch_data
    ):
        """Compute feature for batch training data.

        Args:
            feature_name: Name of a registered feature.
            batch_data: Object exposing ``apply(func)``; the registered
                transform is passed to it unchanged (presumably a pandas
                Series or DataFrame - TODO confirm).

        Returns:
            Whatever ``batch_data.apply`` returns.

        Raises:
            KeyError: If ``feature_name`` is not registered.
        """
        definition = self.definitions[feature_name]
        # Apply same transform to batch
        return batch_data.apply(definition.transform)

    def compute_for_serving(
        self,
        feature_name: str,
        single_record: dict
    ) -> Any:
        """Compute feature for single serving record.

        Raises:
            KeyError: If ``feature_name`` is not registered.
        """
        definition = self.definitions[feature_name]
        # Apply same transform to single record
        return definition.transform(single_record)

    @staticmethod
    def _results_differ(training_value: Any, serving_value: Any) -> bool:
        """Return True when the two results disagree.

        NaN on both sides counts as agreement: ``nan != nan`` is always
        True, which would otherwise flag every missing value as a mismatch.
        """
        import math

        both_nan = (
            isinstance(training_value, float)
            and isinstance(serving_value, float)
            and math.isnan(training_value)
            and math.isnan(serving_value)
        )
        return not both_nan and training_value != serving_value

    def validate_consistency(
        self,
        feature_name: str,
        sample_data: list[dict]
    ) -> dict:
        """Validate training and serving compute same results.

        The transform is run record by record for the training side and
        through ``compute_for_serving`` for the serving side. Both sides
        use the same transform, so a mismatch indicates a transform that
        is stateful or non-deterministic.

        Args:
            feature_name: Name of a registered feature.
            sample_data: Records to run through both paths.

        Returns:
            Dict with 'feature', 'samples_tested', 'mismatches',
            'consistent' and 'mismatch_indices' (at most the first 10).

        Raises:
            KeyError: If ``feature_name`` is not registered.
        """
        definition = self.definitions[feature_name]

        training_results = [
            definition.transform(record) for record in sample_data
        ]

        serving_results = [
            self.compute_for_serving(feature_name, record)
            for record in sample_data
        ]

        mismatches = [
            i for i, (t, s) in enumerate(zip(training_results, serving_results))
            if self._results_differ(t, s)
        ]

        return {
            'feature': feature_name,
            'samples_tested': len(sample_data),
            'mismatches': len(mismatches),
            'consistent': not mismatches,
            'mismatch_indices': mismatches[:10]  # First 10 mismatches
        }

Preprocessing Packager

Complete implementation for packaging preprocessing with model artifacts:

from typing import Callable

class PreprocessingPackager:
    """Package preprocessing with model artifact to prevent skew.

    The steps run in order, each receiving the previous step's output.
    A step is applied by calling it when it is callable, otherwise through
    its ``transform`` method. Steps may optionally expose ``fit`` and
    ``get_params``.
    """

    def __init__(self, preprocessing_steps: list[Callable]):
        self.steps = preprocessing_steps
        # Created here so saving and hashing work before fit() is called.
        self.fitted_params = {}

    @staticmethod
    def _apply(step, data):
        """Run one step: call it if callable, else use ``step.transform``."""
        if callable(step):
            return step(data)
        return step.transform(data)

    @staticmethod
    def _step_identity(step) -> str:
        """Return a process-independent label (module + qualified name).

        Default reprs embed memory addresses, which change on every run
        and would make the version hash unstable.
        """
        target = step if hasattr(step, '__qualname__') else type(step)
        module = getattr(target, '__module__', '')
        return f"{module}.{target.__qualname__}"

    def fit(self, training_data):
        """Fit preprocessing on training data.

        Each step with a ``fit`` method is fitted on the output of the
        steps before it, which is the same input it sees in ``transform``.
        Parameters returned by ``get_params`` are stored under
        ``'step_<index>'``.
        """
        # Store fitted parameters (e.g., vocabulary, normalization stats)
        self.fitted_params = {}

        current = training_data
        last_index = len(self.steps) - 1
        for i, step in enumerate(self.steps):
            if hasattr(step, 'fit'):
                step.fit(current)
                if hasattr(step, 'get_params'):
                    self.fitted_params[f'step_{i}'] = step.get_params()
            # The output of the final step is not needed for fitting.
            if i < last_index:
                current = self._apply(step, current)

    def transform(self, data):
        """Apply fitted preprocessing by running every step in order."""
        result = data
        for step in self.steps:
            result = self._apply(step, result)
        return result

    def save_with_model(self, model, path: str):
        """Save preprocessing alongside model.

        Pickles one dict holding the model, the steps, the fitted
        parameters and a version hash to ``path``.
        """
        import pickle

        artifact = {
            'model': model,
            'preprocessing_steps': self.steps,
            'fitted_params': self.fitted_params,
            'version': self._compute_hash()
        }

        with open(path, 'wb') as f:
            pickle.dump(artifact, f)

    def _compute_hash(self) -> str:
        """Compute hash of preprocessing for versioning.

        Returns the first 12 hex characters of a SHA-256 over the step
        identities and the fitted parameters. Steps that share a module
        and qualified name (for example two lambdas) hash alike.
        """
        import hashlib

        step_ids = [self._step_identity(step) for step in self.steps]
        content = repr(step_ids) + repr(sorted(self.fitted_params.items()))
        return hashlib.sha256(content.encode()).hexdigest()[:12]

    @classmethod
    def load_with_model(cls, path: str):
        """Load preprocessing with model.

        Returns:
            Tuple ``(model, packager)`` rebuilt from the artifact.
        """
        import pickle

        # SECURITY: unpickling can execute arbitrary code. Only load
        # artifacts from a trusted source.
        with open(path, 'rb') as f:
            artifact = pickle.load(f)

        packager = cls(artifact['preprocessing_steps'])
        packager.fitted_params = artifact['fitted_params']

        return artifact['model'], packager

Skew Monitoring Dashboard

Complete implementation for monitoring training-serving skew in production:

from datetime import datetime
import math

class SkewMonitoringDashboard:
    """Monitor for training-serving skew in production.

    Training statistics are registered per feature, serving values are
    recorded into a bounded rolling window, and ``compute_skew_metrics``
    compares the two.
    """

    def __init__(
        self,
        feature_names: list[str],
        max_samples: int = 10000,
        min_samples: int = 100
    ):
        """Create a dashboard for the given features.

        Args:
            feature_names: Features to monitor.
            max_samples: Size of the rolling window kept per feature.
            min_samples: Minimum number of serving samples a feature needs
                before metrics are computed for it.
        """
        self.features = feature_names
        self.max_samples = max_samples
        self.min_samples = min_samples
        self.training_stats: dict[str, dict] = {}
        self.serving_samples: dict[str, list] = {f: [] for f in feature_names}

    def register_training_stats(
        self,
        feature_name: str,
        stats: dict
    ):
        """Register statistics from training data.

        ``compute_skew_metrics`` reads the 'mean' and 'std' entries.
        """
        self.training_stats[feature_name] = stats

    def record_serving_sample(
        self,
        feature_name: str,
        value: float
    ):
        """Record a serving sample for monitoring.

        Raises:
            KeyError: If ``feature_name`` was not passed to the constructor.
        """
        samples = self.serving_samples[feature_name]
        samples.append(value)

        # Keep rolling window: drop the oldest entries in place so the
        # list object stays the same and is not rebuilt on every call.
        overflow = len(samples) - self.max_samples
        if overflow > 0:
            del samples[:overflow]

    def compute_skew_metrics(self) -> dict:
        """Compute skew metrics for all features.

        Features without registered training stats, or with fewer than
        ``min_samples`` serving samples, are left out of the result.

        Returns:
            Mapping of feature name to a dict with 'training_mean',
            'serving_mean', 'mean_shift', 'psi', 'jsd' and 'alert_level'.
        """
        metrics = {}

        for feature in self.features:
            if feature not in self.training_stats:
                continue

            training = self.training_stats[feature]
            serving = self.serving_samples[feature]

            if len(serving) < self.min_samples:
                continue

            serving_mean = sum(serving) / len(serving)
            training_mean = training.get('mean', 0)
            training_std = training.get('std', 1)

            # Population Stability Index (PSI)
            psi = self._compute_psi(training, serving)

            # Jensen-Shannon Divergence
            jsd = self._compute_jsd(training, serving)

            metrics[feature] = {
                'training_mean': training_mean,
                'serving_mean': serving_mean,
                # The small epsilon avoids division by zero for std == 0.
                'mean_shift': abs(serving_mean - training_mean) / (training_std + 1e-6),
                'psi': psi,
                'jsd': jsd,
                'alert_level': self._determine_alert(psi, jsd)
            }

        return metrics

    def _std(self, values: list) -> float:
        """Return the population standard deviation, or 0 for no values."""
        if not values:
            return 0
        mean = sum(values) / len(values)
        return (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5

    def _compute_psi(self, training: dict, serving: list) -> float:
        """Compute Population Stability Index.

        Simplified: a single-bucket proxy built from the ratio of serving
        mean to training mean. Real implementation uses histogram bins.

        NOTE: a training mean of 0 returns 0, so zero-centred features
        never raise an alert through this metric.
        """
        training_mean = training.get('mean', 0)
        serving_mean = sum(serving) / len(serving) if serving else 0

        if training_mean == 0:
            return 0

        ratio = serving_mean / training_mean
        # The logarithm is undefined for a non-positive ratio; report the
        # largest possible drift instead.
        if ratio <= 0:
            return float('inf')

        return (ratio - 1) * math.log(ratio)

    def _compute_jsd(self, training: dict, serving: list) -> float:
        """Compute Jensen-Shannon Divergence.

        Placeholder that always returns 0.0; a real implementation needs
        a proper distribution comparison.
        """
        return 0.0

    def _determine_alert(self, psi: float, jsd: float) -> str:
        """Determine alert level based on metrics.

        'critical' above PSI 0.25 or JSD 0.1, 'warning' above PSI 0.1 or
        JSD 0.05, otherwise 'ok'.
        """
        if psi > 0.25 or jsd > 0.1:
            return 'critical'
        elif psi > 0.1 or jsd > 0.05:
            return 'warning'
        else:
            return 'ok'

Data Versioning and Lineage

Lineage Tracker

Complete implementation for tracking data lineage:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class DataAsset:
    """A data asset in the lineage graph.

    Instances are the nodes that LineageTracker stores; ``upstream`` and
    ``downstream`` hold the ids of neighbouring assets.
    """
    # Identifier; LineageTracker uses it as the key of its asset map.
    id: str
    # Display name of the asset.
    name: str
    type: str  # table, feature_view, model, etc.
    # Version label of the asset.
    version: str
    # Creation time of the asset.
    created: datetime
    # Who or what created the asset.
    created_by: str
    # Ids of the assets this one is derived from.
    upstream: list[str] = field(default_factory=list)
    # Ids of the assets derived from this one.
    downstream: list[str] = field(default_factory=list)
    # Free-form extra information.
    metadata: dict = field(default_factory=dict)

class LineageTracker:
    """Track data lineage.

    Assets are stored by id. Upstream/downstream links are kept on the
    assets themselves and stay consistent whatever order assets are
    registered in.
    """

    def __init__(self):
        self.assets = {}
        # upstream id -> ids of assets that named it before it was
        # registered; used to back-fill downstream links later.
        self._awaiting_upstream: dict[str, list[str]] = {}

    def register_asset(
        self,
        asset: DataAsset,
        upstream_ids: list[str] = None
    ):
        """Register a data asset with its upstream dependencies.

        Args:
            asset: Asset to store; an existing asset with the same id is
                replaced.
            upstream_ids: Ids this asset derives from. When omitted or
                empty, the ids already present in ``asset.upstream`` are
                used.
        """
        self.assets[asset.id] = asset

        if upstream_ids:
            # Copy so later changes to the caller's list do not leak in.
            asset.upstream = list(upstream_ids)

        # Link to upstream
        for up_id in asset.upstream:
            parent = self.assets.get(up_id)
            if parent is None:
                # Upstream not registered yet: remember the link for later.
                self._awaiting_upstream.setdefault(up_id, []).append(asset.id)
            elif asset.id not in parent.downstream:
                parent.downstream.append(asset.id)

        # Back-fill links from assets registered before this one.
        for waiting_id in self._awaiting_upstream.pop(asset.id, []):
            if waiting_id not in asset.downstream:
                asset.downstream.append(waiting_id)

    def get_upstream_lineage(self, asset_id: str, depth: int = 10) -> list[DataAsset]:
        """Get all upstream dependencies.

        Depth-first, pre-order walk over ``upstream`` links. The starting
        asset itself is the first element of the result. Ids that are not
        registered are skipped.

        Args:
            asset_id: Id to start from.
            depth: Maximum number of upstream hops to follow.
        """
        lineage = []
        visited = set()
        # Explicit stack instead of recursion, so deep graphs cannot hit
        # the interpreter's recursion limit.
        stack = [(asset_id, 0)]

        while stack:
            aid, level = stack.pop()
            if aid in visited or level > depth:
                continue
            visited.add(aid)

            if aid not in self.assets:
                continue
            asset = self.assets[aid]
            lineage.append(asset)
            # Reversed so the first upstream id is processed first.
            stack.extend((up_id, level + 1) for up_id in reversed(asset.upstream))

        return lineage

    def get_downstream_impact(self, asset_id: str) -> list[DataAsset]:
        """Get all downstream assets that would be affected.

        Depth-first, pre-order walk over ``downstream`` links. The starting
        asset itself is not included.
        """
        impact = []
        visited = set()
        stack = [asset_id]

        while stack:
            aid = stack.pop()
            if aid in visited:
                continue
            visited.add(aid)

            if aid not in self.assets:
                continue
            asset = self.assets[aid]
            if aid != asset_id:  # Don't include the source
                impact.append(asset)
            stack.extend(reversed(asset.downstream))

        return impact

    def model_training_lineage(self, model_id: str) -> dict:
        """Get complete lineage for a trained model.

        Returns:
            ``{'error': 'Model not found'}`` for an unknown id. Otherwise a
            dict with 'model', the upstream assets grouped by type
            ('training_data' for 'dataset', 'features' for 'feature_view',
            'raw_data' for 'raw_table') and 'full_lineage'.
        """
        model = self.assets.get(model_id)
        if not model:
            return {'error': 'Model not found'}

        upstream = self.get_upstream_lineage(model_id)

        return {
            'model': model,
            'training_data': [a for a in upstream if a.type == 'dataset'],
            'features': [a for a in upstream if a.type == 'feature_view'],
            'raw_data': [a for a in upstream if a.type == 'raw_table'],
            'full_lineage': upstream
        }

Data Version Manager

Complete implementation for managing versioned data:

from datetime import datetime

class DataVersionManager:
    """Manage versioned data for ML reproducibility.

    Data is written to a storage backend under ``"<dataset>/<version_id>"``
    keys. Version metadata is kept in memory in ``self.versions``.
    """

    def __init__(self, storage_backend):
        """Create a manager.

        Args:
            storage_backend: Object providing ``put(key=..., data=...)``
                and ``get(key)``.
        """
        self.storage = storage_backend
        # dataset name -> version info dicts, oldest first.
        self.versions: dict[str, list[dict]] = {}

    def create_version(
        self,
        dataset_name: str,
        data,
        metadata: dict = None
    ) -> str:
        """Create a new version of a dataset.

        Identical content (same hash) is not stored twice; the id of the
        existing version is returned instead.

        Returns:
            Version id of the form ``"v<sequence>_<first 8 hash chars>"``.
        """
        # Compute content hash for deduplication
        content_hash = self._compute_hash(data)

        existing = self.versions.get(dataset_name, [])
        for info in existing:
            if info['content_hash'] == content_hash:
                return info['version_id']  # Return existing version

        version_id = f"v{len(existing) + 1}_{content_hash[:8]}"

        version_info = {
            'version_id': version_id,
            'content_hash': content_hash,
            'created_at': datetime.now().isoformat(),
            'row_count': len(data) if hasattr(data, '__len__') else None,
            'metadata': metadata or {}
        }

        # Store data before recording the version, so a failed write does
        # not leave a version entry without data.
        self.storage.put(
            key=f"{dataset_name}/{version_id}",
            data=data
        )

        self.versions.setdefault(dataset_name, []).append(version_info)

        return version_id

    def get_version(
        self,
        dataset_name: str,
        version_id: str = None
    ):
        """Get specific version or latest.

        Returns:
            Whatever the storage backend returns for the key, or None when
            no ``version_id`` is given and the dataset has no versions.
        """
        if version_id is None:
            # Get latest
            known = self.versions.get(dataset_name)
            if not known:
                return None
            version_id = known[-1]['version_id']

        return self.storage.get(f"{dataset_name}/{version_id}")

    def diff_versions(
        self,
        dataset_name: str,
        version_a: str,
        version_b: str
    ) -> dict:
        """Compare two versions of a dataset.

        Returns:
            Dict with 'version_a', 'version_b', 'rows_a', 'rows_b' (0 when
            the data has no length) and 'row_difference' (None unless both
            sides have a length).
        """
        data_a = self.get_version(dataset_name, version_a)
        data_b = self.get_version(dataset_name, version_b)

        # Either side may be unsized or missing; check both before
        # subtracting.
        sized_a = hasattr(data_a, '__len__')
        sized_b = hasattr(data_b, '__len__')
        rows_a = len(data_a) if sized_a else 0
        rows_b = len(data_b) if sized_b else 0

        # Simple diff metrics
        return {
            'version_a': version_a,
            'version_b': version_b,
            'rows_a': rows_a,
            'rows_b': rows_b,
            'row_difference': rows_b - rows_a if sized_a and sized_b else None,
            # Additional diff metrics would go here
        }

    def _compute_hash(self, data) -> str:
        """Compute hash of data for versioning.

        SHA-256 over the pickled bytes of ``data``.
        """
        import hashlib
        import pickle

        content = pickle.dumps(data)
        return hashlib.sha256(content).hexdigest()

    def list_versions(self, dataset_name: str) -> list[dict]:
        """List all versions of a dataset, oldest first.

        Returns an empty list for an unknown dataset.
        """
        return self.versions.get(dataset_name, [])

    def pin_version_to_experiment(
        self,
        dataset_name: str,
        version_id: str,
        experiment_id: str
    ):
        """Pin a data version to an experiment for reproducibility.

        Not implemented: currently a no-op.
        """
        # Would integrate with experiment tracking system
        pass

Data Observability

Data Observability Monitor

Complete implementation for monitoring data observability across pipelines:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from collections import defaultdict

@dataclass
class DataObservabilityMetric:
    """A data observability metric.

    One instance is the result of a single check run by
    DataObservabilityMonitor.
    """
    # Kind of check, e.g. 'freshness', 'volume', 'schema', 'distribution'.
    name: str
    # Measured value of the check.
    value: float
    # Threshold the check was evaluated against.
    threshold: float
    status: str  # ok, warning, critical
    # When the metric was recorded.
    timestamp: datetime
    # Check-specific extra details.
    metadata: dict

class DataObservabilityMonitor:
    """Monitor data observability across pipelines.

    Each ``check_*`` method evaluates one signal, appends the resulting
    ``DataObservabilityMetric`` to the history under a
    ``"<table>.<signal>"`` key (``"<table>.<column>.distribution"`` for
    distribution checks) and returns it. ``get_health_summary`` rolls the
    latest metric of every key up into one status.
    """

    def __init__(self):
        # Metric history per key, oldest first.
        self.metrics: dict[str, list[DataObservabilityMetric]] = defaultdict(list)
        # Per-table expectation dicts registered via set_expectations().
        self.expectations: dict[str, dict] = {}

    def set_expectations(
        self,
        table_name: str,
        expectations: dict
    ):
        """Set expectations for a table.

        The checks read 'max_age_hours', 'min_row_count' and
        'max_row_count' from this dict.
        """
        self.expectations[table_name] = expectations

    def check_freshness(
        self,
        table_name: str,
        last_update: datetime
    ) -> DataObservabilityMetric:
        """Check data freshness.

        'warning' when the age exceeds 'max_age_hours' (default 24),
        'critical' when it exceeds twice that value.

        Args:
            table_name: Table being checked.
            last_update: Time of the last update; naive or timezone-aware.
        """
        expectations = self.expectations.get(table_name, {})
        max_age_hours = expectations.get('max_age_hours', 24)

        # Use the timestamp's own timezone for "now": subtracting a naive
        # and an aware datetime raises TypeError.
        now = datetime.now(tz=last_update.tzinfo)
        age_hours = (now - last_update).total_seconds() / 3600

        if age_hours > max_age_hours * 2:
            status = 'critical'
        elif age_hours > max_age_hours:
            status = 'warning'
        else:
            status = 'ok'

        metric = DataObservabilityMetric(
            name='freshness',
            value=age_hours,
            threshold=max_age_hours,
            status=status,
            timestamp=datetime.now(),
            metadata={'last_update': last_update.isoformat()}
        )

        self.metrics[f'{table_name}.freshness'].append(metric)
        return metric

    def check_volume(
        self,
        table_name: str,
        current_count: int
    ) -> DataObservabilityMetric:
        """Check data volume against expectations.

        'critical' when the count is outside the expected
        ['min_row_count', 'max_row_count'] range, 'warning' when it
        deviates more than 50% from the average of the last 7 recorded
        volumes.
        """
        expectations = self.expectations.get(table_name, {})
        min_count = expectations.get('min_row_count', 0)
        max_count = expectations.get('max_row_count', float('inf'))

        # Also check against historical
        historical = self.metrics.get(f'{table_name}.volume', [])

        if historical:
            recent_counts = [m.value for m in historical[-7:]]
            avg_count = sum(recent_counts) / len(recent_counts)
            # The +1 keeps the ratio defined when the average is 0.
            deviation = abs(current_count - avg_count) / (avg_count + 1)
        else:
            deviation = 0

        if current_count < min_count or current_count > max_count:
            status = 'critical'
        elif deviation > 0.5:  # More than 50% deviation
            status = 'warning'
        else:
            status = 'ok'

        metric = DataObservabilityMetric(
            name='volume',
            value=current_count,
            threshold=min_count,
            status=status,
            timestamp=datetime.now(),
            metadata={'deviation_from_avg': deviation}
        )

        self.metrics[f'{table_name}.volume'].append(metric)
        return metric

    def check_schema(
        self,
        table_name: str,
        current_schema: dict,
        expected_schema: dict
    ) -> DataObservabilityMetric:
        """Check for unexpected schema changes.

        Compares the 'fields' entries of both schemas. Any difference is
        a 'warning'; more than 3 differences are 'critical'.
        """
        expected_fields = expected_schema.get('fields', [])
        current_fields = current_schema.get('fields', [])

        # Missing fields first, then new fields.
        differences = [
            f"Missing field: {name}"
            for name in expected_fields
            if name not in current_fields
        ]
        differences += [
            f"New field: {name}"
            for name in current_fields
            if name not in expected_fields
        ]

        if len(differences) > 3:
            status = 'critical'
        elif differences:
            status = 'warning'
        else:
            status = 'ok'

        metric = DataObservabilityMetric(
            name='schema',
            value=len(differences),
            threshold=0,
            status=status,
            timestamp=datetime.now(),
            metadata={'differences': differences}
        )

        self.metrics[f'{table_name}.schema'].append(metric)
        return metric

    def check_distribution(
        self,
        table_name: str,
        column_name: str,
        current_stats: dict,
        baseline_stats: dict
    ) -> DataObservabilityMetric:
        """Check for distribution drift.

        Mean drift is measured in baseline standard deviations. 'critical'
        above 3 (or a null-rate change above 0.1), 'warning' above 2 (or a
        null-rate change above 0.05).
        """
        # The epsilon avoids division by zero for a baseline std of 0.
        mean_drift = abs(
            current_stats.get('mean', 0) - baseline_stats.get('mean', 0)
        ) / (baseline_stats.get('std', 1) + 1e-6)

        null_rate_change = abs(
            current_stats.get('null_rate', 0) -
            baseline_stats.get('null_rate', 0)
        )

        if mean_drift > 3 or null_rate_change > 0.1:
            status = 'critical'
        elif mean_drift > 2 or null_rate_change > 0.05:
            status = 'warning'
        else:
            status = 'ok'

        metric = DataObservabilityMetric(
            name='distribution',
            value=mean_drift,
            threshold=2.0,
            status=status,
            timestamp=datetime.now(),
            metadata={
                'column': column_name,
                'mean_drift': mean_drift,
                'null_rate_change': null_rate_change
            }
        )

        self.metrics[f'{table_name}.{column_name}.distribution'].append(metric)
        return metric

    def get_health_summary(self, table_name: str) -> dict:
        """Get overall health summary for a table.

        Returns:
            Dict with 'table', 'overall_status' (worst latest status),
            'metrics' (latest metric per key as a plain dict) and 'issues'
            (keys whose latest status is not 'ok').
        """
        # Match on "<table>." so that e.g. "orders" does not also pick up
        # the metrics of "orders_archive".
        prefix = f'{table_name}.'

        latest_metrics = {
            key: metric_list[-1]
            for key, metric_list in self.metrics.items()
            if key.startswith(prefix) and metric_list
        }

        statuses = [m.status for m in latest_metrics.values()]

        if 'critical' in statuses:
            overall_status = 'critical'
        elif 'warning' in statuses:
            overall_status = 'warning'
        else:
            overall_status = 'ok'

        return {
            'table': table_name,
            'overall_status': overall_status,
            'metrics': {k: vars(v) for k, v in latest_metrics.items()},
            'issues': [
                k for k, v in latest_metrics.items()
                if v.status != 'ok'
            ]
        }

Data Anomaly Detector

Complete implementation for detecting anomalies in data metrics:

class DataAnomalyDetector:
    """Flag metric values that lie far from a fitted historical baseline.

    ``sensitivity`` is the z-score above which a value counts as anomalous.
    """

    def __init__(self, sensitivity: float = 2.0):
        self.baselines: dict[str, dict] = {}
        self.sensitivity = sensitivity

    def fit_baseline(
        self,
        metric_name: str,
        historical_values: list[float]
    ):
        """Derive mean, population std, min and max from past values.

        Raises:
            ValueError: If fewer than 7 values are supplied.
        """
        count = len(historical_values)
        if count < 7:
            raise ValueError("Need at least 7 historical values")

        center = sum(historical_values) / count
        variance = sum((x - center) ** 2 for x in historical_values) / count

        self.baselines[metric_name] = dict(
            mean=center,
            std=variance ** 0.5,
            min=min(historical_values),
            max=max(historical_values),
        )

    def detect_anomaly(
        self,
        metric_name: str,
        value: float
    ) -> dict:
        """Score ``value`` against the baseline fitted for ``metric_name``.

        Without a baseline the value is reported as not anomalous, with
        the reason 'No baseline'.
        """
        baseline = self.baselines.get(metric_name)
        if baseline is None:
            return {'is_anomaly': False, 'reason': 'No baseline'}

        base_mean = baseline['mean']
        base_std = baseline['std']
        # The epsilon keeps the division defined for a constant history.
        z_score = abs(value - base_mean) / (base_std + 1e-6)

        return {
            'is_anomaly': z_score > self.sensitivity,
            'value': value,
            'z_score': z_score,
            'baseline_mean': base_mean,
            'baseline_std': base_std,
            'threshold': self.sensitivity,
            'severity': self._severity(z_score)
        }

    def _severity(self, z_score: float) -> str:
        """Map a z-score to a label; the highest floor it exceeds wins."""
        floors = ((4, 'critical'), (3, 'high'), (2, 'medium'))
        for floor, label in floors:
            if z_score > floor:
                return label
        return 'low'

Data Quality

Data Quality Monitor

Complete implementation for monitoring data quality:

from dataclasses import dataclass
from datetime import datetime
from typing import Callable

@dataclass
class DataQualityCheck:
    """A data quality check.

    DataQualityMonitor calls ``check_func`` with the data under test and
    treats the check as passed when the result is at least ``threshold``.
    """
    # Name of the check; used to identify it in results and trends.
    name: str
    # Quality dimension the check belongs to (presumably e.g.
    # "completeness" - TODO confirm the expected vocabulary).
    dimension: str
    # Callable taking the data and returning a value comparable with threshold.
    check_func: Callable
    severity: str  # critical, warning, info
    # Minimum result for the check to pass.
    threshold: float

class DataQualityMonitor:
    """Monitor data quality.

    Runs a fixed list of checks against data and keeps every run in
    ``results_history`` so trends can be analysed later.
    """

    def __init__(self, checks: list[DataQualityCheck]):
        self.checks = checks
        # One results dict per run_checks() call, oldest first.
        self.results_history = []

    def run_checks(self, data) -> dict:
        """Run all quality checks on data.

        A check passes when its value is at least its threshold. A check
        that raises is recorded with an 'error' entry and counted as
        failed. Only failed checks of severity 'critical' make the overall
        run fail.

        Returns:
            Dict with 'timestamp', 'checks' (one entry per check),
            'passed' and 'critical_failures' (names of failed critical
            checks).
        """
        results = {
            'timestamp': datetime.now(),
            'checks': [],
            'passed': True,
            'critical_failures': []
        }

        for check in self.checks:
            try:
                result = check.check_func(data)
                passed = result >= check.threshold
                entry = {
                    'name': check.name,
                    'dimension': check.dimension,
                    'value': result,
                    'threshold': check.threshold,
                    'passed': passed
                }
            except Exception as e:
                # Deliberately broad: one broken check must not stop the
                # remaining checks from running.
                passed = False
                entry = {
                    'name': check.name,
                    'error': str(e),
                    'passed': False
                }

            results['checks'].append(entry)

            if not passed and check.severity == 'critical':
                results['passed'] = False
                results['critical_failures'].append(check.name)

        self.results_history.append(results)
        return results

    def trend_analysis(self, check_name: str, window: int = 7) -> dict:
        """Analyze trends for a specific check.

        Args:
            check_name: Name of the check to look up.
            window: Number of most recent runs to consider.

        Returns:
            ``{'error': 'No data for check'}`` when no run in the window
            has a value for the check. Otherwise a dict with 'check',
            'values' (oldest first), 'trend' ('improving', 'degrading' or
            'stable', comparing last with first value) and 'mean'.
        """
        # Collect the check entries themselves; runs where the check
        # raised have no 'value' and are skipped.
        values = [
            entry['value']
            for run in self.results_history[-window:]
            for entry in run['checks']
            if entry.get('name') == check_name and 'value' in entry
        ]

        if not values:
            return {'error': 'No data for check'}

        first, last = values[0], values[-1]
        if last > first:
            trend = 'improving'
        elif last < first:
            trend = 'degrading'
        else:
            trend = 'stable'

        return {
            'check': check_name,
            'values': values,
            'trend': trend,
            'mean': sum(values) / len(values)
        }

Feature Governance

Feature Governance System

Complete implementation for managing feature governance:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class FeatureGovernance:
    """Governance model for feature management.

    One record per feature; FeatureGovernanceSystem stores records by
    ``feature_name``.
    """
    # Name of the governed feature; used as the registry key.
    feature_name: str
    # Owner of the feature; registration rejects an empty owner.
    owner: str
    # Team the feature belongs to.
    team: str
    # Description; registration requires at least 50 characters.
    description: str
    # Link to the feature's documentation.
    documentation_url: str
    status: str  # active, deprecated, experimental
    consumers: list[str]  # Models/services using this feature
    sla: dict  # Freshness, availability requirements
    # When the feature was created.
    created_date: datetime
    # When the feature was last reviewed; audits flag gaps over 90 days.
    last_reviewed: datetime
    # Set when the feature is deprecated.
    deprecation_date: Optional[datetime] = None

class FeatureGovernanceSystem:
    """Manage feature governance.

    Keeps one ``FeatureGovernance`` record per feature name and offers
    registration, deprecation and auditing on top of it.
    """

    def __init__(self):
        # Governance records keyed by feature name.
        self.features: dict[str, FeatureGovernance] = {}

    def register_feature(self, governance: FeatureGovernance):
        """Register feature with governance metadata.

        Raises:
            ValueError: If the description is shorter than 50 characters
                or no owner is set.
        """
        # Require minimum documentation
        if not governance.description or len(governance.description) < 50:
            raise ValueError("Feature description must be at least 50 characters")
        if not governance.owner:
            raise ValueError("Feature must have an owner")

        self.features[governance.feature_name] = governance

    def deprecate_feature(
        self,
        feature_name: str,
        deprecation_date: datetime,
        migration_guide: str
    ) -> dict:
        """Deprecate a feature with proper notification.

        Marks the feature as deprecated and builds one notification
        payload per consumer. The payloads are returned, not delivered:
        dispatching them is left to the caller.

        Returns:
            ``{'error': 'Feature not found'}`` for an unknown feature.
            Otherwise a dict with 'feature', 'notifications_sent' (number
            of payloads built), 'consumers_affected' and 'notifications'
            (the payloads).
        """
        feature = self.features.get(feature_name)
        if not feature:
            return {'error': 'Feature not found'}

        # Notify consumers
        notifications = [
            {
                'consumer': consumer,
                'message': f"Feature {feature_name} will be deprecated on {deprecation_date}",
                'migration_guide': migration_guide
            }
            for consumer in feature.consumers
        ]

        feature.status = 'deprecated'
        feature.deprecation_date = deprecation_date

        return {
            'feature': feature_name,
            'notifications_sent': len(notifications),
            'consumers_affected': feature.consumers,
            # Returned so the payloads are not lost once built.
            'notifications': notifications
        }

    def audit_features(self) -> dict:
        """Audit feature health.

        Flags features not reviewed for more than 90 days, features
        without an owner and active features without known consumers.

        Returns:
            Dict with 'total_features', 'issues' (one dict per finding
            with 'feature', 'issue' and 'severity') and 'issue_count'.
        """
        issues = []

        for name, feature in self.features.items():
            # Check for stale features. "Now" takes the timezone of the
            # stored timestamp, because subtracting naive and aware
            # datetimes raises TypeError.
            now = datetime.now(tz=feature.last_reviewed.tzinfo)
            if (now - feature.last_reviewed).days > 90:
                issues.append({
                    'feature': name,
                    'issue': 'Not reviewed in 90+ days',
                    'severity': 'warning'
                })

            # Check for ownerless features
            if not feature.owner:
                issues.append({
                    'feature': name,
                    'issue': 'No owner assigned',
                    'severity': 'critical'
                })

            # Check for undocumented consumers
            if feature.status == 'active' and not feature.consumers:
                issues.append({
                    'feature': name,
                    'issue': 'No known consumers',
                    'severity': 'warning'
                })

        return {
            'total_features': len(self.features),
            'issues': issues,
            'issue_count': len(issues)
        }

Common Pitfalls Solutions

Freshness Monitor

Complete implementation for monitoring feature freshness:

from datetime import datetime

class FreshnessMonitor:
    """Monitor feature freshness.

    Holds a freshness SLA per feature and raises an alert through the
    injected alerting system when a served feature value is older than
    its SLA.
    """

    def __init__(self, alerting_system):
        """Create a monitor.

        Args:
            alerting_system: Object providing
                ``send_alert(severity=..., message=..., metadata=...)``.
        """
        self.alerting = alerting_system
        self.freshness_slas: dict[str, int] = {}  # feature -> max_age_seconds
        # "<feature>:<entity>" -> time of the last recorded update.
        self.last_updates: dict[str, datetime] = {}

    def set_sla(self, feature_name: str, max_age_seconds: int):
        """Set freshness SLA for a feature."""
        self.freshness_slas[feature_name] = max_age_seconds

    def record_update(self, feature_name: str, entity_key: str):
        """Record that a feature was updated.

        Stores the current time under ``"<feature_name>:<entity_key>"``.
        """
        key = f"{feature_name}:{entity_key}"
        self.last_updates[key] = datetime.now()

    def check_freshness_at_serving(
        self,
        feature_name: str,
        entity_key: str,
        feature_timestamp: datetime
    ) -> dict:
        """Check freshness when serving a prediction.

        Args:
            feature_name: Feature being served; its SLA defaults to one
                hour when none was set.
            entity_key: Entity the value belongs to.
            feature_timestamp: When the served value was computed; naive
                or timezone-aware.

        Returns:
            Dict with 'feature', 'entity', 'age_seconds', 'sla_seconds'
            and 'is_stale'. A stale value also triggers a 'warning' alert.
        """
        sla = self.freshness_slas.get(feature_name, 3600)  # Default 1 hour

        # Take "now" in the timestamp's own timezone: subtracting a naive
        # and an aware datetime raises TypeError.
        now = datetime.now(tz=feature_timestamp.tzinfo)
        age_seconds = (now - feature_timestamp).total_seconds()

        is_stale = age_seconds > sla

        result = {
            'feature': feature_name,
            'entity': entity_key,
            'age_seconds': age_seconds,
            'sla_seconds': sla,
            'is_stale': is_stale
        }

        if is_stale:
            self.alerting.send_alert(
                severity='warning',
                message=f"Stale feature: {feature_name} is {age_seconds}s old (SLA: {sla}s)",
                metadata=result
            )

        return result

Leakage Detector

Complete implementation for detecting data leakage in training data:

class LeakageDetector:
    """Detect data leakage in training data.

    The data arguments are expected to behave like pandas objects
    (boolean-mask indexing, ``head``, ``to_dict``, ``columns``, ``corr``).
    """

    def check_timestamp_ordering(
        self,
        training_data,
        feature_timestamp_col: str,
        label_timestamp_col: str
    ) -> dict:
        """Verify features don't use future information.

        A row is a violation when its feature timestamp is later than its
        label timestamp.

        Returns:
            Dict with 'total_rows', 'violations', 'violation_rate' (0.0
            for empty data), 'sample_violations' (up to 5 rows) and
            'has_leakage'.
        """
        violations = training_data[
            training_data[feature_timestamp_col] > training_data[label_timestamp_col]
        ]

        total_rows = len(training_data)
        violation_count = len(violations)

        return {
            'total_rows': total_rows,
            'violations': violation_count,
            # Empty input has no violations; avoid dividing by zero.
            'violation_rate': violation_count / total_rows if total_rows else 0.0,
            'sample_violations': violations.head(5).to_dict() if violation_count > 0 else {},
            'has_leakage': violation_count > 0
        }

    def check_feature_label_correlation(
        self,
        features,
        labels,
        threshold: float = 0.95
    ) -> dict:
        """Check for suspicious correlations that might indicate leakage.

        Every integer or float column is correlated with ``labels``.
        Columns whose absolute correlation exceeds ``threshold`` are
        reported.

        Returns:
            Dict with 'features_checked' (number of columns) and
            'suspicious_features' (dicts with 'feature', 'correlation'
            and 'warning').
        """
        suspicious = []

        for col in features.columns:
            # dtype.kind covers every signed/unsigned integer and float
            # width, not only int64/float64.
            if features[col].dtype.kind in 'iuf':
                corr = features[col].corr(labels)
                # A NaN correlation (e.g. constant column) compares False
                # and is therefore never reported.
                if abs(corr) > threshold:
                    suspicious.append({
                        'feature': col,
                        'correlation': corr,
                        'warning': 'Suspiciously high correlation - possible leakage'
                    })

        return {
            'features_checked': len(features.columns),
            'suspicious_features': suspicious
        }

    def validate_aggregation_windows(
        self,
        feature_definition: dict,
        prediction_time_col: str
    ) -> dict:
        """Validate aggregation windows don't include future data.

        Args:
            feature_definition: Dict read for 'name', 'window_end' and
                'window_inclusive'.
            prediction_time_col: Currently not used by the checks.
                NOTE(review): 'window_end' is compared with the literal
                'prediction_time', not with this column name - confirm
                that this is intended.

        Returns:
            Dict with 'feature', 'issues' and 'valid'.
        """
        issues = []

        window_end = feature_definition.get('window_end')
        if window_end and window_end != 'prediction_time':
            issues.append(
                f"Window end should be prediction_time, not {window_end}"
            )

        # Check for inclusive vs exclusive bounds. A missing
        # 'window_inclusive' is treated as inclusive and flagged.
        if feature_definition.get('window_inclusive', True):
            issues.append(
                "Window should exclude prediction_time event itself"
            )

        return {
            'feature': feature_definition.get('name'),
            'issues': issues,
            'valid': len(issues) == 0
        }

Real-Time Quality Gate

Complete implementation for lightweight quality checks in real-time pipelines:

from datetime import datetime

class RealTimeQualityGate:
    """Lightweight quality checks for real-time pipelines.

    Configure the gate by filling ``type_expectations``,
    ``required_fields`` and ``range_checks``. Records that fail a check
    are queued in ``deferred_checks`` for later analysis.
    """

    def __init__(self, strict_mode: bool = False):
        """Create a gate.

        Args:
            strict_mode: When True, invalid records are rejected; when
                False they are accepted with a warning.
        """
        self.strict_mode = strict_mode
        # Failed records with their issues and the time of validation.
        self.deferred_checks = []
        # field -> type (or tuple of types) accepted by isinstance().
        self.type_expectations = {}
        # Fields that must be present and not None.
        self.required_fields = []
        # field -> (min_val, max_val), both bounds inclusive.
        self.range_checks = {}

    def quick_validate(self, record: dict) -> dict:
        """Fast validation for real-time path (< 1ms).

        Runs type, null and range checks on the record.

        Returns:
            Dict with 'valid' (always True outside strict mode), 'issues'
            and 'action': 'accept' for a clean record, 'reject' for an
            invalid record in strict mode, otherwise
            'accept_with_warning'.
        """
        issues = []

        # Type checks (fast)
        for field, expected_type in self.type_expectations.items():
            if field in record and not isinstance(record[field], expected_type):
                issues.append(f"Type mismatch: {field}")

        # Null checks (fast)
        for required_field in self.required_fields:
            if record.get(required_field) is None:
                issues.append(f"Missing required: {required_field}")

        # Range checks (fast)
        for field, (min_val, max_val) in self.range_checks.items():
            val = record.get(field)
            # Missing and None values are the null check's concern.
            if val is None:
                continue
            try:
                out_of_range = val < min_val or val > max_val
            except TypeError:
                # A value that cannot be compared with the bounds cannot
                # be shown to lie within them.
                out_of_range = True
            if out_of_range:
                issues.append(f"Out of range: {field}")

        valid = not issues

        if valid:
            action = 'accept'
        else:
            # Queue for async detailed analysis
            self.deferred_checks.append({
                'record': record,
                'issues': issues,
                'timestamp': datetime.now()
            })
            action = 'reject' if self.strict_mode else 'accept_with_warning'

        return {
            'valid': valid or not self.strict_mode,
            'issues': issues,
            'action': action
        }