Data Architecture Code Reference
Complete code implementations for Chapter 24: Data Architecture for AI.
Feature Store Architecture
flowchart TB
subgraph Sources["DATA SOURCES"]
direction LR
Events["Event Stream<br/>(Kafka)"]
Batch["Batch Data<br/>(Data Warehouse)"]
RT["Real-time APIs"]
end
subgraph Compute["FEATURE COMPUTATION"]
direction TB
Stream["Streaming Engine<br/>(Flink/Spark)"]
BatchProc["Batch Processing<br/>(Spark/dbt)"]
subgraph Registry["Feature Registry"]
Defs["Feature Definitions"]
Lineage["Lineage Tracking"]
Governance["Governance Rules"]
end
end
subgraph Storage["FEATURE STORAGE"]
direction LR
subgraph Online["Online Store"]
Redis["Redis/DynamoDB<br/>(Low Latency)"]
end
subgraph Offline["Offline Store"]
Delta["Delta Lake/Parquet<br/>(Historical)"]
end
subgraph Vector["Vector Store"]
Embeddings["Embeddings<br/>(Pinecone/Milvus)"]
end
end
subgraph Serving["SERVING LAYER"]
direction LR
OnlineAPI["Online API<br/>< 10ms p99"]
OfflineAPI["Batch API<br/>Point-in-Time Joins"]
end
subgraph Consumers["CONSUMERS"]
direction LR
Training["Model Training"]
Inference["Real-time Inference"]
Analytics["Analytics/BI"]
end
Events --> Stream
Batch --> BatchProc
RT --> Stream
Stream --> Online
Stream --> Vector
BatchProc --> Offline
BatchProc --> Online
Registry -.->|"Defines"| Stream
Registry -.->|"Defines"| BatchProc
Online --> OnlineAPI
Offline --> OfflineAPI
Vector --> OnlineAPI
OnlineAPI --> Inference
OfflineAPI --> Training
OfflineAPI --> Analytics
style Sources fill:#e3f2fd
style Compute fill:#fff3e0
style Storage fill:#e8f5e9
style Serving fill:#f3e5f5
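The "Point-in-Time Joins" box in the serving layer is what keeps training data free of future leakage. The sketch below shows the core lookup on in-memory data. It assumes a hypothetical feature log that maps each entity to a time-sorted list of `(timestamp, value)` pairs; real offline stores run the same logic as a distributed as-of join.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Any, Optional

# Hypothetical feature log: entity_id -> list of (timestamp, value), sorted by time
FeatureLog = dict[str, list[tuple[datetime, Any]]]


def point_in_time_lookup(
    log: FeatureLog,
    entity_id: str,
    event_time: datetime,
    ttl: Optional[timedelta] = None,
) -> Any:
    """Return the latest feature value observed at or before event_time.

    Values newer than event_time are never returned (no future leakage);
    values older than event_time - ttl are treated as missing.
    """
    history = log.get(entity_id, [])
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, event_time)  # number of rows with ts <= event_time
    if idx == 0:
        return None
    ts, value = history[idx - 1]
    if ttl is not None and event_time - ts > ttl:
        return None
    return value


def point_in_time_join(log: FeatureLog, entity_rows, ttl=None):
    """Attach a point-in-time correct value to each (entity_id, event_time) row."""
    return [
        (entity_id, event_time, point_in_time_lookup(log, entity_id, event_time, ttl))
        for entity_id, event_time in entity_rows
    ]
```

For a log with values 3 (Jan 1) and 7 (Jan 5), a training label dated Jan 3 receives 3, not 7. A naive "latest value" join would leak the Jan 5 value into training.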
Data Pipeline Architecture
flowchart LR
subgraph Lambda["LAMBDA ARCHITECTURE"]
direction TB
L_Batch["Batch Layer<br/>(Accuracy)"]
L_Speed["Speed Layer<br/>(Low Latency)"]
L_Serving["Serving Layer<br/>(Merge Views)"]
L_Batch --> L_Serving
L_Speed --> L_Serving
end
subgraph Kappa["KAPPA ARCHITECTURE"]
direction TB
K_Stream["Stream Processing<br/>(Single Path)"]
K_Store["Serving Store"]
K_Replay["Replay for<br/>Corrections"]
K_Stream --> K_Store
K_Replay -.-> K_Stream
end
Input["Raw Events"] --> Lambda
Input --> Kappa
style Lambda fill:#e3f2fd
style Kappa fill:#e8f5e9
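The Lambda serving layer's "Merge Views" step can be sketched for a simple count aggregate. This assumes the batch layer has produced totals up to a known cutoff and the speed layer reports `(key, timestamp, increment)` events. A Kappa design would instead replay the full log through a single streaming job to reach the same totals.

```python
from datetime import datetime


def merge_lambda_views(
    batch_view: dict[str, int],
    batch_cutoff: datetime,
    speed_events: list[tuple[str, datetime, int]],
) -> dict[str, int]:
    """Merge a batch view with speed-layer events newer than the batch cutoff.

    batch_view:   key -> aggregate computed by the batch layer up to batch_cutoff
    speed_events: (key, timestamp, increment) observed by the speed layer
    """
    merged = dict(batch_view)  # do not mutate the batch view itself
    for key, ts, increment in speed_events:
        # Events at or before the cutoff are already in the batch view;
        # adding them again would double-count.
        if ts > batch_cutoff:
            merged[key] = merged.get(key, 0) + increment
    return merged
```

The cutoff comparison is the part that is easy to get wrong in practice. It is also where the "batch/speed layer inconsistencies" listed among Lambda's trade-offs tend to show up.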
Training-Serving Consistency
flowchart TB
subgraph Definition["SINGLE DEFINITION"]
FeatureDef["Feature Definition<br/>(Code + Config)"]
end
subgraph Training["TRAINING PATH"]
T_Historical["Historical<br/>Data"]
T_PIT["Point-in-Time<br/>Join"]
T_Features["Training<br/>Features"]
T_Model["Model<br/>Training"]
T_Historical --> T_PIT
T_PIT --> T_Features
T_Features --> T_Model
end
subgraph Serving["SERVING PATH"]
S_Request["Inference<br/>Request"]
S_Lookup["Feature<br/>Lookup"]
S_Features["Serving<br/>Features"]
S_Predict["Model<br/>Prediction"]
S_Request --> S_Lookup
S_Lookup --> S_Features
S_Features --> S_Predict
end
FeatureDef --> T_PIT
FeatureDef --> S_Lookup
subgraph Monitoring["SKEW DETECTION"]
Compare["Distribution<br/>Comparison"]
Alert["Drift<br/>Alerts"]
Compare --> Alert
end
T_Features -.-> Compare
S_Features -.-> Compare
style Definition fill:#fff3e0
style Training fill:#e3f2fd
style Serving fill:#e8f5e9
style Monitoring fill:#ffebee
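The "Distribution Comparison" box can be made concrete with a binned Population Stability Index (PSI). The sketch below assumes equal-width bins over the training range; quantile bins are a common alternative. A small epsilon replaces empty bins so the logarithm stays finite. PSI values above roughly 0.1 (watch) and 0.25 (act) are commonly cited rules of thumb, and the same thresholds appear in the Skew Monitoring Dashboard later in this chapter.

```python
import math


def population_stability_index(
    expected: list[float],
    actual: list[float],
    bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """PSI between a training (expected) and serving (actual) sample.

    Bin edges come from the training sample's range; each bin contributes
    (a - e) * ln(a / e), where a and e are the bin proportions.
    """
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / bins or 1.0  # guard against a constant feature

    def proportions(values: list[float]) -> list[float]:
        counts = [0] * bins
        for v in values:
            # Out-of-range serving values are clamped into the edge bins
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1
        # eps avoids log(0) and division by zero for empty bins
        return [max(c / len(values), eps) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ai, ei in zip(a, e))
```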
Table of Contents
- Data Pipeline Architecture
- Feature Stores
- Training-Serving Skew
- Data Versioning and Lineage
- Data Observability
- Data Quality
- Feature Governance
- Common Pitfalls and Solutions
Data Pipeline Architecture
Architecture Selector
Complete implementation for choosing between Lambda and Kappa architectures:
class ArchitectureSelector:
    """Help choose between Lambda and Kappa architectures."""

    def recommend(self, requirements: dict) -> dict:
        """Recommend architecture based on requirements."""
        score_lambda = 0
        score_kappa = 0
        reasons = {'lambda': [], 'kappa': []}
        # Evaluate factors
        if requirements.get('complex_batch_processing'):
            score_lambda += 2
            reasons['lambda'].append(
                "Complex batch processing better in dedicated batch layer"
            )
        if requirements.get('maintain_single_codebase'):
            score_kappa += 2
            reasons['kappa'].append(
                "Single codebase simpler to maintain"
            )
        if requirements.get('historical_accuracy_critical'):
            score_lambda += 1
            reasons['lambda'].append(
                "Batch layer ensures accurate historical computation"
            )
        if requirements.get('reprocessing_frequent'):
            score_kappa += 2
            reasons['kappa'].append(
                "Replay-based reprocessing is core strength"
            )
        if requirements.get('latency_requirements') == 'sub_second':
            score_kappa += 1
            reasons['kappa'].append(
                "Pure streaming often achieves lower latency"
            )
        if requirements.get('team_experience') == 'batch_heavy':
            score_lambda += 1
            reasons['lambda'].append(
                "Team can leverage existing batch expertise"
            )
        recommendation = 'lambda' if score_lambda > score_kappa else 'kappa'
        return {
            'recommendation': recommendation,
            'scores': {'lambda': score_lambda, 'kappa': score_kappa},
            'reasons': reasons[recommendation],
            'tradeoffs': self._describe_tradeoffs(recommendation)
        }

    def _describe_tradeoffs(self, recommendation: str) -> list[str]:
        if recommendation == 'lambda':
            return [
                "Pro: Best-of-both-worlds accuracy and speed",
                "Con: Two codebases to maintain",
                "Con: More complex deployment and operations",
                "Con: Potential batch/speed layer inconsistencies"
            ]
        else:
            return [
                "Pro: Single codebase and processing paradigm",
                "Pro: Simpler mental model and debugging",
                "Con: Batch-like operations less natural",
                "Con: Replay for corrections can be expensive"
            ]
Schema Registry
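A registry pays off only when writes are actually checked against the registered schema. Before the registry itself, here is a minimal record-level check. The tuple-based `Schema` format and the `validate_record` name are hypothetical simplifications, not the `DataSchema` class defined below.

```python
from typing import Any

# Hypothetical minimal schema: field name -> (python type, nullable)
Schema = dict[str, tuple[type, bool]]


def validate_record(record: dict[str, Any], schema: Schema) -> list[str]:
    """Return a list of violations (an empty list means the record is valid)."""
    errors = []
    for name, (expected_type, nullable) in schema.items():
        if name not in record or record[name] is None:
            if not nullable:
                errors.append(f"missing required field '{name}'")
            continue
        if not isinstance(record[name], expected_type):
            errors.append(
                f"field '{name}' expected {expected_type.__name__}, "
                f"got {type(record[name]).__name__}"
            )
    # Unknown fields are reported so producers notice contract drift
    for name in record.keys() - schema.keys():
        errors.append(f"unexpected field '{name}'")
    return errors
```

Returning all violations rather than raising on the first one makes the output usable directly in a rejection message or a violation log.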
Complete implementation for central schema registry with compatibility checking:
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class SchemaCompatibility(Enum):
    BACKWARD = "backward"  # New schema can read old data
    FORWARD = "forward"    # Old schema can read new data
    FULL = "full"          # Both backward and forward
    NONE = "none"          # No compatibility guarantee


@dataclass
class SchemaField:
    """A field in a data schema."""
    name: str
    type: str
    nullable: bool = True
    default: Optional[str] = None
    description: str = ""


@dataclass
class DataSchema:
    """Data schema with version tracking."""
    name: str
    version: int
    fields: list[SchemaField]
    compatibility: SchemaCompatibility

    def to_json_schema(self) -> dict:
        """Convert to JSON Schema (draft-07) format."""
        properties = {}
        required = []
        for field in self.fields:
            json_type = self._map_type(field.type)
            properties[field.name] = {
                # Nullable fields must also accept JSON null
                'type': [json_type, 'null'] if field.nullable else json_type,
                'description': field.description
            }
            if field.default is not None:
                properties[field.name]['default'] = field.default
            if not field.nullable:
                required.append(field.name)
        return {
            '$schema': 'http://json-schema.org/draft-07/schema#',
            'type': 'object',
            'properties': properties,
            'required': required
        }

    def _map_type(self, dtype: str) -> str:
        """Map internal type names to JSON Schema types (unknown -> string)."""
        mapping = {
            'string': 'string',
            'int': 'integer',
            'float': 'number',
            'bool': 'boolean',
            'array': 'array',
            'object': 'object'
        }
        return mapping.get(dtype, 'string')


class SchemaRegistry:
    """Central schema registry for data contracts."""

    def __init__(self):
        self.schemas: dict[str, list[DataSchema]] = {}

    def register(self, schema: DataSchema) -> dict:
        """Register a new schema version."""
        name = schema.name
        if name not in self.schemas:
            # First version: nothing to be compatible with
            self.schemas[name] = [schema]
            return {'status': 'registered', 'version': schema.version}
        # Check compatibility with previous version
        previous = self.schemas[name][-1]
        if schema.version <= previous.version:
            return {
                'status': 'rejected',
                'reason': [
                    f"Version {schema.version} is not newer than {previous.version}"
                ]
            }
        compatibility_check = self._check_compatibility(previous, schema)
        if not compatibility_check['compatible']:
            return {
                'status': 'rejected',
                'reason': compatibility_check['violations']
            }
        self.schemas[name].append(schema)
        return {'status': 'registered', 'version': schema.version}

    def _check_compatibility(self, old: DataSchema, new: DataSchema) -> dict:
        """Check if new schema is compatible with old."""
        violations = []
        old_fields = {f.name: f for f in old.fields}
        new_fields = {f.name: f for f in new.fields}
        # Check based on compatibility mode
        if new.compatibility in (SchemaCompatibility.BACKWARD, SchemaCompatibility.FULL):
            # New schema must be able to read old data: old records lack any
            # added field, so added fields need a default or must be nullable
            for name, new_field in new_fields.items():
                if (name not in old_fields and not new_field.nullable
                        and new_field.default is None):
                    violations.append(
                        f"New field '{name}' is not nullable and has no default"
                    )
        if new.compatibility in (SchemaCompatibility.FORWARD, SchemaCompatibility.FULL):
            # Old schema must be able to read new data: new records lack any
            # removed field, so it must have had a default (or been nullable)
            for name, old_field in old_fields.items():
                if (name not in new_fields and not old_field.nullable
                        and old_field.default is None):
                    violations.append(
                        f"Field '{name}' removed but has no default in old schema"
                    )
        return {
            'compatible': len(violations) == 0,
            'violations': violations
        }

    def get_latest(self, name: str) -> Optional[DataSchema]:
        """Get latest schema version."""
        if name in self.schemas and self.schemas[name]:
            return self.schemas[name][-1]
        return None

    def get_version(self, name: str, version: int) -> Optional[DataSchema]:
        """Get specific schema version."""
        if name in self.schemas:
            for schema in self.schemas[name]:
                if schema.version == version:
                    return schema
        return None
Contract Enforcer
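The `check_sla_compliance` method below returns placeholder values. Two of the SLA dimensions can be computed from pipeline metadata alone, as sketched here. The sketch assumes the caller supplies the last update time and the expected row count (for example, from a metrics system). The function names are hypothetical.

```python
from datetime import datetime


def check_freshness(last_updated: datetime, now: datetime, freshness_seconds: int) -> bool:
    """True if the newest data is no older than the SLA allows."""
    return (now - last_updated).total_seconds() <= freshness_seconds


def check_completeness(actual_rows: int, expected_rows: int, completeness_percent: float) -> bool:
    """True if at least completeness_percent of the expected rows arrived."""
    if expected_rows <= 0:
        return True  # nothing was expected, so nothing can be missing
    return 100.0 * actual_rows / expected_rows >= completeness_percent
```

Availability cannot be derived this way, because it needs uptime history from the serving system.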
Implementation skeleton for enforcing data contracts (schema validation, quality checks, and SLA metrics are stubbed):
from dataclasses import dataclass
from datetime import datetime

# DataSchema is defined in the Schema Registry example above


@dataclass
class SLA:
    """Service Level Agreement for data."""
    freshness_seconds: int        # Max age of data
    availability_percent: float   # e.g., 99.9
    completeness_percent: float   # Minimum % of expected rows actually delivered


@dataclass
class DataContract:
    """Contract between data producer and consumer."""
    name: str
    producer: str
    consumers: list[str]
    schema: DataSchema
    sla: SLA
    quality_checks: list[str]
    owner: str
    documentation_url: str


class ContractEnforcer:
    """Enforce data contracts."""

    def __init__(self, contracts: list[DataContract]):
        self.contracts = {c.name: c for c in contracts}
        self.violations: list[dict] = []

    def validate_write(
        self,
        contract_name: str,
        data,
        producer_id: str
    ) -> dict:
        """Validate data before allowing write."""
        contract = self.contracts.get(contract_name)
        if not contract:
            return {'valid': False, 'error': 'Unknown contract'}
        # Check producer authorization
        if producer_id != contract.producer:
            self._record_violation(
                contract_name, 'authorization', {'producer': producer_id}
            )
            return {'valid': False, 'error': 'Unauthorized producer'}
        # Schema validation
        schema_result = self._validate_schema(data, contract.schema)
        if not schema_result['valid']:
            self._record_violation(contract_name, 'schema', schema_result)
            return schema_result
        # Quality checks
        quality_result = self._run_quality_checks(data, contract)
        if not quality_result['passed']:
            self._record_violation(contract_name, 'quality', quality_result)
            return {
                'valid': False,
                'error': 'Quality checks failed',
                'details': quality_result
            }
        return {'valid': True}

    def _validate_schema(self, data, schema: DataSchema) -> dict:
        """Validate data against schema."""
        # Placeholder: a real implementation would validate against
        # schema.to_json_schema() with a JSON Schema validator
        return {'valid': True}

    def _run_quality_checks(
        self,
        data,
        contract: DataContract
    ) -> dict:
        """Run quality checks specified in contract."""
        results = []
        for check_name in contract.quality_checks:
            # Placeholder: would dispatch to actual check implementations
            results.append({'check': check_name, 'passed': True})
        return {
            'passed': all(r['passed'] for r in results),
            'results': results
        }

    def _record_violation(
        self,
        contract: str,
        violation_type: str,
        details: dict
    ):
        """Record contract violation for alerting."""
        self.violations.append({
            'contract': contract,
            'type': violation_type,
            'details': details,
            'timestamp': datetime.now()
        })

    def check_sla_compliance(self, contract_name: str) -> dict:
        """Check if SLA is being met."""
        contract = self.contracts.get(contract_name)
        if not contract:
            return {'error': 'Unknown contract'}
        # Placeholder: would query metrics system for actual compliance
        return {
            'contract': contract_name,
            'freshness_met': True,
            'availability_met': True,
            'completeness_met': True,
            'overall_compliant': True
        }
Feature Stores
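The `FeatureStore` interface below delegates reads to an online store backend. A toy in-memory stand-in helps clarify what that backend must do: keep the latest value per (view, feature, entity) and treat values older than the view's TTL as missing. This class is hypothetical. It is synchronous and takes a single entity key, so it is not a drop-in backend for the async interface.

```python
import time
from typing import Any, Callable, Optional


class InMemoryOnlineStore:
    """Toy online store: latest value per (view, feature, entity) with optional TTL.

    Illustrative only; production stores such as Redis or DynamoDB add
    persistence, replication, and network access.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._data: dict[tuple[str, str, str], tuple[Any, float]] = {}
        self._clock = clock  # injectable so tests can control time

    def put(self, view: str, feature: str, entity_key: str, value: Any) -> None:
        # Overwrites: an online store only serves the latest value
        self._data[(view, feature, entity_key)] = (value, self._clock())

    def get(self, view: str, feature: str, entity_key: str,
            ttl_seconds: Optional[float] = None) -> Any:
        entry = self._data.get((view, feature, entity_key))
        if entry is None:
            return None
        value, written_at = entry
        if ttl_seconds is not None and self._clock() - written_at > ttl_seconds:
            return None  # stale values are treated as missing
        return value
```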
Feature Store Interface
Complete implementation for feature store with online/offline serving:
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional


@dataclass
class Feature:
    """Feature definition."""
    name: str
    entity: str        # user, item, session, etc.
    value_type: str    # int, float, string, embedding
    description: str
    owner: str
    created: datetime
    computation: str   # SQL, Python, streaming


@dataclass
class FeatureView:
    """Collection of features with shared entity and timing."""
    name: str
    entities: list[str]
    features: list[Feature]
    ttl: Optional[int]  # Time-to-live in seconds
    online: bool        # Available for real-time serving
    offline: bool       # Available for training


class FeatureStore:
    """Feature store interface."""

    def __init__(self, online_store, offline_store, registry):
        # Backends are duck-typed: the online store must provide an async get(),
        # the offline store point_in_time_join(), and the registry get_view()
        self.online = online_store
        self.offline = offline_store
        self.registry = registry

    # Online serving
    async def get_online_features(
        self,
        feature_refs: list[str],
        entity_keys: dict[str, Any]
    ) -> dict[str, Any]:
        """Get features for real-time inference.

        Feature references use the form "view_name:feature_name".
        """
        features = {}
        for ref in feature_refs:
            view_name, feature_name = ref.split(':', 1)
            value = await self.online.get(
                view=view_name,
                feature=feature_name,
                keys=entity_keys
            )
            features[ref] = value
        return features

    # Offline training
    def get_historical_features(
        self,
        feature_refs: list[str],
        entity_df,  # DataFrame with entity keys and event_timestamp
    ):
        """Get point-in-time correct features for training."""
        # Perform point-in-time join
        result = self.offline.point_in_time_join(
            feature_refs=feature_refs,
            entity_df=entity_df,
            ttl=self._get_ttl(feature_refs)
        )
        return result

    def _get_ttl(self, feature_refs: list[str]) -> dict:
        """Get TTL for each feature view."""
        ttls = {}
        for ref in feature_refs:
            view_name = ref.split(':', 1)[0]
            view = self.registry.get_view(view_name)
            if view is None:
                raise KeyError(f"Unknown feature view: {view_name}")
            ttls[view_name] = view.ttl
        return ttls
Feature Store Selector Tool
Complete implementation for selecting the right feature store platform:
from dataclasses import dataclass


@dataclass
class FeatureStorePlatform:
    """Feature store platform comparison."""
    name: str
    type: str  # open_source, managed, embedded
    strengths: list[str]
    limitations: list[str]
    best_for: str
    real_time_support: str
    llm_integration: str


FEATURE_STORE_PLATFORMS = [
    FeatureStorePlatform(
        name="Feast",
        type="open_source",
        strengths=[
            "Cloud-agnostic and self-hosted",
            "Active community and ecosystem",
            "Flexible online/offline store backends",
            "Strong Python SDK"
        ],
        limitations=[
            "Requires infrastructure management",
            "Less built-in streaming support",
            "Feature computation is external"
        ],
        best_for="Teams wanting control and flexibility",
        real_time_support="Via external streaming (Spark, Flink)",
        llm_integration="Embedding storage, RAG features"
    ),
    FeatureStorePlatform(
        name="Tecton",
        type="managed",
        strengths=[
            "Enterprise-grade managed service",
            "Built-in streaming feature computation",
            "Strong governance and monitoring",
            "Declarative feature definitions"
        ],
        limitations=[
            "Higher cost for smaller teams",
            "Vendor lock-in considerations",
            "Less customization flexibility"
        ],
        best_for="Large enterprises with complex requirements",
        real_time_support="Native streaming with exactly-once",
        llm_integration="Vector embeddings, prompt caching"
    ),
    FeatureStorePlatform(
        name="Databricks Feature Store",
        type="embedded",
        strengths=[
            "Tight Unity Catalog integration",
            "Seamless Spark/Delta Lake workflow",
            "Built-in MLflow integration",
            "Automatic feature lineage"
        ],
        limitations=[
            "Requires Databricks ecosystem",
            "Less flexible for non-Spark workloads"
        ],
        best_for="Teams already on Databricks",
        real_time_support="Via Spark Structured Streaming",
        llm_integration="Embedding computation in Spark"
    ),
    FeatureStorePlatform(
        name="SageMaker Feature Store",
        type="managed",
        strengths=[
            "Deep AWS integration",
            "Online/offline with single API",
            "Built-in security and compliance",
            "Pay-per-use pricing"
        ],
        limitations=[
            "AWS-only",
            "Less flexible transformation options",
            "Learning curve for non-SageMaker users"
        ],
        best_for="AWS-native ML teams",
        real_time_support="Kinesis integration",
        llm_integration="Bedrock feature serving"
    ),
    FeatureStorePlatform(
        name="Vertex AI Feature Store",
        type="managed",
        strengths=[
            "GCP-native with BigQuery integration",
            "Strong embedding and vector support",
            "Built-in feature monitoring",
            "Bigtable online serving"
        ],
        limitations=[
            "GCP-only",
            "Pricing can be complex",
            "Less batch-oriented flexibility"
        ],
        best_for="GCP-native teams, especially with BigQuery",
        real_time_support="Dataflow streaming",
        llm_integration="Native vector search, Vertex AI"
    ),
]


class FeatureStoreSelectorTool:
    """Help teams select the right feature store."""

    def recommend(self, requirements: dict) -> dict:
        """Recommend feature store based on requirements."""
        scores = {p.name: 0 for p in FEATURE_STORE_PLATFORMS}
        # Score based on requirements
        cloud = requirements.get('cloud')
        if cloud == 'aws':
            scores['SageMaker Feature Store'] += 3
        elif cloud == 'gcp':
            scores['Vertex AI Feature Store'] += 3
        elif cloud in ('multi', 'on_prem'):
            scores['Feast'] += 3
        if requirements.get('databricks_existing'):
            scores['Databricks Feature Store'] += 4
        if requirements.get('streaming_critical'):
            scores['Tecton'] += 3
            scores['Databricks Feature Store'] += 2
        if requirements.get('budget') == 'limited':
            scores['Feast'] += 3
        if requirements.get('enterprise_governance'):
            scores['Tecton'] += 3
            scores['SageMaker Feature Store'] += 2
        if requirements.get('llm_heavy'):
            scores['Vertex AI Feature Store'] += 2
            scores['Tecton'] += 1
        # Find winner
        winner = max(scores.items(), key=lambda x: x[1])
        return {
            'recommendation': winner[0],
            'scores': scores,
            'rationale': self._get_rationale(winner[0], requirements)
        }

    def _get_rationale(self, platform: str, requirements: dict) -> str:
        rationales = {
            'Feast': "Open-source flexibility and cloud-agnostic deployment",
            'Tecton': "Enterprise features and native streaming capabilities",
            'Databricks Feature Store': "Existing Databricks investment and Spark workflows",
            'SageMaker Feature Store': "AWS-native with deep service integration",
            'Vertex AI Feature Store': "GCP-native with strong vector/embedding support"
        }
        return rationales.get(platform, "")
Feature Engineerer
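Cyclical encoding matters because raw hour numbers place 23:00 and 00:00 far apart, even though they are one hour apart. The small check below uses the same sin/cos formula as `encode_cyclical_time`. The helper names are hypothetical.

```python
import math


def cyclical_encode(value: float, period: float) -> tuple[float, float]:
    """Map a periodic value onto the unit circle (same formula as encode_cyclical_time)."""
    angle = 2 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


def encoded_distance(a: float, b: float, period: float) -> float:
    """Euclidean distance between two encoded values."""
    (s1, c1), (s2, c2) = cyclical_encode(a, period), cyclical_encode(b, period)
    return math.hypot(s1 - s2, c1 - c2)


# 23:00 and 00:00 are one hour apart, but their raw hour numbers differ by 23
print(abs(23 - 0))                              # 23
print(round(encoded_distance(23, 0, 24), 3))    # 0.261
print(round(encoded_distance(11, 12, 24), 3))   # 0.261
```

After encoding, every one-hour gap has the same distance, 2·sin(π/24). Opposite hours such as 0 and 12 are the farthest apart.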
Complete implementation for feature engineering utilities:
import math
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Optional


class FeaturePattern(Enum):
    AGGREGATION = "aggregation"      # Count, sum, avg over windows
    EMBEDDING = "embedding"          # Dense vector representations
    CROSS_FEATURE = "cross_feature"  # Interactions between features
    TEMPORAL = "temporal"            # Time-based features
    BEHAVIORAL = "behavioral"        # User/entity behavior patterns
    CONTEXTUAL = "contextual"        # Environmental/situational


class FeatureEngineerer:
    """Feature engineering utilities."""

    def compute_windowed_aggregation(
        self,
        events,
        entity_key: str,
        value_col: str,
        aggregation: str,
        window_seconds: int,
        current_time: datetime
    ) -> dict:
        """Compute windowed aggregation feature."""
        # Filter events within window
        window_start = current_time - timedelta(seconds=window_seconds)
        windowed = [
            e for e in events
            if e['timestamp'] >= window_start
            and e['timestamp'] < current_time
        ]
        if not windowed:
            return {entity_key: None}
        values = [e[value_col] for e in windowed if value_col in e]
        if aggregation == 'count':
            return {entity_key: len(values)}
        elif aggregation == 'sum':
            return {entity_key: sum(values)}
        elif aggregation == 'avg':
            return {entity_key: sum(values) / len(values) if values else None}
        elif aggregation == 'max':
            return {entity_key: max(values) if values else None}
        elif aggregation == 'min':
            return {entity_key: min(values) if values else None}
        else:
            raise ValueError(f"Unknown aggregation: {aggregation}")

    def encode_cyclical_time(
        self,
        timestamp: datetime,
        period: str
    ) -> tuple[float, float]:
        """Encode cyclical time features using sin/cos."""
        if period == 'hour_of_day':
            value = timestamp.hour
            max_val = 24
        elif period == 'day_of_week':
            value = timestamp.weekday()
            max_val = 7
        elif period == 'month_of_year':
            value = timestamp.month
            max_val = 12
        else:
            raise ValueError(f"Unknown period: {period}")
        # Sin/cos encoding preserves cyclical nature
        sin_val = math.sin(2 * math.pi * value / max_val)
        cos_val = math.cos(2 * math.pi * value / max_val)
        return (sin_val, cos_val)

    def compute_embedding_similarity(
        self,
        embedding_a: list[float],
        embedding_b: list[float]
    ) -> float:
        """Compute cosine similarity between embeddings."""
        dot_product = sum(a * b for a, b in zip(embedding_a, embedding_b))
        norm_a = sum(a * a for a in embedding_a) ** 0.5
        norm_b = sum(b * b for b in embedding_b) ** 0.5
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return dot_product / (norm_a * norm_b)
Embedding Feature Store
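The store below packs vectors as float16 using the `struct` module's `'e'` format code. A quick round trip shows the trade-off: 2 bytes per dimension instead of 4 for float32, at roughly three significant decimal digits of precision. The float16 range tops out at 65504, which is rarely an issue for normalized embeddings. The helper names here are hypothetical. The explicit `'<'` (little-endian) prefix is a choice that keeps stored bytes portable across machines.

```python
import struct


def pack_float16(vec: list[float]) -> bytes:
    # 'e' is the IEEE 754 half-precision format code in the struct module
    return struct.pack(f'<{len(vec)}e', *vec)


def unpack_float16(blob: bytes) -> list[float]:
    return list(struct.unpack(f'<{len(blob) // 2}e', blob))


vec = [0.1234567, -0.9876543, 0.5, 1e-3]
blob = pack_float16(vec)
restored = unpack_float16(blob)
print(len(blob))  # 8 bytes: 2 per dimension (float32 would need 16)
max_err = max(abs(a - b) for a, b in zip(vec, restored))
print(max_err < 1e-3)  # True
```

Because the similarity index keeps full-precision vectors, precision loss affects only single-entity lookups from the online store.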
Complete implementation for storing and retrieving embedding features:
from dataclasses import dataclass
from typing import Optional
import struct


@dataclass
class EmbeddingFeature:
    """An embedding feature definition."""
    name: str
    entity: str
    dimension: int
    model: str            # Model used to generate embedding
    update_frequency: str
    storage_format: str   # float32, float16, int8


class EmbeddingFeatureStore:
    """Specialized storage for embedding features."""

    def __init__(
        self,
        online_store,   # Redis, DynamoDB, etc.
        offline_store,  # Parquet, Delta Lake
        vector_index    # FAISS, Pinecone, etc.
    ):
        # Backend method names (set/get, upsert/search) are assumed interfaces
        self.online = online_store
        self.offline = offline_store
        self.vector_index = vector_index

    async def store_embedding(
        self,
        feature_name: str,
        entity_key: str,
        embedding: list[float],
        metadata: Optional[dict] = None
    ):
        """Store embedding for online serving."""
        # Compress embedding for storage efficiency (float16 is lossy)
        compressed = self._compress_embedding(embedding)
        # Store in online store for single-entity lookup
        await self.online.set(
            key=f"{feature_name}:{entity_key}",
            value=compressed,
            metadata=metadata
        )
        # Update vector index for similarity search (full precision)
        await self.vector_index.upsert(
            collection=feature_name,
            id=entity_key,
            vector=embedding,
            metadata=metadata
        )

    async def get_embedding(
        self,
        feature_name: str,
        entity_key: str
    ) -> Optional[list[float]]:
        """Retrieve embedding for entity."""
        compressed = await self.online.get(f"{feature_name}:{entity_key}")
        if compressed is not None:
            return self._decompress_embedding(compressed)
        return None

    async def find_similar(
        self,
        feature_name: str,
        query_embedding: list[float],
        k: int = 10,
        filter_metadata: Optional[dict] = None
    ) -> list[dict]:
        """Find k most similar entities."""
        results = await self.vector_index.search(
            collection=feature_name,
            vector=query_embedding,
            k=k,
            filter=filter_metadata
        )
        return results

    def _compress_embedding(self, embedding: list[float]) -> bytes:
        """Compress embedding for storage."""
        # Pack as little-endian float16: half the size of float32,
        # at reduced precision (about 3 significant digits)
        return struct.pack(f'<{len(embedding)}e', *embedding)

    def _decompress_embedding(self, compressed: bytes) -> list[float]:
        """Decompress embedding from storage."""
        n = len(compressed) // 2  # float16 is 2 bytes
        return list(struct.unpack(f'<{n}e', compressed))

    async def batch_compute_embeddings(
        self,
        feature_name: str,
        texts: list[str],
        model,  # Embedding model
        entity_keys: list[str]
    ):
        """Batch compute and store embeddings."""
        if len(texts) != len(entity_keys):
            raise ValueError("texts and entity_keys must have the same length")
        # Compute embeddings in batch for efficiency; assumes a
        # sentence-transformers-style encode() that returns numpy arrays
        embeddings = model.encode(texts, batch_size=32)
        # Store all embeddings
        for entity_key, embedding in zip(entity_keys, embeddings):
            await self.store_embedding(
                feature_name=feature_name,
                entity_key=entity_key,
                embedding=embedding.tolist()
            )
        return {'processed': len(texts)}
Training-Serving Skew
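The detector below compares only means, so it will not flag a feature whose spread changes while its mean stays the same. One technique that does catch such shifts is the two-sample Kolmogorov-Smirnov statistic: the largest gap between the two empirical CDFs. It is not part of the original detector. Below is a minimal pure-Python sketch; `scipy.stats.ks_2samp` provides a tested version that also returns p-values.

```python
from bisect import bisect_right


def ks_statistic(train: list[float], serve: list[float]) -> float:
    """Two-sample Kolmogorov-Smirnov statistic: max gap between empirical CDFs."""
    a, b = sorted(train), sorted(serve)
    max_gap = 0.0
    for x in a + b:  # the gap can only change at observed values
        cdf_a = bisect_right(a, x) / len(a)
        cdf_b = bisect_right(b, x) / len(b)
        max_gap = max(max_gap, abs(cdf_a - cdf_b))
    return max_gap


train = [-1.0, 1.0] * 50  # mean 0, std 1
serve = [-3.0, 3.0] * 50  # mean 0, std 3
print(ks_statistic(train, serve))  # 0.5, although the mean z-score is 0
```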
Skew Detector
Complete implementation for detecting training-serving skew:
class SkewDetector:
    """Detect training-serving skew."""

    def __init__(self, training_stats: dict):
        # training_stats: feature name -> {'mean': ..., 'std': ...}
        self.training_stats = training_stats

    async def check_serving_distribution(
        self,
        feature_name: str,
        serving_values: list
    ) -> dict:
        """Check if serving distribution matches training."""
        if not serving_values:
            raise ValueError("serving_values must not be empty")
        training = self.training_stats.get(feature_name, {})
        # Calculate serving statistics
        serving_mean = sum(serving_values) / len(serving_values)
        serving_std = self._std(serving_values)
        # Compare to training
        training_mean = training.get('mean', 0)
        training_std = training.get('std', 1)
        # Z-score of the mean difference, in units of the training std
        if training_std > 0:
            mean_diff_z = abs(serving_mean - training_mean) / training_std
        else:
            # Constant training feature: any change in the mean is unbounded skew
            mean_diff_z = 0.0 if serving_mean == training_mean else float('inf')
        return {
            'feature': feature_name,
            'training_mean': training_mean,
            'serving_mean': serving_mean,
            'serving_std': serving_std,
            'mean_difference_z_score': mean_diff_z,
            'is_skewed': mean_diff_z >= 3,  # 3 or more std deviations
            'recommendation': self._recommend(mean_diff_z)
        }

    def _std(self, values: list) -> float:
        mean = sum(values) / len(values)
        return (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5

    def _recommend(self, z_score: float) -> str:
        if z_score < 1:
            return "No action needed"
        elif z_score < 2:
            return "Monitor - slight drift"
        elif z_score < 3:
            return "Investigate - moderate drift"
        else:
            return "Alert - significant skew detected"
Unified Feature Computation
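The key idea is that both paths call one function object rather than two separate reimplementations. Here is a pure-Python sketch with a hypothetical `spend_per_visit` feature. The class below generalizes the pattern with a registry and a DataFrame-based batch path.

```python
from typing import Any, Callable

Record = dict[str, Any]


def spend_per_visit(record: Record) -> float:
    """Hypothetical feature: total spend divided by visit count (0 when no visits)."""
    visits = record.get("visits", 0)
    return record.get("spend", 0.0) / visits if visits else 0.0


def compute_batch(transform: Callable[[Record], Any], rows: list[Record]) -> list[Any]:
    # Training path: the same transform mapped over every row
    return [transform(r) for r in rows]


def compute_online(transform: Callable[[Record], Any], row: Record) -> Any:
    # Serving path: one request, same function object
    return transform(row)


rows = [{"spend": 30.0, "visits": 3}, {"spend": 5.0, "visits": 0}]
batch = compute_batch(spend_per_visit, rows)
online = [compute_online(spend_per_visit, r) for r in rows]
assert batch == online == [10.0, 0.0]
```

Edge-case handling, such as the zero-visit branch, is exactly what tends to diverge when training and serving code are written separately.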
Complete implementation for single source of truth feature computation:
from dataclasses import dataclass
from typing import Any, Callable

import pandas as pd


@dataclass
class FeatureDefinition:
    """Unified feature definition used for both training and serving."""
    name: str
    entity: str
    dtype: str
    transform: Callable[[Any], Any]  # takes one record (dict or DataFrame row)
    dependencies: list[str]
    description: str


class UnifiedFeatureComputation:
    """Single source of truth for feature computation."""

    def __init__(self):
        self.definitions: dict[str, FeatureDefinition] = {}

    def register(self, definition: FeatureDefinition):
        """Register a feature definition."""
        self.definitions[definition.name] = definition

    def compute_for_training(
        self,
        feature_name: str,
        batch_data: pd.DataFrame
    ) -> pd.Series:
        """Compute feature for batch training data."""
        definition = self.definitions[feature_name]
        # Apply the same transform row by row (axis=1); the default axis=0
        # would pass whole columns to the transform instead of records
        return batch_data.apply(definition.transform, axis=1)

    def compute_for_serving(
        self,
        feature_name: str,
        single_record: dict
    ) -> Any:
        """Compute feature for single serving record."""
        definition = self.definitions[feature_name]
        # Apply same transform to single record
        return definition.transform(single_record)

    def validate_consistency(
        self,
        feature_name: str,
        sample_data: list[dict]
    ) -> dict:
        """Validate training and serving compute same results."""
        # Run the real training path (DataFrame rows) and the real serving
        # path (plain dicts) so differences between them surface here
        training_results = list(
            self.compute_for_training(feature_name, pd.DataFrame(sample_data))
        )
        serving_results = [
            self.compute_for_serving(feature_name, record)
            for record in sample_data
        ]
        mismatches = [
            i for i, (t, s) in enumerate(zip(training_results, serving_results))
            if t != s
        ]
        return {
            'feature': feature_name,
            'samples_tested': len(sample_data),
            'mismatches': len(mismatches),
            'consistent': len(mismatches) == 0,
            'mismatch_indices': mismatches[:10]  # First 10 mismatches
        }
Preprocessing Packager
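Why fitted parameters must travel with the model is easiest to see with a single standardization step. In this sketch, the hypothetical `FittedScaler` (not part of the packager API) freezes mean and standard deviation at training time. Serving reuses those stored values instead of recomputing them from live traffic.

```python
from dataclasses import dataclass


@dataclass
class FittedScaler:
    """Standardization parameters frozen at training time."""
    mean: float
    std: float

    @classmethod
    def fit(cls, values: list[float]) -> "FittedScaler":
        mean = sum(values) / len(values)
        std = (sum((v - mean) ** 2 for v in values) / len(values)) ** 0.5
        return cls(mean, std or 1.0)  # avoid division by zero for constant data

    def transform(self, value: float) -> float:
        return (value - self.mean) / self.std


scaler = FittedScaler.fit([10.0, 30.0])
# Serving reuses the stored mean/std, so 30.0 maps exactly as it did in training
print(scaler.transform(30.0))  # 1.0
# Anti-pattern: refitting on serving data (here a single request)
print(FittedScaler.fit([30.0]).transform(30.0))  # 0.0, a silently different value
```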
Complete implementation for packaging preprocessing with model artifacts:
import hashlib
import pickle
from typing import Callable


class PreprocessingPackager:
    """Package preprocessing with model artifact to prevent skew."""

    def __init__(self, preprocessing_steps: list[Callable]):
        # Steps must be picklable (module-level functions or objects, not lambdas)
        self.steps = preprocessing_steps
        self.fitted_params: dict = {}

    def fit(self, training_data):
        """Fit preprocessing on training data."""
        # Store fitted parameters (e.g., vocabulary, normalization stats)
        self.fitted_params = {}
        data = training_data
        for i, step in enumerate(self.steps):
            if hasattr(step, 'fit'):
                step.fit(data)
            if hasattr(step, 'get_params'):
                self.fitted_params[f'step_{i}'] = step.get_params()
            # Later steps are fitted on the output of earlier ones
            data = step(data)

    def transform(self, data):
        """Apply fitted preprocessing."""
        result = data
        for step in self.steps:
            result = step(result)
        return result

    def save_with_model(self, model, path: str):
        """Save preprocessing alongside model."""
        artifact = {
            'model': model,
            'preprocessing_steps': self.steps,
            'fitted_params': self.fitted_params,
            'version': self._compute_hash()
        }
        with open(path, 'wb') as f:
            pickle.dump(artifact, f)

    def _compute_hash(self) -> str:
        """Compute hash of preprocessing for versioning."""
        # Hash the preprocessing steps and parameters. str() of an object
        # without a custom __repr__ includes its memory address, so the hash
        # is stable across runs only if each step has a deterministic __repr__.
        content = str(self.steps) + str(self.fitted_params)
        return hashlib.sha256(content.encode()).hexdigest()[:12]

    @classmethod
    def load_with_model(cls, path: str):
        """Load preprocessing with model."""
        # pickle can execute arbitrary code on load: only load trusted artifacts
        with open(path, 'rb') as f:
            artifact = pickle.load(f)
        packager = cls(artifact['preprocessing_steps'])
        packager.fitted_params = artifact['fitted_params']
        return artifact['model'], packager
Skew Monitoring Dashboard
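The dashboard's `_compute_jsd` is stubbed to return 0.0. Below is a sketch of the actual Jensen-Shannon divergence. It assumes both samples have already been histogrammed over the same bin edges. Log base 2 (which bounds the result in [0, 1]) is a choice made here, and the thresholds in `_determine_alert` should be calibrated to whichever base is used.

```python
import math


def jensen_shannon_divergence(p: list[float], q: list[float]) -> float:
    """JSD (base 2, bounded in [0, 1]) between two histograms over the same bins."""
    sp, sq = sum(p), sum(q)
    p = [x / sp for x in p]  # normalize counts to probabilities
    q = [x / sq for x in q]
    m = [(a + b) / 2 for a, b in zip(p, q)]  # mixture distribution

    def kl(x: list[float], y: list[float]) -> float:
        # Terms with x_i == 0 contribute 0; y_i > 0 wherever x_i > 0
        # because y is the mixture of x and the other distribution
        return sum(a * math.log2(a / b) for a, b in zip(x, y) if a > 0)

    return 0.5 * kl(p, m) + 0.5 * kl(q, m)
```

Unlike KL divergence, JSD is symmetric and stays finite when one histogram has empty bins. This is why it suits a serving window that may not cover every training bin.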
Implementation for monitoring training-serving skew in production (PSI is approximated from means, and JSD is stubbed):
from collections import deque
import math


class SkewMonitoringDashboard:
    """Monitor for training-serving skew in production."""

    def __init__(self, feature_names: list[str], max_samples: int = 10000):
        self.features = feature_names
        self.training_stats: dict[str, dict] = {}
        # deque(maxlen=...) keeps a rolling window without re-slicing on every append
        self.serving_samples: dict[str, deque] = {
            f: deque(maxlen=max_samples) for f in feature_names
        }

    def register_training_stats(
        self,
        feature_name: str,
        stats: dict
    ):
        """Register statistics from training data."""
        self.training_stats[feature_name] = stats

    def record_serving_sample(
        self,
        feature_name: str,
        value: float
    ):
        """Record a serving sample for monitoring."""
        if feature_name not in self.serving_samples:
            raise KeyError(f"Feature '{feature_name}' is not monitored")
        self.serving_samples[feature_name].append(value)

    def compute_skew_metrics(self) -> dict:
        """Compute skew metrics for all features."""
        metrics = {}
        for feature in self.features:
            if feature not in self.training_stats:
                continue
            training = self.training_stats[feature]
            serving = self.serving_samples[feature]
            if len(serving) < 100:
                continue  # too few samples for stable statistics
            # Compute serving statistics
            serving_mean = sum(serving) / len(serving)
            serving_std = self._std(serving)
            training_mean = training.get('mean', 0)
            training_std = training.get('std', 1)
            # Population Stability Index (PSI)
            psi = self._compute_psi(training, serving)
            # Jensen-Shannon Divergence
            jsd = self._compute_jsd(training, serving)
            metrics[feature] = {
                'training_mean': training_mean,
                'serving_mean': serving_mean,
                'serving_std': serving_std,
                'mean_shift': abs(serving_mean - training_mean) / (training_std + 1e-6),
                'psi': psi,
                'jsd': jsd,
                'alert_level': self._determine_alert(psi, jsd)
            }
        return metrics

    def _std(self, values) -> float:
        if not values:
            return 0
        mean = sum(values) / len(values)
        return (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5

    def _compute_psi(self, training: dict, serving) -> float:
        """Approximate the Population Stability Index."""
        # Simplified mean-ratio proxy, not a true PSI: a real implementation
        # compares histogram bin proportions
        training_mean = training.get('mean', 0)
        serving_mean = sum(serving) / len(serving) if serving else 0
        if training_mean == 0:
            return 0
        ratio = serving_mean / training_mean
        if ratio <= 0:
            return float('inf')
        return (ratio - 1) * math.log(ratio)

    def _compute_jsd(self, training: dict, serving) -> float:
        """Compute Jensen-Shannon Divergence."""
        # Stub: a real implementation compares binned distributions
        return 0.0

    def _determine_alert(self, psi: float, jsd: float) -> str:
        """Determine alert level based on metrics."""
        if psi > 0.25 or jsd > 0.1:
            return 'critical'
        elif psi > 0.1 or jsd > 0.05:
            return 'warning'
        else:
            return 'ok'
Data Versioning and Lineage
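Lineage answers not only *what* a change affects but also *in what order* the affected assets must be rebuilt. The sketch below assumes lineage is available as a plain adjacency dict of downstream edges; the `LineageTracker` below stores the same information in each asset's `downstream` list. The ordering uses Kahn's topological sort, which the tracker itself does not provide, and assumes the graph is acyclic.

```python
from collections import deque


def rebuild_order(downstream: dict[str, list[str]], changed: str) -> list[str]:
    """Assets affected by `changed`, ordered so each follows its affected inputs.

    downstream: asset id -> ids of assets that consume it (must be a DAG).
    """
    # 1. Collect every asset reachable from the changed one
    affected, stack = set(), [changed]
    while stack:
        for child in downstream.get(stack.pop(), []):
            if child not in affected:
                affected.add(child)
                stack.append(child)

    # 2. Count only edges between affected assets; the changed asset is
    #    already up to date, so its outgoing edges do not block anything
    indegree = {a: 0 for a in affected}
    for node in affected:
        for child in downstream.get(node, []):
            indegree[child] += 1

    # 3. Kahn's algorithm (sorted for deterministic output)
    ready = deque(sorted(a for a, d in indegree.items() if d == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in sorted(downstream.get(node, [])):
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    return order
```

A model that reads two feature views appears only after both views, so it is retrained once rather than once per changed input.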
Lineage Tracker
Complete implementation for tracking data lineage:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class DataAsset:
    """A data asset in the lineage graph."""
    id: str
    name: str
    type: str  # e.g. raw_table, dataset, feature_view, model
    version: str
    created: datetime
    created_by: str
    upstream: list[str] = field(default_factory=list)
    downstream: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class LineageTracker:
    """Track data lineage."""

    def __init__(self):
        self.assets: dict[str, DataAsset] = {}

    def register_asset(
        self,
        asset: DataAsset,
        upstream_ids: Optional[list[str]] = None
    ):
        """Register a data asset with its upstream dependencies."""
        self.assets[asset.id] = asset
        # Link to upstream; the downstream back-reference is only added for
        # upstream assets that are already registered, so register parents first
        if upstream_ids:
            asset.upstream = list(upstream_ids)
            for up_id in upstream_ids:
                if up_id in self.assets:
                    self.assets[up_id].downstream.append(asset.id)

    def get_upstream_lineage(self, asset_id: str, depth: int = 10) -> list[DataAsset]:
        """Get all upstream dependencies, up to `depth` hops away."""
        lineage = []
        visited = set()

        def traverse(aid: str, current_depth: int):
            if aid in visited or current_depth > depth:
                return
            visited.add(aid)
            if aid in self.assets:
                asset = self.assets[aid]
                if aid != asset_id:  # Don't include the starting asset
                    lineage.append(asset)
                for up_id in asset.upstream:
                    traverse(up_id, current_depth + 1)

        traverse(asset_id, 0)
        return lineage

    def get_downstream_impact(self, asset_id: str) -> list[DataAsset]:
        """Get all downstream assets that would be affected."""
        impact = []
        visited = set()

        def traverse(aid: str):
            if aid in visited:
                return
            visited.add(aid)
            if aid in self.assets:
                asset = self.assets[aid]
                if aid != asset_id:  # Don't include the source
                    impact.append(asset)
                for down_id in asset.downstream:
                    traverse(down_id)

        traverse(asset_id)
        return impact

    def model_training_lineage(self, model_id: str) -> dict:
        """Get complete lineage for a trained model."""
        model = self.assets.get(model_id)
        if not model:
            return {'error': 'Model not found'}
        upstream = self.get_upstream_lineage(model_id)
        return {
            'model': model,
            'training_data': [a for a in upstream if a.type == 'dataset'],
            'features': [a for a in upstream if a.type == 'feature_view'],
            'raw_data': [a for a in upstream if a.type == 'raw_table'],
            'full_lineage': upstream
        }
Data Version Manager
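Deduplicating by hash only works if equal data always produces equal bytes. For JSON-serializable records, one option is to hash a canonical encoding. A sketch, assuming the data is a list of dicts (`canonical_hash` is an illustrative name, not a library function):

```python
import hashlib
import json


def canonical_hash(rows: list[dict], order_sensitive: bool = True) -> str:
    """SHA-256 digest of rows encoded as canonical JSON.

    sort_keys=True removes any dependence on dict key order. With
    order_sensitive=False the encoded rows are sorted first, so the same
    rows in any order produce the same digest.
    """
    encoded = [
        json.dumps(row, sort_keys=True, separators=(",", ":"))
        for row in rows
    ]
    if not order_sensitive:
        encoded.sort()
    digest = hashlib.sha256()
    for line in encoded:
        digest.update(line.encode("utf-8"))
        # json.dumps escapes newlines inside strings, so "\n" is a safe row delimiter
        digest.update(b"\n")
    return digest.hexdigest()


a = [{"id": 1, "score": 0.5}, {"id": 2, "score": 0.7}]
b = [{"score": 0.5, "id": 1}, {"score": 0.7, "id": 2}]  # same rows, different key order
print(canonical_hash(a) == canonical_hash(b))  # True
```

JSON keeps `2` and `2.0` distinct, so integer and float columns need consistent types for digests to match.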
Complete implementation for managing versioned data:
import hashlib
import pickle
from datetime import datetime
from typing import Optional


class DataVersionManager:
    """Manage versioned data for ML reproducibility."""

    def __init__(self, storage_backend):
        # storage_backend must provide put(key=..., data=...) and get(key)
        self.storage = storage_backend
        self.versions: dict[str, list[dict]] = {}

    def create_version(
        self,
        dataset_name: str,
        data,
        metadata: Optional[dict] = None
    ) -> str:
        """Create a new version of a dataset."""
        # Compute content hash for deduplication
        content_hash = self._compute_hash(data)

        # Return the existing version if identical content is already stored
        for v in self.versions.get(dataset_name, []):
            if v['content_hash'] == content_hash:
                return v['version_id']

        # Create new version
        version_id = f"v{len(self.versions.get(dataset_name, [])) + 1}_{content_hash[:8]}"
        version_info = {
            'version_id': version_id,
            'content_hash': content_hash,
            'created_at': datetime.now().isoformat(),
            'row_count': len(data) if hasattr(data, '__len__') else None,
            'metadata': metadata or {}
        }

        # Store data
        self.storage.put(
            key=f"{dataset_name}/{version_id}",
            data=data
        )

        # Record version
        self.versions.setdefault(dataset_name, []).append(version_info)
        return version_id

    def get_version(
        self,
        dataset_name: str,
        version_id: Optional[str] = None
    ):
        """Get a specific version, or the latest if version_id is None."""
        if version_id is None:
            if not self.versions.get(dataset_name):
                return None
            version_id = self.versions[dataset_name][-1]['version_id']
        return self.storage.get(f"{dataset_name}/{version_id}")

    def diff_versions(
        self,
        dataset_name: str,
        version_a: str,
        version_b: str
    ) -> dict:
        """Compare two versions of a dataset."""
        data_a = self.get_version(dataset_name, version_a)
        data_b = self.get_version(dataset_name, version_b)
        rows_a = len(data_a) if hasattr(data_a, '__len__') else None
        rows_b = len(data_b) if hasattr(data_b, '__len__') else None

        # Simple diff metrics
        return {
            'version_a': version_a,
            'version_b': version_b,
            'rows_a': rows_a,
            'rows_b': rows_b,
            'row_difference': (
                rows_b - rows_a
                if rows_a is not None and rows_b is not None else None
            ),
            # Additional diff metrics would go here
        }

    def _compute_hash(self, data) -> str:
        """Compute a SHA-256 hash of the pickled data for versioning."""
        # Caveat: pickle does not guarantee identical bytes for equal objects
        # (e.g. dicts built in different insertion orders), so equal data can
        # occasionally hash differently and miss deduplication.
        content = pickle.dumps(data)
        return hashlib.sha256(content).hexdigest()

    def list_versions(self, dataset_name: str) -> list[dict]:
        """List all versions of a dataset."""
        return self.versions.get(dataset_name, [])

    def pin_version_to_experiment(
        self,
        dataset_name: str,
        version_id: str,
        experiment_id: str
    ):
        """Pin a data version to an experiment for reproducibility."""
        # Stub: would integrate with an experiment tracking system
        pass
Data Observability
Data Observability Monitor
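The freshness rule in the monitor is tiered: ok up to the allowed age, warning up to twice that, critical beyond. A standalone sketch of the same rule using timezone-aware UTC timestamps and an injectable clock (`freshness_status` is a hypothetical helper). Subtracting a naive datetime from an aware one raises `TypeError`, which matters because the monitor compares against naive `datetime.now()`:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def freshness_status(
    last_update: datetime,
    max_age: timedelta,
    now: Optional[datetime] = None,
) -> str:
    """Tiered freshness: ok up to max_age, warning up to 2 * max_age, else critical.

    Expects timezone-aware datetimes; `now` defaults to the current UTC time
    and can be passed explicitly for deterministic tests.
    """
    now = now if now is not None else datetime.now(timezone.utc)
    age = now - last_update  # TypeError if one side is naive and the other aware
    if age > 2 * max_age:
        return "critical"
    if age > max_age:
        return "warning"
    return "ok"


now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
print(freshness_status(now - timedelta(hours=30), timedelta(hours=24), now))  # warning
```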
Complete implementation for monitoring data observability across pipelines:
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime


@dataclass
class DataObservabilityMetric:
    """A data observability metric."""
    name: str
    value: float
    threshold: float
    status: str  # ok, warning, critical
    timestamp: datetime
    metadata: dict


class DataObservabilityMonitor:
    """Monitor data observability across pipelines."""

    def __init__(self):
        # Metric history keyed by "table.metric" or "table.column.metric"
        self.metrics: dict[str, list[DataObservabilityMetric]] = defaultdict(list)
        self.expectations: dict[str, dict] = {}

    def set_expectations(
        self,
        table_name: str,
        expectations: dict
    ):
        """Set expectations for a table."""
        self.expectations[table_name] = expectations

    def check_freshness(
        self,
        table_name: str,
        last_update: datetime
    ) -> DataObservabilityMetric:
        """Check data freshness.

        last_update must be a naive local datetime, to match datetime.now().
        """
        expectations = self.expectations.get(table_name, {})
        max_age_hours = expectations.get('max_age_hours', 24)
        age_hours = (datetime.now() - last_update).total_seconds() / 3600
        # Warning past the limit, critical past twice the limit
        if age_hours > max_age_hours * 2:
            status = 'critical'
        elif age_hours > max_age_hours:
            status = 'warning'
        else:
            status = 'ok'
        metric = DataObservabilityMetric(
            name='freshness',
            value=age_hours,
            threshold=max_age_hours,
            status=status,
            timestamp=datetime.now(),
            metadata={'last_update': last_update.isoformat()}
        )
        self.metrics[f'{table_name}.freshness'].append(metric)
        return metric

    def check_volume(
        self,
        table_name: str,
        current_count: int
    ) -> DataObservabilityMetric:
        """Check data volume against expectations."""
        expectations = self.expectations.get(table_name, {})
        min_count = expectations.get('min_row_count', 0)
        max_count = expectations.get('max_row_count', float('inf'))
        # Also compare against the average of the last 7 recorded counts
        historical = self.metrics.get(f'{table_name}.volume', [])
        if historical:
            recent_counts = [m.value for m in historical[-7:]]
            avg_count = sum(recent_counts) / len(recent_counts)
            deviation = abs(current_count - avg_count) / (avg_count + 1)
        else:
            deviation = 0
        if current_count < min_count or current_count > max_count:
            status = 'critical'
        elif deviation > 0.5:  # More than 50% deviation
            status = 'warning'
        else:
            status = 'ok'
        metric = DataObservabilityMetric(
            name='volume',
            value=current_count,
            threshold=min_count,
            status=status,
            timestamp=datetime.now(),
            metadata={'deviation_from_avg': deviation}
        )
        self.metrics[f'{table_name}.volume'].append(metric)
        return metric

    def check_schema(
        self,
        table_name: str,
        current_schema: dict,
        expected_schema: dict
    ) -> DataObservabilityMetric:
        """Check for unexpected schema changes."""
        differences = []
        current_fields = current_schema.get('fields', [])
        expected_fields = expected_schema.get('fields', [])
        # Check for missing fields
        for field in expected_fields:
            if field not in current_fields:
                differences.append(f"Missing field: {field}")
        # Check for new fields
        for field in current_fields:
            if field not in expected_fields:
                differences.append(f"New field: {field}")
        if len(differences) > 3:
            status = 'critical'
        elif len(differences) > 0:
            status = 'warning'
        else:
            status = 'ok'
        metric = DataObservabilityMetric(
            name='schema',
            value=len(differences),
            threshold=0,
            status=status,
            timestamp=datetime.now(),
            metadata={'differences': differences}
        )
        self.metrics[f'{table_name}.schema'].append(metric)
        return metric

    def check_distribution(
        self,
        table_name: str,
        column_name: str,
        current_stats: dict,
        baseline_stats: dict
    ) -> DataObservabilityMetric:
        """Check for distribution drift."""
        # Mean shift measured in baseline standard deviations
        mean_drift = abs(
            current_stats.get('mean', 0) - baseline_stats.get('mean', 0)
        ) / (baseline_stats.get('std', 1) + 1e-6)
        null_rate_change = abs(
            current_stats.get('null_rate', 0) -
            baseline_stats.get('null_rate', 0)
        )
        if mean_drift > 3 or null_rate_change > 0.1:
            status = 'critical'
        elif mean_drift > 2 or null_rate_change > 0.05:
            status = 'warning'
        else:
            status = 'ok'
        metric = DataObservabilityMetric(
            name='distribution',
            value=mean_drift,
            threshold=2.0,
            status=status,
            timestamp=datetime.now(),
            metadata={
                'column': column_name,
                'mean_drift': mean_drift,
                'null_rate_change': null_rate_change
            }
        )
        self.metrics[f'{table_name}.{column_name}.distribution'].append(metric)
        return metric

    def get_health_summary(self, table_name: str) -> dict:
        """Get overall health summary for a table."""
        # Match on "table_name." so that, e.g., "orders" does not also pick
        # up metrics recorded for "orders_archive"
        prefix = f'{table_name}.'
        relevant_metrics = {
            k: v for k, v in self.metrics.items()
            if k.startswith(prefix)
        }
        latest_metrics = {}
        for key, metric_list in relevant_metrics.items():
            if metric_list:
                latest_metrics[key] = metric_list[-1]
        statuses = [m.status for m in latest_metrics.values()]
        if 'critical' in statuses:
            overall_status = 'critical'
        elif 'warning' in statuses:
            overall_status = 'warning'
        else:
            overall_status = 'ok'
        return {
            'table': table_name,
            'overall_status': overall_status,
            'metrics': {k: vars(v) for k, v in latest_metrics.items()},
            'issues': [
                k for k, v in latest_metrics.items()
                if v.status != 'ok'
            ]
        }
Data Anomaly Detector
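The detector below scores values with a mean/standard-deviation z-score, and a single extreme value in the history can inflate both enough to mask later anomalies. A commonly used robust alternative, swapped in here for comparison, is the median/MAD modified z-score of Iglewicz and Hoaglin, usually flagged when its magnitude exceeds 3.5. A minimal sketch:

```python
import statistics


def modified_z_score(history: list[float], value: float) -> float:
    """Robust z-score based on the median and median absolute deviation (MAD).

    The 0.6745 factor makes the score comparable to an ordinary z-score for
    normally distributed data; |score| > 3.5 is a common anomaly cut-off.
    """
    med = statistics.median(history)
    mad = statistics.median([abs(x - med) for x in history])
    if mad == 0:
        # At least half the history equals the median: any other value is extreme
        return 0.0 if value == med else float("inf")
    return 0.6745 * (value - med) / mad


# One historical spike (500) inflates the mean/std baseline
history = [100, 102, 98, 101, 99, 100, 500]
plain_z = abs(130 - statistics.mean(history)) / statistics.pstdev(history)
print(round(plain_z, 2))                         # 0.19: not flagged
print(round(modified_z_score(history, 130), 3))  # 20.235: flagged
```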
Complete implementation for detecting anomalies in data metrics:
class DataAnomalyDetector:
    """Detect anomalies in data metrics."""

    def __init__(self, sensitivity: float = 2.0):
        self.sensitivity = sensitivity
        self.baselines: dict[str, dict] = {}

    def fit_baseline(
        self,
        metric_name: str,
        historical_values: list[float]
    ):
        """Fit baseline from historical values."""
        if len(historical_values) < 7:
            raise ValueError("Need at least 7 historical values")
        mean = sum(historical_values) / len(historical_values)
        # Population standard deviation
        std = (sum((x - mean) ** 2 for x in historical_values) / len(historical_values)) ** 0.5
        self.baselines[metric_name] = {
            'mean': mean,
            'std': std,
            'min': min(historical_values),
            'max': max(historical_values)
        }

    def detect_anomaly(
        self,
        metric_name: str,
        value: float
    ) -> dict:
        """Detect if value is anomalous."""
        if metric_name not in self.baselines:
            return {'is_anomaly': False, 'reason': 'No baseline'}
        baseline = self.baselines[metric_name]
        # 1e-6 avoids division by zero for a constant history
        z_score = abs(value - baseline['mean']) / (baseline['std'] + 1e-6)
        is_anomaly = z_score > self.sensitivity
        return {
            'is_anomaly': is_anomaly,
            'value': value,
            'z_score': z_score,
            'baseline_mean': baseline['mean'],
            'baseline_std': baseline['std'],
            'threshold': self.sensitivity,
            'severity': self._severity(z_score)
        }

    def _severity(self, z_score: float) -> str:
        if z_score > 4:
            return 'critical'
        elif z_score > 3:
            return 'high'
        elif z_score > 2:
            return 'medium'
        else:
            return 'low'
Data Quality
Data Quality Monitor
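`run_checks` counts a check as passed when its score is at least the threshold, so each `check_func` should return a higher-is-better score, typically a ratio in [0, 1]. A few such scoring functions, sketched for data held as a list of dicts (the function names and the data layout are assumptions):

```python
def completeness(rows: list[dict], column: str) -> float:
    """Fraction of rows where `column` is present and not None."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(column) is not None) / len(rows)


def uniqueness(rows: list[dict], column: str) -> float:
    """Distinct non-null values / non-null values (1.0 means no duplicates)."""
    values = [r[column] for r in rows if r.get(column) is not None]
    return len(set(values)) / len(values) if values else 0.0


def validity(rows: list[dict], column: str, lo: float, hi: float) -> float:
    """Fraction of non-null values that fall inside [lo, hi]."""
    values = [r[column] for r in rows if r.get(column) is not None]
    if not values:
        return 0.0
    return sum(1 for v in values if lo <= v <= hi) / len(values)


rows = [
    {"id": 1, "age": 34},
    {"id": 2, "age": None},
    {"id": 2, "age": 210},  # duplicate id, implausible age
    {"id": 3, "age": 51},
]
print(completeness(rows, "age"), uniqueness(rows, "id"))  # 0.75 0.75
```

Any of these can be wrapped for the monitor, e.g. `check_func=lambda data: completeness(data, 'age')`.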
Complete implementation for monitoring data quality:
from dataclasses import dataclass
from datetime import datetime
from typing import Callable


@dataclass
class DataQualityCheck:
    """A data quality check."""
    name: str
    dimension: str
    check_func: Callable  # returns a score; higher is better
    severity: str  # critical, warning, info
    threshold: float


class DataQualityMonitor:
    """Monitor data quality."""

    def __init__(self, checks: list[DataQualityCheck]):
        self.checks = checks
        self.results_history = []

    def run_checks(self, data) -> dict:
        """Run all quality checks on data."""
        results = {
            'timestamp': datetime.now(),
            'checks': [],
            'passed': True,
            'critical_failures': []
        }
        for check in self.checks:
            try:
                result = check.check_func(data)
                passed = result >= check.threshold  # score must reach the threshold
                check_result = {
                    'name': check.name,
                    'dimension': check.dimension,
                    'value': result,
                    'threshold': check.threshold,
                    'passed': passed
                }
                results['checks'].append(check_result)
                if not passed and check.severity == 'critical':
                    results['passed'] = False
                    results['critical_failures'].append(check.name)
            except Exception as e:
                # A check that raises counts as a failure
                results['checks'].append({
                    'name': check.name,
                    'error': str(e),
                    'passed': False
                })
                if check.severity == 'critical':
                    results['passed'] = False
                    results['critical_failures'].append(check.name)
        self.results_history.append(results)
        return results

    def trend_analysis(self, check_name: str, window: int = 7) -> dict:
        """Analyze trends for a specific check over the last `window` runs."""
        # Collect the matching check results (not the run dicts) from each run
        recent = [
            c for r in self.results_history[-window:]
            for c in r['checks']
            if c.get('name') == check_name and 'value' in c
        ]
        if not recent:
            return {'error': 'No data for check'}
        values = [c['value'] for c in recent]
        # Scores are higher-is-better (see run_checks)
        if values[-1] > values[0]:
            trend = 'improving'
        elif values[-1] < values[0]:
            trend = 'degrading'
        else:
            trend = 'stable'
        return {
            'check': check_name,
            'values': values,
            'trend': trend,
            'mean': sum(values) / len(values)
        }
Feature Governance
Feature Governance System
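Deprecation is only finished when nothing still reads the feature. A sketch of a pre-removal check that lists deprecated features past their deprecation date which still have consumers; `FeatureRecord` and `removal_blockers` are hypothetical, a trimmed-down stand-in for the governance record below:

```python
from dataclasses import dataclass, field
from datetime import date
from typing import Optional


@dataclass
class FeatureRecord:
    """Hypothetical trimmed-down governance record for this sketch."""
    name: str
    status: str  # active, deprecated, experimental
    consumers: list[str] = field(default_factory=list)
    deprecation_date: Optional[date] = None


def removal_blockers(features: list[FeatureRecord], today: date) -> dict[str, list[str]]:
    """Map each deprecated, past-due feature to the consumers still using it."""
    blockers = {}
    for f in features:
        past_due = f.deprecation_date is not None and f.deprecation_date <= today
        if f.status == "deprecated" and past_due and f.consumers:
            blockers[f.name] = sorted(f.consumers)
    return blockers


features = [
    FeatureRecord("user_age_bucket", "deprecated", ["churn_model"], date(2024, 3, 1)),
    FeatureRecord("user_tenure_days", "active", ["churn_model", "ltv_model"]),
    FeatureRecord("legacy_score", "deprecated", [], date(2024, 1, 15)),
    FeatureRecord("session_count_7d", "deprecated", ["ltv_model"], date(2024, 9, 1)),
]
print(removal_blockers(features, date(2024, 6, 1)))
# {'user_age_bucket': ['churn_model']}
```

Here `legacy_score` is past due with no consumers, so it can be removed, while `session_count_7d` has not yet reached its date.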
Complete implementation for managing feature governance:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class FeatureGovernance:
    """Governance model for feature management."""
    feature_name: str
    owner: str
    team: str
    description: str
    documentation_url: str
    status: str  # active, deprecated, experimental
    consumers: list[str]  # Models/services using this feature
    sla: dict  # Freshness, availability requirements
    created_date: datetime
    last_reviewed: datetime
    deprecation_date: Optional[datetime] = None


class FeatureGovernanceSystem:
    """Manage feature governance."""

    def __init__(self):
        self.features: dict[str, FeatureGovernance] = {}

    def register_feature(self, governance: FeatureGovernance):
        """Register feature with governance metadata."""
        # Require minimum documentation and an owner
        if not governance.description or len(governance.description) < 50:
            raise ValueError("Feature description must be at least 50 characters")
        if not governance.owner:
            raise ValueError("Feature must have an owner")
        self.features[governance.feature_name] = governance

    def deprecate_feature(
        self,
        feature_name: str,
        deprecation_date: datetime,
        migration_guide: str
    ) -> dict:
        """Deprecate a feature and prepare notifications for its consumers."""
        feature = self.features.get(feature_name)
        if not feature:
            return {'error': 'Feature not found'}
        # Build one notification per consumer; delivery is left to whatever
        # messaging integration is in place
        notifications = []
        for consumer in feature.consumers:
            notifications.append({
                'consumer': consumer,
                'message': f"Feature {feature_name} will be deprecated on {deprecation_date}",
                'migration_guide': migration_guide
            })
        feature.status = 'deprecated'
        feature.deprecation_date = deprecation_date
        return {
            'feature': feature_name,
            'notifications': notifications,
            'notification_count': len(notifications),
            'consumers_affected': feature.consumers
        }

    def audit_features(self) -> dict:
        """Audit feature health."""
        issues = []
        for name, feature in self.features.items():
            # Check for features not reviewed recently
            if (datetime.now() - feature.last_reviewed).days > 90:
                issues.append({
                    'feature': name,
                    'issue': 'Not reviewed in 90+ days',
                    'severity': 'warning'
                })
            # Check for ownerless features (the owner may be cleared after registration)
            if not feature.owner:
                issues.append({
                    'feature': name,
                    'issue': 'No owner assigned',
                    'severity': 'critical'
                })
            # Check for active features with no recorded consumers
            if feature.status == 'active' and not feature.consumers:
                issues.append({
                    'feature': name,
                    'issue': 'No known consumers',
                    'severity': 'warning'
                })
        return {
            'total_features': len(self.features),
            'issues': issues,
            'issue_count': len(issues)
        }
Common Pitfalls Solutions
Freshness Monitor
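At serving time the same SLA comparison usually runs over a whole feature vector. A minimal sketch that takes the clock as a parameter so the result is deterministic in tests; `stale_features` is an illustrative name, and the 3600-second default mirrors the monitor's one-hour fallback:

```python
from datetime import datetime, timedelta, timezone


def stale_features(
    feature_timestamps: dict[str, datetime],
    slas_seconds: dict[str, int],
    now: datetime,
    default_sla_seconds: int = 3600,
) -> dict[str, float]:
    """Return {feature: age_seconds} for each feature older than its SLA."""
    stale = {}
    for name, ts in feature_timestamps.items():
        age = (now - ts).total_seconds()
        if age > slas_seconds.get(name, default_sla_seconds):
            stale[name] = age
    return stale


now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
timestamps = {
    "txn_count_1h": now - timedelta(seconds=90),
    "avg_basket_30d": now - timedelta(hours=2),
    "home_country": now - timedelta(days=3),  # no SLA set: 1-hour default applies
}
slas = {"txn_count_1h": 60, "avg_basket_30d": 86400}
print(stale_features(timestamps, slas, now))
# {'txn_count_1h': 90.0, 'home_country': 259200.0}
```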
Complete implementation for monitoring feature freshness:
from datetime import datetime


class FreshnessMonitor:
    """Monitor feature freshness."""

    def __init__(self, alerting_system):
        # alerting_system must provide send_alert(severity=..., message=..., metadata=...)
        self.alerting = alerting_system
        self.freshness_slas: dict[str, int] = {}  # feature -> max_age_seconds
        self.last_updates: dict[str, datetime] = {}  # "feature:entity" -> update time

    def set_sla(self, feature_name: str, max_age_seconds: int):
        """Set freshness SLA for a feature."""
        self.freshness_slas[feature_name] = max_age_seconds

    def record_update(self, feature_name: str, entity_key: str):
        """Record that a feature was updated."""
        key = f"{feature_name}:{entity_key}"
        self.last_updates[key] = datetime.now()

    def check_freshness_at_serving(
        self,
        feature_name: str,
        entity_key: str,
        feature_timestamp: datetime
    ) -> dict:
        """Check freshness when serving a prediction."""
        sla = self.freshness_slas.get(feature_name, 3600)  # Default 1 hour
        # feature_timestamp must be naive local time to match datetime.now()
        age_seconds = (datetime.now() - feature_timestamp).total_seconds()
        is_stale = age_seconds > sla
        result = {
            'feature': feature_name,
            'entity': entity_key,
            'age_seconds': age_seconds,
            'sla_seconds': sla,
            'is_stale': is_stale
        }
        if is_stale:
            self.alerting.send_alert(
                severity='warning',
                message=f"Stale feature: {feature_name} is {age_seconds:.0f}s old (SLA: {sla}s)",
                metadata=result
            )
        return result
Leakage Detector
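The detector below audits training data after it has been built. The usual way to avoid timestamp leakage while building it is a point-in-time lookup: for each prediction, take the latest feature value observed strictly before the prediction time. A pure-Python sketch, assuming integer epoch timestamps and a per-entity history already sorted by time (`point_in_time_lookup` is a hypothetical helper):

```python
from bisect import bisect_left
from typing import Optional


def point_in_time_lookup(
    feature_history: list[tuple[int, float]],
    prediction_time: int,
) -> Optional[float]:
    """Latest feature value observed strictly before prediction_time.

    feature_history holds (timestamp, value) pairs sorted by timestamp.
    bisect_left returns the first index whose timestamp is >= prediction_time,
    so an event at exactly prediction_time is excluded. Returns None when no
    earlier value exists.
    """
    timestamps = [ts for ts, _ in feature_history]  # O(n); cache this in real use
    idx = bisect_left(timestamps, prediction_time)
    if idx == 0:
        return None
    return feature_history[idx - 1][1]


history = [(100, 1.0), (200, 2.0), (300, 3.0)]
print(point_in_time_lookup(history, 250))  # 2.0
print(point_in_time_lookup(history, 200))  # 1.0: the value at t=200 is excluded
```

Excluding the event at exactly the prediction time matches the exclusive window bound that `validate_aggregation_windows` asks for.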
Complete implementation for detecting data leakage in training data:
import pandas as pd


class LeakageDetector:
    """Detect data leakage in training data (expects pandas objects)."""

    def check_timestamp_ordering(
        self,
        training_data: pd.DataFrame,
        feature_timestamp_col: str,
        label_timestamp_col: str
    ) -> dict:
        """Verify features don't use future information."""
        # A feature observed after its label's timestamp leaks the future
        violations = training_data[
            training_data[feature_timestamp_col] > training_data[label_timestamp_col]
        ]
        total = len(training_data)
        return {
            'total_rows': total,
            'violations': len(violations),
            'violation_rate': len(violations) / total if total else 0.0,
            'sample_violations': violations.head(5).to_dict() if len(violations) > 0 else {},
            'has_leakage': len(violations) > 0
        }

    def check_feature_label_correlation(
        self,
        features: pd.DataFrame,
        labels: pd.Series,
        threshold: float = 0.95
    ) -> dict:
        """Check for suspicious correlations that might indicate leakage."""
        suspicious = []
        for col in features.columns:
            # Pearson correlation on numeric columns only; labels must share
            # the features' index. A NaN correlation never exceeds the threshold.
            if pd.api.types.is_numeric_dtype(features[col]):
                corr = features[col].corr(labels)
                if abs(corr) > threshold:
                    suspicious.append({
                        'feature': col,
                        'correlation': corr,
                        'warning': 'Suspiciously high correlation - possible leakage'
                    })
        return {
            'features_checked': len(features.columns),
            'suspicious_features': suspicious
        }

    def validate_aggregation_windows(
        self,
        feature_definition: dict,
        prediction_time_col: str
    ) -> dict:
        """Validate aggregation windows don't include future data."""
        issues = []
        window_end = feature_definition.get('window_end')
        if window_end and window_end != 'prediction_time':
            issues.append(
                f"Window end should be prediction_time, not {window_end}"
            )
        # Check for inclusive vs exclusive bounds; a missing flag is treated
        # as inclusive and therefore flagged
        if feature_definition.get('window_inclusive', True):
            issues.append(
                "Window should exclude prediction_time event itself"
            )
        return {
            'feature': feature_definition.get('name'),
            'issues': issues,
            'valid': len(issues) == 0
        }
Real-Time Quality Gate
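The gate below appends every failing record to a plain list, which grows without limit if an upstream bug floods the stream with bad records. One option is a bounded buffer that evicts the oldest entries and counts what it dropped; a sketch with a hypothetical class name:

```python
from collections import deque


class BoundedDeferredQueue:
    """Buffer failed records for async analysis with a hard size cap.

    A deque with maxlen discards its oldest item when a new one is appended
    at capacity; `dropped` counts those evictions so they can be alerted on.
    """

    def __init__(self, maxlen: int = 10_000):
        self.items: deque = deque(maxlen=maxlen)
        self.dropped = 0

    def push(self, item) -> None:
        if len(self.items) == self.items.maxlen:
            self.dropped += 1  # the append below evicts the oldest item
        self.items.append(item)

    def drain(self, batch_size: int) -> list:
        """Remove and return up to batch_size of the oldest items."""
        batch = []
        while self.items and len(batch) < batch_size:
            batch.append(self.items.popleft())
        return batch


q = BoundedDeferredQueue(maxlen=3)
for record_id in range(5):
    q.push({"id": record_id})
print(q.dropped, [r["id"] for r in q.drain(10)])  # 2 [2, 3, 4]
```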
Complete implementation for lightweight quality checks in real-time pipelines:
from datetime import datetime


class RealTimeQualityGate:
    """Lightweight quality checks for real-time pipelines."""

    def __init__(self, strict_mode: bool = False):
        self.strict_mode = strict_mode
        self.deferred_checks = []
        self.type_expectations = {}  # field -> expected type
        self.required_fields = []
        self.range_checks = {}  # field -> (min_val, max_val)

    def quick_validate(self, record: dict) -> dict:
        """Fast validation for the real-time path (target: < 1ms)."""
        issues = []
        # Type checks (fast)
        for field, expected_type in self.type_expectations.items():
            if field in record and not isinstance(record[field], expected_type):
                issues.append(f"Type mismatch: {field}")
        # Null checks (fast)
        for required_field in self.required_fields:
            if required_field not in record or record[required_field] is None:
                issues.append(f"Missing required: {required_field}")
        # Range checks (fast); None values are left to the required-field check
        for field, (min_val, max_val) in self.range_checks.items():
            val = record.get(field)
            if val is not None and (val < min_val or val > max_val):
                issues.append(f"Out of range: {field}")
        valid = len(issues) == 0
        if not valid:
            # Queue for async detailed analysis
            self.deferred_checks.append({
                'record': record,
                'issues': issues,
                'timestamp': datetime.now()
            })
        if valid:
            action = 'accept'
        elif self.strict_mode:
            action = 'reject'
        else:
            action = 'accept_with_warning'
        return {
            'valid': valid or not self.strict_mode,
            'issues': issues,
            'action': action
        }