Python for AI Engineering - Code Reference
Complete code examples from Chapter 2: Python for AI Engineering.
Table of Contents
- Type Hints and TypedDict
- Pydantic Models
- Async/Await Patterns
- Context Managers
- HTTP Clients (httpx)
- LLM SDK Patterns
- NumPy for Embeddings
- Pandas Data Manipulation
- JSON Handling
- Environment Management
- Streaming Patterns
- Retry Logic
- Rate Limiting
- Logging and Observability
- Error Handling
- Testing Patterns
- Docker Configuration
- VS Code Configuration
Type Hints and TypedDict
Basic Type Hints for API Responses
from typing import TypedDict
class ChatMessage(TypedDict):
    """A single chat turn as it appears in a raw API response."""

    role: str
    content: str


class ChatChoice(TypedDict):
    """One candidate completion returned by the API."""

    message: ChatMessage
    index: int


class ChatResponse(TypedDict):
    """Top-level shape of a raw chat-completion response."""

    choices: list[ChatChoice]
    model: str
    usage: dict[str, int]


class ProcessingConfig(TypedDict, total=False):
    """Optional post-processing switches; every key may be omitted."""

    raw: bool
    strip_whitespace: bool


def process_response(response: ChatResponse, config: ProcessingConfig) -> str:
    """Return the text of the first choice, optionally post-processed.

    Args:
        response: Raw chat-completion response.
        config: ``raw=True`` returns the content untouched. Otherwise
            surrounding whitespace is stripped unless
            ``strip_whitespace=False``.

    Raises:
        ValueError: If the response contains no choices.
    """
    choices = response["choices"]
    if not choices:
        raise ValueError("response contains no choices")
    content = choices[0]["message"]["content"]
    if config.get("raw"):
        return content
    # Whitespace stripping is the only cleaning option ProcessingConfig
    # declares; it is on unless explicitly disabled.
    if config.get("strip_whitespace", True):
        content = content.strip()
    return content
# Pydantic Models for LLM Responses
from typing import Literal, Optional
from pydantic import BaseModel
class Message(BaseModel):
    """A chat message; ``content`` is ``None`` when the assistant only calls tools."""

    role: Literal["user", "assistant", "system", "tool"]
    content: Optional[str] = None


class CompletionUsage(BaseModel):
    """Token accounting reported with a completion."""

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class CompletionChoice(BaseModel):
    """One candidate completion and the reason generation stopped."""

    index: int
    message: Message
    # "tool_calls" / "function_call" are returned when the model invokes a
    # tool instead of answering; without them such responses fail validation.
    finish_reason: Literal[
        "stop", "length", "content_filter", "tool_calls", "function_call"
    ]


class ChatCompletion(BaseModel):
    """Validated chat-completion response."""

    id: str
    model: str
    choices: list[CompletionChoice]
    usage: CompletionUsage
# Pydantic Models
Request/Response Validation
from pydantic import BaseModel, Field, field_validator
from typing import Literal
class ChatMessage(BaseModel):
    """A validated chat message whose content is trimmed before length checks."""

    role: Literal["user", "assistant", "system"]
    content: str = Field(..., min_length=1, max_length=100000)

    # mode="before" runs ahead of the Field constraints. Whitespace-only
    # content is therefore stripped to "" and then rejected by min_length=1.
    # An "after" validator would run after min_length had already accepted
    # "   ", storing "".
    @field_validator("content", mode="before")
    @classmethod
    def strip_whitespace(cls, v: object) -> object:
        return v.strip() if isinstance(v, str) else v


class ChatRequest(BaseModel):
    """Validated chat-completion request; unknown fields are rejected."""

    messages: list[ChatMessage]
    model: str = "gpt-4"
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: int = Field(default=1000, gt=0)
    model_config = {"extra": "forbid"}  # Reject unexpected fields
# Automatic validation {.unnumbered}
from pydantic import ValidationError

try:
    request = ChatRequest(
        messages=[{"role": "user", "content": "Hello"}],
        temperature=2.5,  # Invalid: > 2
    )
except ValidationError as e:
    print(e)  # temperature: Input should be less than or equal to 2

# Serialize to JSON {.unnumbered}
# (The request above failed validation, so build a valid one first.)
request = ChatRequest(messages=[{"role": "user", "content": "Hello"}])
json_string = request.model_dump_json()
# Parse from JSON {.unnumbered}
request = ChatRequest.model_validate_json(json_string)
# LLM Configuration with Validation
from pydantic import BaseModel, Field, model_validator
from pathlib import Path
import yaml
class LLMConfig(BaseModel):
    """Provider/model settings, validated on construction (e.g. from YAML)."""

    provider: Literal["openai", "anthropic", "local"]
    model: str
    # exclude=True keeps the key out of model_dump()/JSON. repr=False keeps it
    # out of repr(), and therefore out of logs and tracebacks.
    api_key: str = Field(..., exclude=True, repr=False)
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: int = Field(default=2000, gt=0)
    timeout_seconds: float = Field(default=60, gt=0)

    @model_validator(mode="after")
    def validate_model_for_provider(self) -> "LLMConfig":
        """Reject model names that do not belong to the configured provider.

        A name is accepted when it equals a known model family or is a
        suffixed variant of one: ``claude-3-sonnet-20240229`` matches
        ``claude-3-sonnet``.
        """
        valid_models = {
            "openai": ["gpt-4", "gpt-4-turbo", "gpt-3.5-turbo"],
            "anthropic": ["claude-3-opus", "claude-3-sonnet", "claude-3-haiku"],
            "local": ["llama-2-7b", "mistral-7b"],
        }
        families = valid_models.get(self.provider, [])
        if not any(
            self.model == family or self.model.startswith(f"{family}-")
            for family in families
        ):
            raise ValueError(f"Model {self.model} not valid for {self.provider}")
        return self

    @classmethod
    def from_yaml(cls, path: Path) -> "LLMConfig":
        """Load and validate a config from a UTF-8 YAML file.

        An empty file is treated as an empty mapping, which then fails
        validation on the required fields.

        Raises:
            ValueError: If the top level is not a mapping, or if validation
                fails (pydantic's ValidationError subclasses ValueError).
        """
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
        if data is None:
            data = {}
        if not isinstance(data, dict):
            raise ValueError(
                f"{path}: expected a YAML mapping, got {type(data).__name__}"
            )
        return cls.model_validate(data)
# Structured Output Models
from pydantic import BaseModel, Field
from typing import Literal
class ExtractedEntity(BaseModel):
name: str
entity_type: Literal["person", "organization", "location", "product"]
confidence: float = Field(ge=0, le=1)
class ExtractionResult(BaseModel):
entities: list[ExtractedEntity]
summary: str
# Prompt the LLM to output matching JSON {.unnumbered}
prompt = f"""Extract entities from this text. Respond with JSON matching this schema:
{ExtractionResult.model_json_schema()}
Text: {user_text}
"""
response = await llm.complete(prompt)
result = safe_parse_llm_json(response, ExtractionResult)Async/Await Patterns
Pattern 1: Concurrent API Calls with gather
import asyncio
import httpx
from typing import TypeVar, Callable, Awaitable
T = TypeVar('T')


async def fetch_all_embeddings(texts: list[str]) -> list[list[float]]:
    """Embed every text concurrently over a single shared HTTP client.

    All requests start at once (see Pattern 2 for a bounded variant).
    asyncio.gather returns the results in the same order as ``texts``.
    """
    async with httpx.AsyncClient() as http_client:
        requests = (fetch_embedding(http_client, item) for item in texts)
        return await asyncio.gather(*requests)
# Pattern 2: Controlled Concurrency with Semaphore
async def fetch_with_limit(texts: list[str], max_concurrent: int = 5) -> list[list[float]]:
    """Embed texts concurrently with at most ``max_concurrent`` requests in flight.

    All requests share one ``httpx.AsyncClient``. Results are returned in
    the same order as ``texts``.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient() as client:

        async def fetch_one(text: str) -> list[float]:
            async with semaphore:  # At most max_concurrent run simultaneously
                # Hand the shared client to fetch_embedding, matching the
                # fetch_embedding(client, text) call in Pattern 1. Otherwise
                # the client opened above would sit unused.
                return await fetch_embedding(client, text)

        return await asyncio.gather(*(fetch_one(t) for t in texts))
# Pattern 3: Timeout Handling
async def fetch_with_timeout(text: str, timeout: float = 30.0) -> list[float]:
    """Embed ``text``, giving up after ``timeout`` seconds.

    On timeout, asyncio.wait_for cancels the pending embedding call.

    Raises:
        EmbeddingError: If the call does not finish in time. The original
            timeout exception is attached as ``__cause__``.
    """
    try:
        return await asyncio.wait_for(fetch_embedding(text), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise EmbeddingError(f"Embedding timed out after {timeout}s") from exc
# Pattern 4: First Successful Result
async def fetch_from_any_provider(text: str, providers: list[str]) -> list[float]:
    """Return the first *successful* embedding among several providers.

    Requests to every provider start concurrently. Providers that fail are
    skipped; the first success wins and the remaining requests are
    cancelled.

    Raises:
        ValueError: If ``providers`` is empty.
        Exception: The last provider error, if every provider fails.
    """
    if not providers:
        raise ValueError("providers must not be empty")
    # asyncio.wait() needs tasks. Passing bare coroutines is rejected since
    # Python 3.11.
    pending = {asyncio.create_task(fetch_from_provider(text, p)) for p in providers}
    last_error: BaseException | None = None
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            winner = None
            for task in done:
                if task.cancelled():
                    continue
                # Retrieve every exception so none is reported as
                # "never retrieved".
                if (error := task.exception()) is not None:
                    last_error = error
                elif winner is None:
                    winner = task
            if winner is not None:
                return winner.result()
    finally:
        # Cancel remaining tasks. This also runs if we are cancelled ourselves.
        for task in pending:
            task.cancel()
    if last_error is None:
        raise RuntimeError("all provider requests were cancelled")
    raise last_error
# Bridging Sync and Async
# Running async code from sync context {.unnumbered}
def sync_embed(text: str) -> list[float]:
    """Run ``async_embed`` to completion from synchronous code.

    asyncio.run() starts a fresh event loop. It raises RuntimeError if
    called while an event loop is already running in this thread.
    """
    return asyncio.run(async_embed(text))


# Running sync code from async context (for CPU-bound work) {.unnumbered}
async def async_process(data: bytes) -> dict:
    """Run ``cpu_intensive_process(data)`` off the event-loop thread.

    ``None`` selects the loop's default executor, a thread pool. The event
    loop stays free to serve other tasks, but pure-Python CPU work still
    contends for the GIL. Pass a ``concurrent.futures.ProcessPoolExecutor``
    for true parallelism.
    """
    # The asyncio docs recommend get_running_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, cpu_intensive_process, data)
# Context Managers
Sync Context Manager for Timing
from contextlib import contextmanager, asynccontextmanager
from typing import Generator, AsyncGenerator
import time
@contextmanager
def timer(operation: str) -> Generator[None, None, None]:
    """Print how long the ``with`` body took, even when it raises."""
    began = time.perf_counter()
    try:
        yield
    finally:
        print(f"{operation} took {time.perf_counter() - began:.2f}s")
# Usage {.unnumbered}
with timer("Embedding generation"):
    # The elapsed time is printed when this block exits.
    embeddings = embed_documents(texts)
# Async Context Manager for API Client Lifecycle
@asynccontextmanager
async def managed_llm_client() -> AsyncGenerator[LLMClient, None]:
client = LLMClient(api_key=get_api_key())
try:
await client.connect()
yield client
finally:
await client.disconnect()
# Usage {.unnumbered}
async with managed_llm_client() as client:
response = await client.complete(prompt)AI-Specific Resource Management
@contextmanager
def temp_model_cache(max_size_gb: float = 10.0):
    """Yield a fresh temporary directory for model weights; delete it on exit.

    Args:
        max_size_gb: Advisory only; nothing here enforces a size limit.
    """
    # Neither module is imported elsewhere in this file.
    import shutil
    import tempfile

    cache_dir = Path(tempfile.mkdtemp())
    try:
        yield cache_dir
    finally:
        # Cleanup is best-effort. It must not mask an exception raised in the
        # with-block, nor fail if the caller already removed the directory.
        shutil.rmtree(cache_dir, ignore_errors=True)
@asynccontextmanager
async def rate_limited_client(requests_per_second: float):
    """Yield an httpx.AsyncClient with a rate-limited ``limited_request``.

    ``client.limited_request(*args, **kwargs)`` forwards to
    ``client.request``. It spaces request *starts* at least
    ``1 / requests_per_second`` seconds apart. A semaphore would only cap
    how many requests are in flight, not how many start per second, and
    ``int(0.5)`` would give it zero permits, blocking forever.

    Raises:
        ValueError: If ``requests_per_second`` is not positive.
    """
    if requests_per_second <= 0:
        raise ValueError("requests_per_second must be positive")
    interval = 1.0 / requests_per_second
    next_start = time.monotonic()
    client = httpx.AsyncClient()

    async def limited_request(*args, **kwargs):
        nonlocal next_start
        # Reserve a start slot. There is no await between reading and
        # updating next_start, so callers on the same event loop cannot
        # interleave here.
        now = time.monotonic()
        start_at = max(now, next_start)
        next_start = start_at + interval
        if start_at > now:
            await asyncio.sleep(start_at - now)
        return await client.request(*args, **kwargs)

    client.limited_request = limited_request
    try:
        yield client
    finally:
        await client.aclose()
# HTTP Clients (httpx)
Production Configuration
from httpx import AsyncClient, Timeout, Limits
# Sensible defaults for AI workloads {.unnumbered}
client = AsyncClient(
    # HTTP/2 multiplexes many requests over one connection. It needs the
    # optional "h2" package (pip install "httpx[http2]").
    http2=True,
    timeout=Timeout(
        read=60.0,     # waiting for response bytes; LLM generations can be slow
        connect=5.0,   # establishing the connection
        write=10.0,    # sending the request body
        pool=5.0,      # waiting for a free connection from the pool
    ),
    limits=Limits(
        max_connections=100,           # upper bound on open connections
        max_keepalive_connections=20,  # idle connections kept for reuse
        keepalive_expiry=30.0,         # seconds before an idle connection closes
    ),
)
# Streaming Responses
async def stream_completion(prompt: str) -> AsyncGenerator[str, None]:
    """Stream content tokens from the OpenAI chat-completions SSE endpoint.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={
                "model": "gpt-4",
                "messages": [{"role": "user", "content": prompt}],
                "stream": True,
            },
        ) as response:
            # An error body (bad key, rate limit) has no "data: " lines.
            # Without this check the generator would just end silently.
            response.raise_for_status()
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # blank event separators and other SSE fields
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                data = json.loads(payload)
                choices = data.get("choices") or []
                # Some chunks carry no choices, e.g. the usage chunk sent
                # when stream_options.include_usage is set.
                if not choices:
                    continue
                if content := choices[0].get("delta", {}).get("content"):
                    yield content
# LLM SDK Patterns
OpenAI SDK
from openai import OpenAI, AsyncOpenAI
# Synchronous client {.unnumbered}
client = OpenAI()  # Reads OPENAI_API_KEY from environment
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain quantum computing briefly."},
    ],
    temperature=0.7,
    max_tokens=500,
)
print(response.choices[0].message.content)

# Async client {.unnumbered}
async_client = AsyncOpenAI()


async def complete(prompt: str) -> str:
    """Return the assistant's reply to a single user prompt.

    ``message.content`` is ``None`` when the model answers only with tool
    calls. That case is returned as ``""`` to honour the ``str`` return type.
    """
    response = await async_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content or ""


# Streaming {.unnumbered}
async def stream_response(prompt: str) -> AsyncGenerator[str, None]:
    """Yield content fragments as the model produces them."""
    stream = await async_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        # A chunk may have an empty choices list, e.g. the usage chunk sent
        # when stream_options={"include_usage": True}.
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
# Anthropic SDK
from anthropic import Anthropic, AsyncAnthropic
client = Anthropic()  # Reads ANTHROPIC_API_KEY from environment
question = {"role": "user", "content": "Explain quantum computing briefly."}
message = client.messages.create(
    model="claude-3-sonnet-20240229",
    max_tokens=1024,
    messages=[question],
)
# The reply is a list of content blocks; print the first block's text.
print(message.content[0].text)

# Streaming with context manager {.unnumbered}
async_client = AsyncAnthropic()


async def stream_claude(prompt: str) -> AsyncGenerator[str, None]:
    """Yield Claude's reply to ``prompt`` piece by piece as text arrives.

    ``messages.stream`` is used as an async context manager, so the stream
    is closed when the ``async with`` block exits.
    """
    request = {
        "model": "claude-3-sonnet-20240229",
        "max_tokens": 1024,
        "messages": [{"role": "user", "content": prompt}],
    }
    async with async_client.messages.stream(**request) as stream:
        async for piece in stream.text_stream:
            yield piece
# Provider-Agnostic Abstraction
from abc import ABC, abstractmethod
from typing import AsyncGenerator
class LLMProvider(ABC):
    """Common interface for chat-completion backends."""

    @abstractmethod
    async def complete(self, prompt: str, **kwargs) -> str:
        """Return the full completion text for ``prompt``."""

    # Declared as a plain ``def`` that returns an async generator.
    # Subclasses implement it as ``async def ... yield``, which is exactly
    # such a function. Declaring it ``async def`` here would type it as a
    # coroutine returning a generator, which the overrides do not match
    # (mypy --strict reports the mismatch).
    @abstractmethod
    def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
        """Yield completion text for ``prompt`` incrementally."""
class OpenAIProvider(LLMProvider):
    """LLMProvider backed by OpenAI chat completions."""

    def __init__(self, model: str = "gpt-4"):
        self.client = AsyncOpenAI()
        self.model = model

    async def complete(self, prompt: str, **kwargs) -> str:
        """Return the reply text; extra kwargs (temperature, ...) go to the API.

        ``message.content`` is ``None`` when the model answers only with
        tool calls. That case is returned as ``""`` to honour the ``str``
        return type.
        """
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs,
        )
        return response.choices[0].message.content or ""

    async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
        """Yield content fragments; extra kwargs are forwarded to the API."""
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            **kwargs,
        )
        async for chunk in stream:
            # A chunk may have an empty choices list, e.g. the usage chunk
            # sent when stream_options={"include_usage": True}.
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
class AnthropicProvider(LLMProvider):
    """LLMProvider backed by the Anthropic Messages API."""

    def __init__(self, model: str = "claude-3-sonnet-20240229"):
        self.client = AsyncAnthropic()
        self.model = model

    def _request_params(self, prompt: str, kwargs: dict) -> dict:
        """Build Messages API arguments from a prompt and caller kwargs.

        Extra kwargs (temperature, system, stop_sequences, ...) are
        forwarded, matching OpenAIProvider. max_tokens is required by the
        Messages API, so it defaults to 1024. model and messages always
        come from this provider.
        """
        return {
            "max_tokens": 1024,
            **kwargs,
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
        }

    async def complete(self, prompt: str, **kwargs) -> str:
        """Return the text of the first content block of Claude's reply."""
        response = await self.client.messages.create(
            **self._request_params(prompt, kwargs)
        )
        return response.content[0].text

    async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
        """Yield text deltas as they arrive."""
        async with self.client.messages.stream(
            **self._request_params(prompt, kwargs)
        ) as stream:
            async for text in stream.text_stream:
                yield text
# NumPy for Embeddings
Basic Operations
import numpy as np
# Embeddings are just arrays of floats {.unnumbered}
# Real embeddings typically have 384-4096 dimensions; three are shown here.
embedding = np.array([0.1, -0.3, 0.5])


# Cosine similarity: the standard metric for embedding comparison {.unnumbered}
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine of the angle between ``a`` and ``b`` as a float.

    Returns 0.0 when either vector has zero length. The angle is undefined
    there, and the plain formula would divide by zero and yield nan.
    """
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        return 0.0
    return float(np.dot(a, b) / denom)


# For normalized vectors (unit length), dot product = cosine similarity {.unnumbered}
def normalize(v: np.ndarray) -> np.ndarray:
    """Scale ``v`` to unit length. A zero vector comes back as zeros, not nan."""
    norm = np.linalg.norm(v)
    return v / (norm if norm > 0 else 1.0)
# Batch normalization {.unnumbered}
embeddings = np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
# An all-zero row would divide by zero (nan). Divide it by 1 so it stays zero.
normalized = embeddings / np.where(norms == 0, 1.0, norms)
# Efficient Similarity Search
def find_most_similar(
    query_embedding: np.ndarray,
    document_embeddings: np.ndarray,
    top_k: int = 5,
) -> list[tuple[int, float]]:
    """Return up to ``top_k`` ``(doc_index, similarity)`` pairs, best first.

    Assumes the query and every document row are already normalized, so the
    dot product equals cosine similarity. ``top_k <= 0`` returns ``[]``.
    """
    # One matrix-vector product scores every document at once.
    similarities = document_embeddings @ query_embedding
    k = min(top_k, similarities.shape[0])
    if k <= 0:
        # Without this guard, a [-0:] slice would return every document.
        return []
    # argpartition selects the k best in O(n); only those k are then sorted.
    top_unsorted = np.argpartition(-similarities, k - 1)[:k]
    top_indices = top_unsorted[np.argsort(-similarities[top_unsorted])]
    return [(int(i), float(similarities[i])) for i in top_indices]
# Usage {.unnumbered}
query = normalize(get_embedding("What is machine learning?"))
normalized_docs = [normalize(vec) for vec in document_embeddings]
docs = np.array(normalized_docs)
results = find_most_similar(query, docs, top_k=10)
# Memory-Efficient Operations
# Use float32 instead of float64 (half the memory, sufficient precision) {.unnumbered}
embeddings = embeddings.astype(np.float32)
# Memory-mapped arrays for datasets that don't fit in RAM {.unnumbered}
# A .npy file begins with a header that records dtype and shape.
# np.load(mmap_mode=...) reads that header and maps only the data.
# np.memmap would treat the header bytes as array data and needs
# shape/dtype supplied by hand.
embeddings = np.load("embeddings.npy", mmap_mode="r")
# Batch processing to control memory {.unnumbered}
def process_in_batches(
    embeddings: np.ndarray,
    batch_size: int = 10000,
) -> Generator[np.ndarray, None, None]:
    """Yield consecutive row slices holding at most ``batch_size`` rows each.

    Basic slicing returns views, so no row data is copied here.
    """
    yield from (
        embeddings[offset:offset + batch_size]
        for offset in range(0, len(embeddings), batch_size)
    )
# Pandas Data Manipulation
Common AI Data Tasks
import pandas as pd
# Loading evaluation datasets {.unnumbered}
eval_df = pd.read_json("evaluation_set.jsonl", lines=True)
# Filtering by criteria {.unnumbered}
hard_examples = eval_df[eval_df["difficulty"] == "hard"]
# Adding computed columns {.unnumbered}
eval_df["response_length"] = eval_df["response"].str.len()
eval_df["is_correct"] = eval_df["predicted"] == eval_df["expected"]


# Aggregating results {.unnumbered}
# pandas has no "p50"/"p95" aggregation names. Named functions work, and
# their __name__ becomes the column label.
def p50(values: pd.Series) -> float:
    """Median of ``values``."""
    return values.quantile(0.50)


def p95(values: pd.Series) -> float:
    """95th percentile of ``values``."""
    return values.quantile(0.95)


results = eval_df.groupby("category").agg({
    "is_correct": "mean",  # Accuracy per category
    "latency_ms": ["mean", p50, p95],
}).round(3)
# Joining datasets {.unnumbered}
prompts_df = pd.read_csv("prompts.csv")
responses_df = pd.read_csv("responses.csv")
merged = prompts_df.merge(responses_df, on="prompt_id")
# Preparing Fine-Tuning Data
def prepare_training_data(df: pd.DataFrame) -> pd.DataFrame:
"""Convert raw data to fine-tuning format."""
# Filter for quality
df = df[df["quality_score"] >= 0.8]
# Format as conversation
df["messages"] = df.apply(
lambda row: [
{"role": "system", "content": row["system_prompt"]},
{"role": "user", "content": row["user_input"]},
{"role": "assistant", "content": row["expected_output"]}
],
axis=1
)
# Train/validation split
df["split"] = np.random.choice(
["train", "val"],
size=len(df),
p=[0.9, 0.1]
)
return df[["messages", "split"]]
# Export for training {.unnumbered}
training_df = prepare_training_data(raw_df)
# Select a one-column *DataFrame* (note the list ["messages"]). With
# orient="records" each line is then {"messages": [...]}, the chat
# fine-tuning format. Exporting the Series would write bare lists.
train_rows = training_df.loc[training_df["split"] == "train", ["messages"]]
train_rows.to_json("train.jsonl", orient="records", lines=True)
# JSON Handling
Parsing LLM JSON Output
import json
import re
from typing import Any
def extract_json(text: str) -> dict[str, Any]:
    """Extract the first JSON object from an LLM response.

    Tries, in order: the whole text, the body of the first markdown code
    fence, then each ``{`` in the text until a complete JSON object decodes
    from it. Only JSON objects (dicts) are accepted.

    Raises:
        ValueError: If no JSON object can be found.
    """
    candidates = [text]
    fence = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
    if fence:
        candidates.append(fence.group(1))
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # A bare list/number/string is valid JSON, but callers (e.g.
        # pydantic model validation) expect an object.
        if isinstance(parsed, dict):
            return parsed

    # raw_decode parses one complete value starting at an offset and ignores
    # whatever follows it. Trailing prose or a second object therefore does
    # not break parsing, as it would with a greedy "{...}" regex span.
    decoder = json.JSONDecoder()
    for match in re.finditer(r"\{", text):
        try:
            parsed, _ = decoder.raw_decode(text, match.start())
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    raise ValueError(f"Could not extract valid JSON from: {text[:200]}...")
ModelT = TypeVar("ModelT", bound=BaseModel)


def safe_parse_llm_json(text: str, schema: type[ModelT]) -> ModelT:
    """Extract JSON from an LLM response and validate it against ``schema``.

    The return type follows ``schema``, so type checkers know that
    ``safe_parse_llm_json(text, ExtractionResult)`` is an
    ``ExtractionResult``.

    Raises:
        ValueError: If no JSON object can be extracted.
        pydantic.ValidationError: If the JSON does not match ``schema``.
    """
    raw = extract_json(text)
    return schema.model_validate(raw)
# Environment Management
Environment Variables with python-dotenv
# .env file (NEVER commit this) {.unnumbered}
# OPENAI_API_KEY=sk-... {.unnumbered}
# ANTHROPIC_API_KEY=sk-ant-... {.unnumbered}
# DATABASE_URL=postgresql://... {.unnumbered}
# Load at application start {.unnumbered}
from dotenv import load_dotenv
import os
load_dotenv()  # Copy variables from .env into os.environ (existing values win)
if not (openai_key := os.getenv("OPENAI_API_KEY")):
    raise ValueError("OPENAI_API_KEY environment variable not set")
# Secrets Managers for Production
# AWS Secrets Manager {.unnumbered}
import boto3
import json
def get_secret(secret_name: str) -> dict:
    """Fetch a JSON secret from AWS Secrets Manager and decode it.

    Secrets Manager stores a secret either as text (``SecretString``) or as
    bytes (``SecretBinary``). Both forms are handled.
    """
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_name)
    payload = response.get("SecretString")
    if payload is None:
        payload = response["SecretBinary"]  # bytes; json.loads accepts bytes
    return json.loads(payload)


# Usage {.unnumbered}
secrets = get_secret("prod/ai-assistant")
openai_key = secrets["OPENAI_API_KEY"]
# Google Cloud Secret Manager {.unnumbered}
from google.cloud import secretmanager
def get_gcp_secret(project_id: str, secret_id: str, version: str = "latest") -> str:
    """Return one version of a Google Cloud Secret Manager secret as UTF-8 text."""
    resource = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
    service = secretmanager.SecretManagerServiceClient()
    payload = service.access_secret_version(request={"name": resource}).payload
    return payload.data.decode("UTF-8")
# pyproject.toml Configuration
[tool.poetry]
name = "ai-assistant"
version = "0.1.0"
description = "AI-powered assistant application"
authors = ["Your Name <you@example.com>"]

[tool.poetry.dependencies]
python = "^3.11"
openai = "^1.12"
# Caret ranges on 0.x versions pin the minor version ("^0.5" means
# >=0.5,<0.6). anthropic 0.5 predates the Messages API
# (client.messages.create / messages.stream) used in this chapter.
anthropic = ">=0.20,<1.0"
pydantic = "^2.0"
# The http2 extra installs "h2", which AsyncClient(http2=True) requires.
httpx = { version = "^0.26", extras = ["http2"] }

[tool.poetry.group.dev.dependencies]
pytest = "^8.0"
pytest-asyncio = "^0.23"
mypy = "^1.8"
ruff = "^0.2"

[tool.mypy]
python_version = "3.11"
strict = true

[tool.ruff]
target-version = "py311"

# Since ruff 0.2, lint options belong under [tool.ruff.lint]; a top-level
# "select" is deprecated.
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
# Streaming Patterns
Generator Basics
# Sync generator {.unnumbered}
def count_up(n: int) -> Generator[int, None, None]:
    """Yield 0, 1, ..., n - 1, one value per step."""
    yield from range(n)


# Values are produced lazily, one at a time {.unnumbered}
for num in count_up(5):
    print(num)  # Prints 0, 1, 2, 3, 4
# Async generator {.unnumbered}
async def stream_numbers(n: int) -> AsyncGenerator[int, None]:
for i in range(n):
await asyncio.sleep(0.1) # Simulate async work
yield i
# Consume with async for {.unnumbered}
async for num in stream_numbers(5):
print(num)Streaming LLM Responses
async def stream_chat_response(
messages: list[dict],
on_token: Callable[[str], None] | None = None
) -> str:
"""Stream a chat completion, calling on_token for each chunk."""
full_response = []
stream = await client.chat.completions.create(
model="gpt-4",
messages=messages,
stream=True
)
async for chunk in stream:
if content := chunk.choices[0].delta.content:
full_response.append(content)
if on_token:
on_token(content)
return "".join(full_response)
# Usage with print callback {.unnumbered}
response = await stream_chat_response(
messages=[{"role": "user", "content": "Write a poem"}],
on_token=lambda t: print(t, end="", flush=True)
)Server-Sent Events (SSE) for Web
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()


async def generate_stream(prompt: str) -> AsyncGenerator[str, None]:
    """Wrap LLM tokens as Server-Sent Events, ending with a [DONE] event."""
    async for token in stream_llm_response(prompt):
        # One SSE event per token: a "data:" line followed by a blank line.
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream the reply to the latest message of a chat request as SSE.

    ChatRequest carries a ``messages`` list and has no ``prompt`` field, so
    the prompt is taken from the content of the last message.
    """
    if not request.messages:
        from fastapi import HTTPException

        raise HTTPException(status_code=422, detail="messages must not be empty")
    prompt = request.messages[-1].content
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
    )
# Retry Logic
Using Tenacity Library
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log
)
import logging
# The OpenAI SDK wraps transport and HTTP failures in its own exception
# types, so httpx.TimeoutException / httpx.HTTPStatusError never reach this
# decorator. APIConnectionError also covers APITimeoutError. Only transient
# failures are retried; 4xx errors such as a bad request or invalid key are
# not. Note that the SDK also retries internally (max_retries), so attempts
# multiply.
from openai import APIConnectionError, InternalServerError, RateLimitError

logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type(
        (APIConnectionError, RateLimitError, InternalServerError)
    ),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
async def call_llm_with_retry(prompt: str) -> str:
    """Call the LLM, retrying timeouts, connection errors, 429s and 5xx.

    After the third failed attempt tenacity raises ``tenacity.RetryError``.
    """
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
# Custom Retry Logic
import asyncio
from typing import TypeVar, Callable, Awaitable
import random
T = TypeVar('T')


async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    retryable_exceptions: tuple = (Exception,),
) -> T:
    """Await ``func()``, retrying failures with capped exponential backoff.

    Args:
        func: Zero-argument callable that returns a fresh awaitable on each
            call. A coroutine that has already been awaited cannot be
            awaited again.
        max_attempts: Total attempts, including the first; must be >= 1.
        base_delay: Delay before the first retry, in seconds.
        max_delay: Upper bound on any single delay, jitter included.
        jitter: Scale each delay by a random factor in [0.5, 1.5).
        retryable_exceptions: Exception types that trigger a retry. Any
            other exception propagates immediately.

    Raises:
        ValueError: If ``max_attempts`` < 1.
        The last retryable exception, once all attempts are used up.
    """
    if max_attempts < 1:
        # Otherwise the loop never runs and nothing meaningful is raised.
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return await func()
        except retryable_exceptions as e:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff: 1s, 2s, 4s, 8s... (for base_delay=1)
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Add jitter to prevent thundering herd; re-cap afterwards so
            # max_delay stays a true upper bound.
            if jitter:
                delay = min(delay * (0.5 + random.random()), max_delay)
            logger.warning(
                "Attempt %d failed: %s. Retrying in %.1fs", attempt + 1, e, delay
            )
            await asyncio.sleep(delay)
    raise AssertionError("unreachable")  # every iteration returns or raises
# Usage {.unnumbered}
result = await retry_with_backoff(
lambda: client.embeddings.create(input=text, model="text-embedding-3-small"),
max_attempts=5,
retryable_exceptions=(httpx.TimeoutException, RateLimitError)
)Rate Limiting
Token Bucket Rate Limiter
import asyncio
import time
class RateLimiter:
    """Token-bucket limiter allowing ``requests_per_minute`` calls per minute.

    The bucket starts full, so up to ``requests_per_minute`` calls pass
    immediately; after that, tokens refill continuously.
    """

    def __init__(self, requests_per_minute: int):
        if requests_per_minute <= 0:
            # Zero would divide by zero in acquire(); negatives never refill.
            raise ValueError("requests_per_minute must be positive")
        self.requests_per_minute = requests_per_minute
        self.tokens = float(requests_per_minute)
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until a request token is available, then consume it.

        The lock is held while sleeping, so waiters are served one at a time.
        """
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.last_update
                # Refill tokens based on elapsed time
                self.tokens = min(
                    self.requests_per_minute,
                    self.tokens + elapsed * (self.requests_per_minute / 60),
                )
                self.last_update = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Wait for a token to become available
                wait_time = (1 - self.tokens) * (60 / self.requests_per_minute)
                await asyncio.sleep(wait_time)


# Usage {.unnumbered}
rate_limiter = RateLimiter(requests_per_minute=60)


async def rate_limited_call(prompt: str) -> str:
    await rate_limiter.acquire()
    return await client.chat.completions.create(...)
# Combined Rate and Concurrency Limiting
class ConcurrencyLimiter:
"""Limit concurrent in-flight requests."""
def __init__(self, max_concurrent: int):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def __aenter__(self):
await self.semaphore.acquire()
return self
async def __aexit__(self, *args):
self.semaphore.release()
class APIThrottler:
    """Apply both a per-minute rate limit and a concurrency cap to calls."""

    def __init__(
        self,
        requests_per_minute: int = 60,
        max_concurrent: int = 10,
    ):
        self.rate_limiter = RateLimiter(requests_per_minute)
        self.concurrency_limiter = ConcurrencyLimiter(max_concurrent)

    async def throttled_call(self, func: Callable[[], Awaitable[T]]) -> T:
        """Run ``func()`` once both a concurrency slot and a rate token are held.

        The slot is taken first and the rate token second, so the token is
        spent just before the request starts. Taking the token first would
        let callers use up tokens while queued for a slot, then fire in a
        burst when slots free up, exceeding the configured rate.
        """
        async with self.concurrency_limiter:
            await self.rate_limiter.acquire()
            return await func()
# Logging and Observability
Structured Logging Setup
import logging
import json
from datetime import datetime, timezone
from typing import Any


class JSONFormatter(logging.Formatter):
    """Format log records as one-line JSON objects for structured logging.

    Fields passed via ``logger.info(..., extra={...})`` are included. logging
    stores them as attributes on the record (there is no ``record.extra``),
    so every attribute a plain LogRecord lacks is treated as an extra field.
    """

    # Attribute names every LogRecord has, plus ones a Formatter may add.
    _STANDARD_ATTRS = frozenset(
        vars(logging.LogRecord("", 0, "", 0, "", (), None))
    ) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        log_data: dict[str, Any] = {
            # Use the event's own time, as an explicit-UTC ISO string.
            # datetime.utcnow() is deprecated since Python 3.12 and naive.
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        # Include extra fields
        for key, value in vars(record).items():
            if key not in self._STANDARD_ATTRS:
                log_data[key] = value
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # default=str: one non-serializable extra value must not make the
        # whole record fail to format.
        return json.dumps(log_data, default=str)
def setup_logging(level: str = "INFO") -> None:
    """Route root-logger output through a JSONFormatter on stderr.

    Args:
        level: Level name, case-insensitive (``"info"`` works too).

    Note: logging.basicConfig does nothing if the root logger already has
    handlers.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    # getattr(logging, "info") would return the logging.info *function*
    # rather than the INFO level, so normalise the case first.
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        handlers=[handler],
    )


# Usage with extra context {.unnumbered}
logger = logging.getLogger(__name__)


def log_llm_call(
    prompt: str,
    response: str,
    model: str,
    latency_ms: float,
    tokens_used: int,
) -> None:
    """Emit one INFO record describing a completed LLM call.

    Only the first 100 characters of the prompt and response are logged.
    """
    fields = {
        "prompt_preview": prompt[:100],
        "response_preview": response[:100],
        "model": model,
        "latency_ms": latency_ms,
        "tokens_used": tokens_used,
        "event_type": "llm_call",
    }
    logger.info("LLM call completed", extra=fields)
# Request Tracing
import uuid
from contextvars import ContextVar
# Request ID propagation {.unnumbered}
request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")


class TracingMiddleware:
    """Request-tracing middleware with a ``(request, call_next)`` interface.

    Each request gets a short random ID. The ID is stored in
    ``request_id_var`` while the request is handled, attached to the
    start/finish/failure log records, and returned in the ``X-Request-ID``
    response header.
    """

    async def __call__(self, request, call_next):
        request_id = uuid.uuid4().hex[:8]
        token = request_id_var.set(request_id)
        try:
            logger.info(
                "Request started",
                extra={"request_id": request_id, "path": request.url.path},
            )
            start = time.perf_counter()
            try:
                response = await call_next(request)
            except Exception:
                # Without this, a failing handler leaves only a
                # "Request started" record behind.
                logger.exception(
                    "Request failed",
                    extra={
                        "request_id": request_id,
                        "latency_ms": (time.perf_counter() - start) * 1000,
                    },
                )
                raise
            elapsed = (time.perf_counter() - start) * 1000
            logger.info(
                "Request completed",
                extra={
                    "request_id": request_id,
                    "status_code": response.status_code,
                    "latency_ms": elapsed,
                },
            )
            response.headers["X-Request-ID"] = request_id
            return response
        finally:
            # Restore the previous value so this ID does not linger in the
            # context once the request is finished.
            request_id_var.reset(token)
# Error Handling
Comprehensive API Error Handling
from openai import (
APIError,
APIConnectionError,
RateLimitError,
AuthenticationError,
BadRequestError
)
class LLMError(Exception):
    """Base exception for LLM operations."""


class RetryableError(LLMError):
    """Error that may succeed on retry."""


class NonRetryableError(LLMError):
    """Error that won't be fixed by retrying."""


async def call_llm_safely(prompt: str) -> str:
    """Call the LLM and translate SDK errors into Retryable/NonRetryable.

    Raises:
        NonRetryableError: Authentication failures, bad requests, other 4xx.
        RetryableError: Rate limits, connection problems, 5xx responses.
    """
    try:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
        )
    except AuthenticationError as e:
        # Invalid API key - don't retry
        logger.error(f"Authentication failed: {e}")
        raise NonRetryableError("Invalid API credentials") from e
    except BadRequestError as e:
        # Invalid request - don't retry (fix the request)
        logger.error(f"Bad request: {e}")
        raise NonRetryableError(f"Invalid request: {e}") from e
    except RateLimitError as e:
        # Rate limited - retry with backoff
        logger.warning(f"Rate limited: {e}")
        raise RetryableError("Rate limit exceeded") from e
    except APIConnectionError as e:
        # Network issue - retry
        logger.warning(f"Connection error: {e}")
        raise RetryableError("Connection failed") from e
    except APIError as e:
        # Only HTTP-status errors carry status_code; other APIError
        # subclasses do not. Reading it unguarded would raise AttributeError
        # from inside this handler.
        status = getattr(e, "status_code", None)
        if status is not None and status >= 500:
            logger.warning(f"Server error: {e}")
            raise RetryableError(f"Server error: {status}") from e
        raise NonRetryableError(f"API error: {e}") from e
    return response.choices[0].message.content


# Combine with retry logic {.unnumbered}
@retry(
    retry=retry_if_exception_type(RetryableError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=1, max=30),
)
async def robust_llm_call(prompt: str) -> str:
    """Call the LLM, retrying only errors classified as RetryableError.

    NonRetryableError propagates immediately. After three failed attempts,
    tenacity raises ``tenacity.RetryError``.
    """
    return await call_llm_safely(prompt)
# Testing Patterns
Pytest Fixtures for API Mocking
import pytest
from unittest.mock import AsyncMock, patch
from openai.types.chat import ChatCompletion, ChatCompletionMessage
# Choice is defined in the chat_completion module; openai.types.chat does not
# re-export it (a streaming Choice type also exists).
from openai.types.chat.chat_completion import Choice


@pytest.fixture
def mock_openai_response():
    """Create a mock OpenAI chat completion response."""
    return ChatCompletion(
        id="mock-id",
        model="gpt-4",
        object="chat.completion",
        created=1234567890,
        choices=[
            Choice(
                index=0,
                message=ChatCompletionMessage(
                    role="assistant",
                    content="This is a mock response",
                ),
                finish_reason="stop",
            )
        ],
        usage={"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
    )


@pytest.fixture
def mock_openai_client(mock_openai_response):
    """Replace the AsyncOpenAI class used by myapp.llm with a mock client.

    patch() must target the name where it is looked up. If myapp.llm does
    ``from openai import AsyncOpenAI`` (the import style used in this
    chapter), patching "openai.AsyncOpenAI" leaves myapp.llm's own reference
    untouched. NOTE(review): assumes that import style and that the client
    is created inside get_completion, not at module import time; confirm
    against myapp/llm.py.
    """
    with patch("myapp.llm.AsyncOpenAI") as mock_cls:
        client = AsyncMock()
        client.chat.completions.create = AsyncMock(return_value=mock_openai_response)
        mock_cls.return_value = client
        yield client


@pytest.mark.asyncio
async def test_chat_completion(mock_openai_client):
    """Test chat completion with mocked API."""
    from myapp.llm import get_completion

    result = await get_completion("Hello")
    assert result == "This is a mock response"
    mock_openai_client.chat.completions.create.assert_called_once()
# Snapshot Testing
import pytest
from syrupy.assertion import SnapshotAssertion
def test_prompt_formatting(snapshot: SnapshotAssertion):
    """Fail if format_rag_prompt's output drifts from the stored snapshot."""
    documents = ["ML is a subset of AI...", "Machine learning enables..."]
    prompt = format_rag_prompt(query="What is machine learning?", documents=documents)
    assert prompt == snapshot
# Create or refresh snapshots with: pytest --snapshot-update {.unnumbered}
# Without that flag, a missing snapshot fails the test instead of being created {.unnumbered}
# Later runs compare the output against the stored snapshot {.unnumbered}
# Testing Streaming Responses
@pytest.mark.asyncio
async def test_streaming_response():
    """Test stream_response (the SDK-based streamer) against fake chunks.

    stream_completion talks to the API through httpx directly, so patching
    the SDK client cannot affect it. stream_response *awaits*
    ``async_client.chat.completions.create`` and reads chunks by attribute.
    The fake create must therefore be an AsyncMock, and the chunks need
    attribute access. NOTE(review): assumes ``async_client`` is the
    AsyncOpenAI client used by stream_response.
    """
    from types import SimpleNamespace

    def make_chunk(content=None):
        delta = SimpleNamespace(content=content)
        return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

    async def mock_stream():
        for chunk in (make_chunk("Hello"), make_chunk(" world"), make_chunk()):
            yield chunk

    with patch.object(
        async_client.chat.completions,
        "create",
        new=AsyncMock(return_value=mock_stream()),
    ):
        tokens = [token async for token in stream_response("Hi")]
    assert "".join(tokens) == "Hello world"
# VCR for Recorded API Interactions
import pytest
import vcr
# VCR records HTTP interactions and replays them {.unnumbered}
# filter_headers keeps the Authorization header (the API key) out of the
# recorded cassette, which is usually committed to the repository.
@vcr.use_cassette(
    "tests/cassettes/openai_completion.yaml",
    filter_headers=["authorization"],
)
def test_real_api_call():
    """Test with recorded API response."""
    # First run: makes real API call, records response
    # Subsequent runs: replays recorded response
    result = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Say hello"}],
    )
    content = result.choices[0].message.content
    assert content is not None and "hello" in content.lower()
# Docker Configuration
Dockerfile for AI Applications
# Use specific Python version {.unnumbered}
FROM python:3.11-slim
# Set working directory {.unnumbered}
WORKDIR /app
# Install system dependencies (if needed for packages like numpy) {.unnumbered}
# --no-install-recommends skips optional packages to keep the image small.
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*
# Create the unprivileged user up front so ownership is set while copying.
# A later "chown -R" layer would store a second copy of every file.
RUN useradd -m appuser && chown appuser:appuser /app
# Copy dependency files first (for layer caching) {.unnumbered}
COPY requirements.txt .
# Install Python dependencies {.unnumbered}
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code {.unnumbered}
COPY --chown=appuser:appuser . .
# Don't run as root {.unnumbered}
USER appuser
# Environment variables (defaults, overridable) {.unnumbered}
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# Run the application {.unnumbered}
CMD ["python", "-m", "app.main"]
# Multi-Stage Build
# Build stage {.unnumbered}
# Upper-case "AS" matches the FROM keyword (BuildKit warns on mixed casing).
FROM python:3.11-slim AS builder
WORKDIR /app
# Install into a virtualenv. It copies cleanly into the runtime stage and,
# unlike "pip install --user" into /root/.local, is usable by a non-root user.
RUN python -m venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Runtime stage {.unnumbered}
FROM python:3.11-slim
WORKDIR /app
# Copy only the installed packages {.unnumbered}
COPY --from=builder /opt/venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH
RUN useradd -m appuser
COPY . .
USER appuser
CMD ["python", "-m", "app.main"]
# Docker Compose for Development
# The top-level "version" key is obsolete in Compose v2 (it only triggers a
# warning), so it is omitted.
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/app
    volumes:
      - .:/app  # Mount code for hot reloading
    depends_on:
      # A plain depends_on list only orders container start-up; wait until
      # Postgres actually accepts connections.
      db:
        condition: service_healthy
      redis:
        condition: service_started
  db:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d app"]
      interval: 5s
      timeout: 5s
      retries: 10
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
volumes:
  postgres_data:
# VS Code Configuration
settings.json
{
    "python.defaultInterpreterPath": ".venv/bin/python",
    "python.envFile": "${workspaceFolder}/.env",
    "python.analysis.typeCheckingMode": "basic",
    "python.analysis.diagnosticMode": "workspace",
    "python.testing.pytestEnabled": true,
    "python.testing.pytestArgs": ["tests"],
    "editor.formatOnSave": true,
    "[python]": {
        "editor.defaultFormatter": "charliermarsh.ruff"
    }
}
.cursorrules
This is a Python AI engineering project. Key patterns:
- Use async/await for all API calls
- Use Pydantic for request/response validation
- Follow existing error handling patterns in src/errors.py
- Add type hints to all function signatures