Python for AI Engineering - Code Reference

Complete code examples from Chapter 2: Python for AI Engineering.


Table of Contents

  1. Type Hints and TypedDict
  2. Pydantic Models
  3. Async/Await Patterns
  4. Context Managers
  5. HTTP Clients (httpx)
  6. LLM SDK Patterns
  7. NumPy for Embeddings
  8. Pandas Data Manipulation
  9. JSON Handling
  10. Environment Management
  11. Streaming Patterns
  12. Retry Logic
  13. Rate Limiting
  14. Logging and Observability
  15. Error Handling
  16. Testing Patterns
  17. Docker Configuration

Type Hints and TypedDict

Basic Type Hints for API Responses

from typing import TypedDict

class ChatMessage(TypedDict):
    role: str
    content: str

class ChatChoice(TypedDict):
    message: ChatMessage
    index: int

class ChatResponse(TypedDict):
    choices: list[ChatChoice]
    model: str
    usage: dict[str, int]

class ProcessingConfig(TypedDict, total=False):
    raw: bool
    strip_whitespace: bool

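def process_response(response: ChatResponse, config: ProcessingConfig) -> str:
    content = response['choices'][0]['message']['content']
    if config.get('raw'):
        return content
    # Keys of a total=False TypedDict may be absent, so read them with .get()
    if config.get('strip_whitespace', True):
        return content.strip()
    return content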

Pydantic Models for LLM Responses

from typing import Literal
from pydantic import BaseModel

class Message(BaseModel):
    role: Literal["user", "assistant", "system"]
    content: str

class CompletionUsage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class CompletionChoice(BaseModel):
    index: int
    message: Message
    finish_reason: Literal["stop", "length", "tool_calls", "content_filter", "function_call"]

class ChatCompletion(BaseModel):
    id: str
    model: str
    choices: list[CompletionChoice]
    usage: CompletionUsage

Pydantic Models

Request/Response Validation

from pydantic import BaseModel, Field, ValidationError, field_validator
from typing import Literal

class ChatMessage(BaseModel):
    role: Literal["user", "assistant", "system"]
    content: str = Field(..., min_length=1, max_length=100000)

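    @field_validator('content', mode='before')
    @classmethod
    def strip_whitespace(cls, v: object) -> object:
        # mode='before' strips ahead of the length check, so "   " is rejected
        return v.strip() if isinstance(v, str) else v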

class ChatRequest(BaseModel):
    messages: list[ChatMessage]
    model: str = "gpt-4"
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: int = Field(default=1000, gt=0)

    model_config = {"extra": "forbid"}  # Reject unexpected fields

# Automatic validation
try:
    request = ChatRequest(
        messages=[{"role": "user", "content": "Hello"}],
        temperature=2.5  # Invalid: > 2
    )
except ValidationError as e:
    print(e)  # temperature: Input should be less than or equal to 2

# Parse from JSON
json_string = '{"messages": [{"role": "user", "content": "Hello"}]}'
request = ChatRequest.model_validate_json(json_string)

# Serialize to JSON
json_string = request.model_dump_json()

LLM Configuration with Validation

from pathlib import Path
from typing import Literal

import yaml  # PyYAML
from pydantic import BaseModel, Field, model_validator

class LLMConfig(BaseModel):
    provider: Literal["openai", "anthropic", "local"]
    model: str
    api_key: str = Field(..., exclude=True)  # Don't serialize
    temperature: float = Field(default=0.7, ge=0, le=2)
    max_tokens: int = Field(default=2000, gt=0)
    timeout_seconds: float = Field(default=60, gt=0)

    @model_validator(mode='after')
    def validate_model_for_provider(self) -> 'LLMConfig':
        valid_models = {
            "openai": ["gpt-4", "gpt-4-turbo", "gpt-3.5-turbo"],
            "anthropic": ["claude-3-opus", "claude-3-sonnet", "claude-3-haiku"],
            "local": ["llama-2-7b", "mistral-7b"]
        }
        if self.model not in valid_models.get(self.provider, []):
            raise ValueError(f"Model {self.model} not valid for {self.provider}")
        return self

    @classmethod
    def from_yaml(cls, path: Path) -> 'LLMConfig':
        with open(path) as f:
            data = yaml.safe_load(f)
        return cls(**data)

Structured Output Models

import json
from typing import Literal

from pydantic import BaseModel, Field

class ExtractedEntity(BaseModel):
    name: str
    entity_type: Literal["person", "organization", "location", "product"]
    confidence: float = Field(ge=0, le=1)

class ExtractionResult(BaseModel):
    entities: list[ExtractedEntity]
    summary: str

# Prompt the LLM to output matching JSON (json.dumps renders the schema as
# JSON; interpolating the dict directly would produce a Python repr)
prompt = f"""Extract entities from this text. Respond with JSON matching this schema:
{json.dumps(ExtractionResult.model_json_schema(), indent=2)}

Text: {user_text}
"""

# Inside an async function; llm and user_text are placeholders, and
# safe_parse_llm_json is defined under JSON Handling
response = await llm.complete(prompt)
result = safe_parse_llm_json(response, ExtractionResult)

Async/Await Patterns

Pattern 1: Concurrent API Calls with gather

import asyncio
import httpx

async def fetch_all_embeddings(texts: list[str]) -> list[list[float]]:
    # fetch_embedding(client, text) is a placeholder for your own helper that
    # requests one embedding over the shared client
    async with httpx.AsyncClient() as client:
        tasks = [fetch_embedding(client, text) for text in texts]
        # gather preserves input order; by default the first exception propagates
        return await asyncio.gather(*tasks)
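By default, the first exception raised inside `asyncio.gather` propagates to the caller and the successful results are lost to it. A minimal sketch of the `return_exceptions=True` variant, which returns failures as values so partial results survive; `fake_embed` is a hypothetical stand-in for a real embedding call:

```python
import asyncio

async def fake_embed(text: str) -> list[float]:
    """Hypothetical stand-in for a real embedding request."""
    await asyncio.sleep(0.01)
    if not text:
        raise ValueError("empty input")
    return [float(len(text)), 0.0]

async def embed_all(texts: list[str]) -> tuple[list[list[float]], list[tuple[int, BaseException]]]:
    # return_exceptions=True: failures come back in the result list instead of
    # aborting the gather, so one bad input doesn't discard the rest.
    results = await asyncio.gather(*(fake_embed(t) for t in texts), return_exceptions=True)
    ok: list[list[float]] = []
    failed: list[tuple[int, BaseException]] = []
    for i, r in enumerate(results):  # results keep the input order
        if isinstance(r, BaseException):
            failed.append((i, r))
        else:
            ok.append(r)
    return ok, failed

ok, failed = asyncio.run(embed_all(["hello", "", "world!"]))
print(len(ok), [i for i, _ in failed])  # 2 [1]
```

Keeping the index of each failure makes it easy to retry only the inputs that failed.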

Pattern 2: Controlled Concurrency with Semaphore

async def fetch_with_limit(texts: list[str], max_concurrent: int = 5) -> list[list[float]]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient() as client:
        async def fetch_one(text: str) -> list[float]:
            async with semaphore:  # At most max_concurrent run simultaneously
                return await fetch_embedding(client, text)

        return await asyncio.gather(*[fetch_one(t) for t in texts])
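To see the semaphore's effect, here is a small self-contained sketch that records the peak number of jobs in flight; `asyncio.sleep` stands in for the API call:

```python
import asyncio

async def run_with_cap(n_jobs: int, max_concurrent: int) -> int:
    """Run n_jobs dummy jobs and return the peak number observed running at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for an API call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak

print(asyncio.run(run_with_cap(n_jobs=20, max_concurrent=5)))  # 5
```

All 20 jobs are scheduled immediately, but only five ever hold the semaphore at the same time.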

Pattern 3: Timeout Handling

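async def fetch_with_timeout(
    client: httpx.AsyncClient, text: str, timeout: float = 30.0
) -> list[float]:
    try:
        return await asyncio.wait_for(fetch_embedding(client, text), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the inner call; EmbeddingError is your own exception class
        raise EmbeddingError(f"Embedding timed out after {timeout}s") from None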

Pattern 4: First Successful Result

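async def fetch_from_any_provider(text: str, providers: list[str]) -> list[float]:
    # asyncio.wait() needs tasks, not bare coroutines (a TypeError since Python 3.11)
    pending = {asyncio.create_task(fetch_from_provider(text, p)) for p in providers}
    try:
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                if task.exception() is None:  # skip providers that failed
                    return task.result()
        raise RuntimeError("All providers failed")
    finally:
        # Cancel whatever is still running once we have an answer
        for task in pending:
            task.cancel()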

Bridging Sync and Async

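# Running async code from sync context
# (asyncio.run starts a new event loop; it raises if one is already running)
def sync_embed(text: str) -> list[float]:
    return asyncio.run(async_embed(text))

# Running blocking code from async context without stalling the event loop.
# The default executor (None) is a thread pool: fine for blocking I/O and for
# C extensions that release the GIL. For pure-Python CPU-bound work, pass a
# concurrent.futures.ProcessPoolExecutor instead.
async def async_process(data: bytes) -> dict:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, cpu_intensive_process, data)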
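For the common blocking-I/O case, `asyncio.to_thread` (Python 3.9+) is a shorter spelling of `run_in_executor` with the default thread pool. A sketch in which `blocking_tokenize` is a hypothetical blocking function and a heartbeat coroutine keeps running while it executes:

```python
import asyncio
import time

def blocking_tokenize(text: str) -> list[str]:
    """Hypothetical blocking work (e.g. a sync SDK call or file I/O)."""
    time.sleep(0.05)
    return text.split()

async def main() -> tuple[list[str], int]:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        for _ in range(5):
            await asyncio.sleep(0.005)
            ticks += 1

    # to_thread runs the function in the default thread pool, so the event
    # loop keeps serving heartbeat() while blocking_tokenize sleeps.
    tokens, _ = await asyncio.gather(
        asyncio.to_thread(blocking_tokenize, "hello async world"),
        heartbeat(),
    )
    return tokens, ticks

tokens, ticks = asyncio.run(main())
print(tokens, ticks)  # ['hello', 'async', 'world'] 5
```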

Context Managers

Sync Context Manager for Timing

from contextlib import contextmanager, asynccontextmanager
from typing import Generator, AsyncGenerator
import time

@contextmanager
def timer(operation: str) -> Generator[None, None, None]:
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{operation} took {elapsed:.2f}s")

# Usage
with timer("Embedding generation"):
    embeddings = embed_documents(texts)

Async Context Manager for API Client Lifecycle

@asynccontextmanager
async def managed_llm_client() -> AsyncGenerator[LLMClient, None]:
    client = LLMClient(api_key=get_api_key())
    try:
        await client.connect()
        yield client
    finally:
        await client.disconnect()

# Usage
async with managed_llm_client() as client:
    response = await client.complete(prompt)

AI-Specific Resource Management

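import asyncio
import shutil
import tempfile
from contextlib import asynccontextmanager, contextmanager
from pathlib import Path
from typing import AsyncGenerator, Generator

import httpx

@contextmanager
def temp_model_cache() -> Generator[Path, None, None]:
    """Temporary cache directory for model weights, deleted on exit."""
    cache_dir = Path(tempfile.mkdtemp())
    try:
        yield cache_dir
    finally:
        shutil.rmtree(cache_dir, ignore_errors=True)

@asynccontextmanager
async def concurrency_limited_client(max_concurrent: int) -> AsyncGenerator[httpx.AsyncClient, None]:
    """HTTP client whose limited_request() allows at most max_concurrent
    requests in flight. A semaphore caps concurrency, not requests per
    second; see Rate Limiting below for a true rate limiter."""
    limiter = asyncio.Semaphore(max_concurrent)
    client = httpx.AsyncClient()

    async def limited_request(*args, **kwargs) -> httpx.Response:
        async with limiter:
            return await client.request(*args, **kwargs)

    client.limited_request = limited_request  # ad hoc attribute for convenience
    try:
        yield client
    finally:
        await client.aclose()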

HTTP Clients (httpx)

Production Configuration

from httpx import AsyncClient, Timeout, Limits

# Sensible defaults for AI workloads
client = AsyncClient(
    timeout=Timeout(
        connect=5.0,      # Time to establish connection
        read=60.0,        # Time to read response (LLMs can be slow)
        write=10.0,       # Time to send request
        pool=5.0          # Time to acquire connection from pool
    ),
    limits=Limits(
        max_connections=100,      # Total connection pool size
        max_keepalive_connections=20,  # Persistent connections
        keepalive_expiry=30.0     # How long to keep idle connections
    ),
    http2=True  # HTTP/2 multiplexing; requires the extra: pip install "httpx[http2]"
)

Streaming Responses

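import json
import os
from typing import AsyncGenerator

import httpx

api_key = os.environ["OPENAI_API_KEY"]

async def stream_completion(prompt: str) -> AsyncGenerator[str, None]:
    """Stream tokens from an LLM API."""
    # Allow a longer read timeout: the first token can take a while to arrive
    async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, read=60.0)) as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={
                "model": "gpt-4",
                "messages": [{"role": "user", "content": prompt}],
                "stream": True
            }
        ) as response:
            response.raise_for_status()  # surface 4xx/5xx before parsing the stream
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    data = json.loads(line[6:])
                    if content := data["choices"][0]["delta"].get("content"):
                        yield content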
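Pulling the line parsing out of the network loop makes it testable without an API key. A sketch assuming the OpenAI-style framing used above (one `data: {...}` line per event, terminated by `data: [DONE]`); it deliberately ignores other SSE features such as multi-line `data:` fields:

```python
import json
from collections.abc import Iterable, Iterator

def parse_openai_sse(lines: Iterable[str]) -> Iterator[str]:
    """Yield content deltas from OpenAI-style SSE lines (simplified sketch)."""
    for line in lines:
        if not line.startswith("data: "):
            continue  # blank separators, comments such as ": ping", other fields
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return
        chunk = json.loads(payload)
        choices = chunk.get("choices") or []  # some chunks may carry no choices
        if choices and (content := choices[0].get("delta", {}).get("content")):
            yield content

sample = [
    'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    "",
    'data: {"choices": [{"delta": {"content": "Hel"}}]}',
    "",
    'data: {"choices": [{"delta": {"content": "lo"}}]}',
    "data: [DONE]",
]
print("".join(parse_openai_sse(sample)))  # Hello
```

The streaming function can then iterate `parse_openai_sse` over lines read from the response, and unit tests can feed it a recorded list.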

LLM SDK Patterns

OpenAI SDK

from typing import AsyncGenerator

from openai import AsyncOpenAI, OpenAI

# Synchronous client
client = OpenAI()  # Reads OPENAI_API_KEY from environment

response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain quantum computing briefly."}
    ],
    temperature=0.7,
    max_tokens=500
)
print(response.choices[0].message.content)

# Async client
async_client = AsyncOpenAI()

async def complete(prompt: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Streaming
async def stream_response(prompt: str) -> AsyncGenerator[str, None]:
    stream = await async_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        # A chunk can arrive with no choices (e.g. the usage chunk, when enabled)
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

Anthropic SDK

from typing import AsyncGenerator

from anthropic import Anthropic, AsyncAnthropic

client = Anthropic()  # Reads ANTHROPIC_API_KEY from environment

message = client.messages.create(
    model="claude-3-sonnet-20240229",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain quantum computing briefly."}
    ]
)
print(message.content[0].text)

# Streaming with context manager
async_client = AsyncAnthropic()

async def stream_claude(prompt: str) -> AsyncGenerator[str, None]:
    async with async_client.messages.stream(
        model="claude-3-sonnet-20240229",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            yield text

Provider-Agnostic Abstraction

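from abc import ABC, abstractmethod
from typing import AsyncGenerator

from anthropic import AsyncAnthropic
from openai import AsyncOpenAI

class LLMProvider(ABC):
    @abstractmethod
    async def complete(self, prompt: str, **kwargs) -> str:
        ...

    @abstractmethod
    def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
        # Declared without `async`: subclasses implement it as an async
        # generator, and calling an async generator function returns the
        # generator directly (callers use `async for`, not `await`)
        ...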

class OpenAIProvider(LLMProvider):
    def __init__(self, model: str = "gpt-4"):
        self.client = AsyncOpenAI()
        self.model = model

    async def complete(self, prompt: str, **kwargs) -> str:
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )
        return response.choices[0].message.content

    async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            **kwargs
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

class AnthropicProvider(LLMProvider):
    def __init__(self, model: str = "claude-3-sonnet-20240229"):
        self.client = AsyncAnthropic()
        self.model = model

    async def complete(self, prompt: str, **kwargs) -> str:
        # max_tokens is required by the Messages API: default it, forward other kwargs
        kwargs.setdefault("max_tokens", 1024)
        response = await self.client.messages.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )
        return response.content[0].text

    async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
        kwargs.setdefault("max_tokens", 1024)
        async with self.client.messages.stream(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        ) as stream:
            async for text in stream.text_stream:
                yield text
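One payoff of a common interface is failover across providers. The sketch below uses `typing.Protocol` (structural typing) instead of the ABC so fakes need not inherit from anything, and covers only `complete`; `FailingProvider`, `EchoProvider`, and `FallbackProvider` are hypothetical names for illustration:

```python
import asyncio
from typing import Protocol

class CompletionProvider(Protocol):
    async def complete(self, prompt: str) -> str: ...

class FailingProvider:
    """Hypothetical provider that always errors (simulates an outage)."""
    async def complete(self, prompt: str) -> str:
        raise ConnectionError("provider unavailable")

class EchoProvider:
    """Hypothetical offline provider, handy in tests."""
    async def complete(self, prompt: str) -> str:
        return f"echo: {prompt}"

class FallbackProvider:
    """Try each provider in order; return the first successful completion."""

    def __init__(self, providers: list[CompletionProvider]) -> None:
        self.providers = providers

    async def complete(self, prompt: str) -> str:
        errors: list[Exception] = []
        for provider in self.providers:
            try:
                return await provider.complete(prompt)
            except Exception as e:  # narrow this to your SDKs' error types
                errors.append(e)
        raise RuntimeError(f"all {len(errors)} providers failed: {errors!r}")

chain = FallbackProvider([FailingProvider(), EchoProvider()])
print(asyncio.run(chain.complete("hi")))  # echo: hi
```

Because `FallbackProvider` itself satisfies the protocol, chains can be nested or swapped in wherever a single provider is expected.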

NumPy for Embeddings

Basic Operations

import numpy as np

# Embeddings are just arrays of floats
embedding = np.array([0.1, -0.3, 0.5, 0.2])  # Real embeddings typically have 384-4096 dimensions

# Cosine similarity: the standard metric for embedding comparison
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# For normalized vectors (unit length), dot product = cosine similarity
def normalize(v: np.ndarray) -> np.ndarray:
    return v / np.linalg.norm(v)

# Normalize a batch of embeddings row-wise (one unit vector per row)
embeddings = np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
normalized = embeddings / norms
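With every row normalized, one matrix-vector product scores a query against the whole corpus. A brute-force top-k sketch (assuming no all-zero rows); `np.argpartition` selects the k best in linear time before sorting just those k:

```python
import numpy as np

def top_k_similar(query: np.ndarray, corpus: np.ndarray, k: int = 3) -> tuple[np.ndarray, np.ndarray]:
    """Return (indices, scores) of the k corpus rows most cosine-similar to query."""
    corpus_n = corpus / np.linalg.norm(corpus, axis=1, keepdims=True)
    query_n = query / np.linalg.norm(query)
    scores = corpus_n @ query_n                    # cosine score per row, shape (n_docs,)
    k = min(k, len(scores))
    top = np.argpartition(-scores, k - 1)[:k]      # the k best, in arbitrary order
    top = top[np.argsort(-scores[top])]            # sort only those k, best first
    return top, scores[top]

corpus = np.array([[1.0, 0.0], [0.0, 1.0], [0.7, 0.7], [-1.0, 0.0]], dtype=np.float32)
idx, sims = top_k_similar(np.array([1.0, 0.1], dtype=np.float32), corpus, k=2)
print(idx)  # [0 2]
```

In practice the corpus would be normalized once at index time rather than on every query.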

Memory-Efficient Operations

from typing import Generator

import numpy as np

# Use float32 instead of float64 (half the memory; ample precision for similarity search)
embeddings = embeddings.astype(np.float32)

# Memory-map a saved .npy file for datasets that don't fit in RAM.
# np.load reads the .npy header for dtype and shape; np.memmap is for raw
# binary files without a header (you then pass dtype and shape yourself).
embeddings = np.load("embeddings.npy", mmap_mode="r")

# Batch processing to control memory
def process_in_batches(
    embeddings: np.ndarray,
    batch_size: int = 10000
) -> Generator[np.ndarray, None, None]:
    for i in range(0, len(embeddings), batch_size):
        yield embeddings[i:i + batch_size]

Pandas Data Manipulation

Common AI Data Tasks

import pandas as pd

# Loading evaluation datasets
eval_df = pd.read_json("evaluation_set.jsonl", lines=True)

# Filtering by criteria
hard_examples = eval_df[eval_df["difficulty"] == "hard"]

# Adding computed columns
eval_df["response_length"] = eval_df["response"].str.len()
eval_df["is_correct"] = eval_df["predicted"] == eval_df["expected"]

# Aggregating results ("p50"/"p95" are not built-in aggregation names,
# so percentiles use named aggregation with median/quantile)
results = eval_df.groupby("category").agg(
    accuracy=("is_correct", "mean"),
    latency_mean=("latency_ms", "mean"),
    latency_p50=("latency_ms", "median"),
    latency_p95=("latency_ms", lambda s: s.quantile(0.95)),
).round(3)

# Joining datasets
prompts_df = pd.read_csv("prompts.csv")
responses_df = pd.read_csv("responses.csv")
merged = prompts_df.merge(responses_df, on="prompt_id")

Preparing Fine-Tuning Data

import numpy as np
import pandas as pd

def prepare_training_data(df: pd.DataFrame, seed: int = 42) -> pd.DataFrame:
    """Convert raw data to fine-tuning format."""

    # Filter for quality (.copy() avoids SettingWithCopyWarning on the writes below)
    df = df[df["quality_score"] >= 0.8].copy()

    # Format as conversation
    df["messages"] = df.apply(
        lambda row: [
            {"role": "system", "content": row["system_prompt"]},
            {"role": "user", "content": row["user_input"]},
            {"role": "assistant", "content": row["expected_output"]}
        ],
        axis=1
    )

    # Train/validation split (roughly 90/10; seeded for reproducibility)
    rng = np.random.default_rng(seed)
    df["split"] = rng.choice(["train", "val"], size=len(df), p=[0.9, 0.1])

    return df[["messages", "split"]]

# Export for training: one {"messages": [...]} object per line.
# Selecting a one-column DataFrame (not the Series) keeps the "messages" key.
training_df = prepare_training_data(raw_df)  # raw_df: your source DataFrame
training_df.loc[training_df["split"] == "train", ["messages"]].to_json(
    "train.jsonl", orient="records", lines=True
)
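The same export can be done with the standard library when pandas is not needed. A sketch assuming the chat format shown above (one `{"messages": [...]}` object per line); unlike the per-row random draw, shuffling and slicing gives exact split sizes. Function names are illustrative:

```python
import json
import random
import tempfile
from pathlib import Path

def to_chat_record(system: str, user: str, assistant: str) -> dict:
    """One training example in chat format."""
    return {"messages": [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
        {"role": "assistant", "content": assistant},
    ]}

def write_split_jsonl(records: list[dict], out_dir: Path, val_fraction: float = 0.1, seed: int = 0) -> dict[str, int]:
    rng = random.Random(seed)          # seeded, so the split is reproducible
    shuffled = records[:]
    rng.shuffle(shuffled)
    n_val = int(len(shuffled) * val_fraction)
    splits = {"val": shuffled[:n_val], "train": shuffled[n_val:]}
    for name, rows in splits.items():
        with open(out_dir / f"{name}.jsonl", "w", encoding="utf-8") as f:
            for row in rows:
                # ensure_ascii=False keeps non-ASCII text readable in the file
                f.write(json.dumps(row, ensure_ascii=False) + "\n")
    return {name: len(rows) for name, rows in splits.items()}

records = [to_chat_record("Be concise.", f"Q{i}", f"A{i}") for i in range(20)]
out = Path(tempfile.mkdtemp())
counts = write_split_jsonl(records, out)
print(counts)  # {'val': 2, 'train': 18}
```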

JSON Handling

Parsing LLM JSON Output

import json
import re
from typing import Any

from pydantic import BaseModel

def extract_json(text: str) -> dict[str, Any]:
    """Extract JSON from LLM response that might include markdown or prose."""

    # Try direct parsing first
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Try extracting from markdown code block
    json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
    if json_match:
        try:
            return json.loads(json_match.group(1))
        except json.JSONDecodeError:
            pass

    # Try finding JSON object in text
    json_match = re.search(r'\{[\s\S]*\}', text)
    if json_match:
        try:
            return json.loads(json_match.group())
        except json.JSONDecodeError:
            pass

    raise ValueError(f"Could not extract valid JSON from: {text[:200]}...")

def safe_parse_llm_json(text: str, schema: type[BaseModel]) -> BaseModel:
    """Parse LLM JSON output with validation."""
    raw = extract_json(text)
    return schema.model_validate(raw)
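The greedy `\{[\s\S]*\}` fallback spans from the first `{` to the last `}`, so a reply containing two objects yields invalid JSON. A sketch of an alternative fallback using the standard library's `json.JSONDecoder.raw_decode`, which parses one value starting at an index and ignores whatever follows:

```python
import json
from typing import Any

def first_json_object(text: str) -> Any:
    """Return the first complete JSON object embedded in text."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _ = decoder.raw_decode(text, start)  # parses one value, stops at its end
            return obj
        except json.JSONDecodeError:
            start = text.find("{", start + 1)  # not valid JSON here; try the next brace
    raise ValueError("no JSON object found")

reply = 'Sure! Here it is: {"name": "Ada", "tags": ["x"]} Also: {"other": 1}'
print(first_json_object(reply))  # {'name': 'Ada', 'tags': ['x']}
```

It could replace the greedy-regex step in `extract_json`; the worst case is quadratic in the number of braces, which is rarely an issue at LLM-response sizes.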

Environment Management

Environment Variables with python-dotenv

# .env file (NEVER commit this; list it in .gitignore)
# OPENAI_API_KEY=sk-...
# ANTHROPIC_API_KEY=sk-ant-...
# DATABASE_URL=postgresql://...

# Load at application start
from dotenv import load_dotenv
import os

load_dotenv()  # Loads .env into environment

openai_key = os.getenv("OPENAI_API_KEY")
if not openai_key:
    raise ValueError("OPENAI_API_KEY environment variable not set")

Secrets Managers for Production

# AWS Secrets Manager
import boto3
import json

def get_secret(secret_name: str) -> dict:
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])

# Usage
secrets = get_secret("prod/ai-assistant")
openai_key = secrets["OPENAI_API_KEY"]

# Google Cloud Secret Manager
from google.cloud import secretmanager

def get_gcp_secret(project_id: str, secret_id: str, version: str = "latest") -> str:
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")

pyproject.toml Configuration

[tool.poetry]
name = "ai-assistant"
version = "0.1.0"
description = "AI-powered assistant application"
authors = ["Your Name <you@example.com>"]

[tool.poetry.dependencies]
python = "^3.11"
openai = "^1.12"
anthropic = ">=0.20"  # client.messages (Messages API) needs a newer SDK than 0.5.x
pydantic = "^2.0"
httpx = "^0.26"

[tool.poetry.group.dev.dependencies]
pytest = "^8.0"
pytest-asyncio = "^0.23"
mypy = "^1.8"
ruff = "^0.2"

[tool.mypy]
python_version = "3.11"
strict = true

[tool.ruff]
target-version = "py311"

[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]

Streaming Patterns

Generator Basics

import asyncio
from typing import AsyncGenerator, Generator

# Sync generator
def count_up(n: int) -> Generator[int, None, None]:
    for i in range(n):
        yield i

# Values are produced lazily, one at a time
for num in count_up(5):
    print(num)  # Prints 0, 1, 2, 3, 4

# Async generator
async def stream_numbers(n: int) -> AsyncGenerator[int, None]:
    for i in range(n):
        await asyncio.sleep(0.1)  # Simulate async work
        yield i

# Consume with async for (only valid inside an async function)
async def main() -> None:
    async for num in stream_numbers(5):
        print(num)

asyncio.run(main())

Streaming LLM Responses

from typing import Callable

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_chat_response(
    messages: list[dict],
    on_token: Callable[[str], None] | None = None
) -> str:
    """Stream a chat completion, calling on_token for each chunk."""
    full_response = []

    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True
    )

    async for chunk in stream:
        if chunk.choices and (content := chunk.choices[0].delta.content):
            full_response.append(content)
            if on_token:
                on_token(content)

    return "".join(full_response)

# Usage with print callback (inside an async function)
response = await stream_chat_response(
    messages=[{"role": "user", "content": "Write a poem"}],
    on_token=lambda t: print(t, end="", flush=True)
)

Server-Sent Events (SSE) for Web

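import json
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class StreamRequest(BaseModel):
    prompt: str

async def generate_stream(prompt: str) -> AsyncGenerator[str, None]:
    """Generate SSE-formatted stream."""
    # stream_llm_response: any async token generator, e.g. stream_response() above
    async for token in stream_llm_response(prompt):
        # json.dumps escapes newlines, so each event stays on one "data:" line
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(request: StreamRequest):
    return StreamingResponse(
        generate_stream(request.prompt),
        media_type="text/event-stream"
    )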

Retry Logic

Using Tenacity Library

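import logging

from openai import (
    APIConnectionError,
    APITimeoutError,
    AsyncOpenAI,
    InternalServerError,
    RateLimitError,
)
from tenacity import (
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

logger = logging.getLogger(__name__)
client = AsyncOpenAI()

# The OpenAI SDK raises its own exception types (not raw httpx errors),
# so those are what to retry on; other 4xx errors are not retried.
# Note: the SDK also retries some failures itself (max_retries, default 2).
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type(
        (APITimeoutError, APIConnectionError, RateLimitError, InternalServerError)
    ),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,  # re-raise the last error instead of tenacity.RetryError
)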
async def call_llm_with_retry(prompt: str) -> str:
    """Call LLM API with automatic retry on transient failures."""
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

Custom Retry Logic

import asyncio
import logging
import random
from typing import Awaitable, Callable, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    retryable_exceptions: tuple[type[BaseException], ...] = (Exception,)  # Narrow this in real use
) -> T:
    """Execute async function with exponential backoff retry."""
    last_exception = None

    for attempt in range(max_attempts):
        try:
            return await func()
        except retryable_exceptions as e:
            last_exception = e

            if attempt == max_attempts - 1:
                raise

            # Exponential backoff: 1s, 2s, 4s, 8s...
            delay = min(base_delay * (2 ** attempt), max_delay)

            # Add jitter to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())

            logger.warning(
                f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s"
            )
            await asyncio.sleep(delay)

    raise last_exception  # Should never reach here

# Usage (inside an async function; client is an AsyncOpenAI instance)
from openai import APITimeoutError, RateLimitError

result = await retry_with_backoff(
    lambda: client.embeddings.create(input=text, model="text-embedding-3-small"),
    max_attempts=5,
    retryable_exceptions=(APITimeoutError, RateLimitError)
)

Rate Limiting

Token Bucket Rate Limiter

import asyncio
import time

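class RateLimiter:
    """Token bucket rate limiter for API calls.

    The bucket starts full, so up to requests_per_minute calls may go out in
    an initial burst. The lock is held while sleeping, so waiters are served
    one at a time.
    """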

    def __init__(self, requests_per_minute: int):
        self.requests_per_minute = requests_per_minute
        self.tokens = requests_per_minute
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until a request token is available."""
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.last_update

                # Refill tokens based on elapsed time
                self.tokens = min(
                    self.requests_per_minute,
                    self.tokens + elapsed * (self.requests_per_minute / 60)
                )
                self.last_update = now

                if self.tokens >= 1:
                    self.tokens -= 1
                    return

                # Wait for a token to become available
                wait_time = (1 - self.tokens) * (60 / self.requests_per_minute)
                await asyncio.sleep(wait_time)

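# Usage
rate_limiter = RateLimiter(requests_per_minute=60)

async def rate_limited_call(prompt: str) -> str:
    await rate_limiter.acquire()
    response = await client.chat.completions.create(  # client: an AsyncOpenAI instance
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content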
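Because `RateLimiter` reads the real clock and sleeps, its behavior is awkward to unit test. A sketch of the same token-bucket idea with the clock passed in, so refills can be checked deterministically; the `cost` parameter (an assumption, not part of the class above) lets one bucket meter LLM tokens per minute as well as requests:

```python
class TokenBucket:
    """Token bucket with an injectable clock. capacity: max burst; rate: tokens per second."""

    def __init__(self, capacity: float, rate: float, now: float = 0.0) -> None:
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity
        self.last = now

    def try_acquire(self, now: float, cost: float = 1.0) -> bool:
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

    def seconds_until(self, cost: float = 1.0) -> float:
        """Wait needed (as of the last update) before `cost` tokens are available."""
        return max(0.0, (cost - self.tokens) / self.rate)

bucket = TokenBucket(capacity=2, rate=1.0)              # 60 requests/minute, bursts of 2
print([bucket.try_acquire(now=0.0) for _ in range(3)])  # [True, True, False]
print(bucket.seconds_until())                           # 1.0
print(bucket.try_acquire(now=1.0))                      # True
```

An async wrapper would call `try_acquire(time.monotonic())` and `await asyncio.sleep(bucket.seconds_until())` until it succeeds.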

Combined Rate and Concurrency Limiting

from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

class ConcurrencyLimiter:
    """Limit concurrent in-flight requests."""

    def __init__(self, max_concurrent: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self):
        await self.semaphore.acquire()
        return self

    async def __aexit__(self, *args):
        self.semaphore.release()

class APIThrottler:
    def __init__(
        self,
        requests_per_minute: int = 60,
        max_concurrent: int = 10
    ):
        self.rate_limiter = RateLimiter(requests_per_minute)
        self.concurrency_limiter = ConcurrencyLimiter(max_concurrent)

    async def throttled_call(self, func: Callable[[], Awaitable[T]]) -> T:
        await self.rate_limiter.acquire()
        async with self.concurrency_limiter:
            return await func()

Logging and Observability

Structured Logging Setup

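import json
import logging
from datetime import datetime, timezone
from typing import Any

# Attributes every LogRecord has; anything else was passed via extra=
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}

class JSONFormatter(logging.Formatter):
    """Format logs as JSON for structured logging systems."""

    def format(self, record: logging.LogRecord) -> str:
        log_data: dict[str, Any] = {
            "timestamp": datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Include extra fields: logging sets each extra= key as an attribute
        # on the record itself (there is no record.extra)
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                log_data[key] = value

        return json.dumps(log_data, default=str)  # default=str tolerates non-JSON values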

def setup_logging(level: str = "INFO") -> None:
    """Configure application logging."""
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())

    logging.basicConfig(
        level=getattr(logging, level),
        handlers=[handler]
    )

# Usage with extra context
logger = logging.getLogger(__name__)

def log_llm_call(
    prompt: str,
    response: str,
    model: str,
    latency_ms: float,
    tokens_used: int
) -> None:
    logger.info(
        "LLM call completed",
        extra={
            "prompt_preview": prompt[:100],
            "response_preview": response[:100],
            "model": model,
            "latency_ms": latency_ms,
            "tokens_used": tokens_used,
            "event_type": "llm_call"
        }
    )
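Keys passed via `extra=` become attributes directly on the `LogRecord`; there is no `record.extra` attribute to read them from. A self-contained sketch that collects those attributes and captures the output in memory to show the result:

```python
import io
import json
import logging

# Attribute names every LogRecord has; anything else came from extra=.
_STANDARD = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}

class ExtraJSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        data = {"level": record.levelname, "message": record.getMessage()}
        data.update({k: v for k, v in vars(record).items() if k not in _STANDARD})
        return json.dumps(data, default=str)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(ExtraJSONFormatter())
log = logging.getLogger("extra_demo")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo output out of the root logger

log.info("LLM call completed", extra={"model": "gpt-4", "latency_ms": 812.5})
print(stream.getvalue().strip())
# {"level": "INFO", "message": "LLM call completed", "model": "gpt-4", "latency_ms": 812.5}
```

Note that `extra` keys must not collide with standard attribute names such as `message` or `module`; logging raises `KeyError` if they do.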

Request Tracing

import time
import uuid
from contextvars import ContextVar

# Request ID propagation
request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")

class TracingMiddleware:
    """Middleware to add request tracing.

    Register with FastAPI/Starlette via app.middleware("http")(TracingMiddleware()).
    """

    async def __call__(self, request, call_next):
        request_id = str(uuid.uuid4())[:8]
        request_id_var.set(request_id)

        logger.info(
            "Request started",
            extra={"request_id": request_id, "path": request.url.path}
        )

        start = time.perf_counter()
        response = await call_next(request)
        elapsed = (time.perf_counter() - start) * 1000

        logger.info(
            "Request completed",
            extra={
                "request_id": request_id,
                "status_code": response.status_code,
                "latency_ms": elapsed
            }
        )

        response.headers["X-Request-ID"] = request_id
        return response
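Setting `request_id_var` only helps if log lines pick it up. One way, sketched here with the standard library, is a `logging.Filter` that copies the current value onto every record; the demo shows that each asyncio task sees its own value:

```python
import asyncio
import contextvars
import io
import logging

request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto every record so formatters can use it."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(RequestIdFilter())
handler.setFormatter(logging.Formatter("%(request_id)s %(message)s"))
log = logging.getLogger("tracing_demo")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

async def handle(request_id: str) -> None:
    request_id_var.set(request_id)  # visible only inside this task's context
    await asyncio.sleep(0)          # interleave with the other request
    log.info("handled")

async def main() -> None:
    # gather wraps each coroutine in a task that runs in a copy of the current context
    await asyncio.gather(handle("req-a"), handle("req-b"))
    log.info("outside any request")

asyncio.run(main())
print(stream.getvalue())
```

Attaching the filter to the handler (rather than a logger) means records from every logger that reaches this handler get the field.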

Error Handling

Comprehensive API Error Handling

from openai import (
    APIError,
    APIConnectionError,
    RateLimitError,
    AuthenticationError,
    BadRequestError
)

class LLMError(Exception):
    """Base exception for LLM operations."""
    pass

class RetryableError(LLMError):
    """Error that may succeed on retry."""
    pass

class NonRetryableError(LLMError):
    """Error that won't be fixed by retrying."""
    pass

async def call_llm_safely(prompt: str) -> str:
    """Call LLM with comprehensive error handling."""
    try:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

    except AuthenticationError as e:
        # Invalid API key - don't retry
        logger.error(f"Authentication failed: {e}")
        raise NonRetryableError("Invalid API credentials") from e

    except BadRequestError as e:
        # Invalid request - don't retry (fix the request)
        logger.error(f"Bad request: {e}")
        raise NonRetryableError(f"Invalid request: {e}") from e

    except RateLimitError as e:
        # Rate limited - retry with backoff
        logger.warning(f"Rate limited: {e}")
        raise RetryableError("Rate limit exceeded") from e

    except APIConnectionError as e:
        # Network issue - retry
        logger.warning(f"Connection error: {e}")
        raise RetryableError("Connection failed") from e

    except APIError as e:
        # Only APIStatusError subclasses carry status_code, so check before using it
        status = getattr(e, "status_code", None)
        if status is not None and status >= 500:
            logger.warning(f"Server error: {e}")
            raise RetryableError(f"Server error: {status}") from e
        raise NonRetryableError(f"API error: {e}") from e

# Combine with retry logic
@retry(
    retry=retry_if_exception_type(RetryableError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=1, max=30)
)
async def robust_llm_call(prompt: str) -> str:
    return await call_llm_safely(prompt)

Testing Patterns

Pytest Fixtures for API Mocking

import pytest
from unittest.mock import AsyncMock, patch
from openai.types.chat import ChatCompletion, ChatCompletionMessage
from openai.types.chat.chat_completion import Choice

@pytest.fixture
def mock_openai_response():
    """Create a mock OpenAI chat completion response."""
    return ChatCompletion(
        id="mock-id",
        model="gpt-4",
        object="chat.completion",
        created=1234567890,
        choices=[
            Choice(
                index=0,
                message=ChatCompletionMessage(
                    role="assistant",
                    content="This is a mock response"
                ),
                finish_reason="stop"
            )
        ],
        usage={"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
    )

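@pytest.fixture
def mock_openai_client(mock_openai_response):
    """Mock the OpenAI client for testing."""
    # Patch where the name is looked up, not where it is defined. This assumes
    # myapp.llm (hypothetical) does `from openai import AsyncOpenAI` and builds
    # its client inside get_completion(); for a module-level client, patch
    # "myapp.llm.client" instead.
    with patch("myapp.llm.AsyncOpenAI") as mock:
        client = AsyncMock()
        client.chat.completions.create = AsyncMock(return_value=mock_openai_response)
        mock.return_value = client
        yield client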

@pytest.mark.asyncio
async def test_chat_completion(mock_openai_client):
    """Test chat completion with mocked API."""
    from myapp.llm import get_completion

    result = await get_completion("Hello")

    assert result == "This is a mock response"
    mock_openai_client.chat.completions.create.assert_called_once()
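An alternative to patching is to pass the client in as a parameter, so a test can hand over a mock directly and there is no patch target to get wrong. A stdlib-only sketch; this `get_completion(client, prompt)` signature is a hypothetical variant, not the `myapp.llm` function above:

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

async def get_completion(client, prompt: str) -> str:
    """Takes the client as a parameter, so tests can inject a mock."""
    response = await client.chat.completions.create(
        model="gpt-4", messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Only the attributes the code reads need to exist on the fake response
fake_response = SimpleNamespace(
    choices=[SimpleNamespace(message=SimpleNamespace(content="mocked"))]
)
client = MagicMock()
client.chat.completions.create = AsyncMock(return_value=fake_response)

result = asyncio.run(get_completion(client, "Hello"))
print(result)  # mocked
client.chat.completions.create.assert_awaited_once_with(
    model="gpt-4", messages=[{"role": "user", "content": "Hello"}]
)
```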

Snapshot Testing

from syrupy.assertion import SnapshotAssertion

from myapp.prompts import format_rag_prompt  # hypothetical: your application's prompt builder

def test_prompt_formatting(snapshot: SnapshotAssertion):
    """Verify prompt formatting doesn't change unexpectedly."""
    prompt = format_rag_prompt(
        query="What is machine learning?",
        documents=["ML is a subset of AI...", "Machine learning enables..."]
    )

    assert prompt == snapshot

# Create or update snapshots with: pytest --snapshot-update
# (syrupy stores them under __snapshots__/ next to the test file)
# Later runs compare against the stored snapshot and fail on any difference
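The mechanism behind snapshot testing fits in a few lines. Below is a hypothetical, stdlib-only sketch, not syrupy's implementation. `assert_matches_snapshot` writes a snapshot only when asked to (the `update` flag plays the role of `--snapshot-update`) and otherwise compares against the stored text. The `format_rag_prompt` shown here is an illustrative stand-in for the application's real prompt builder.

```python
from pathlib import Path


def format_rag_prompt(query: str, documents: list[str]) -> str:
    """Hypothetical prompt builder: numbered context documents, then the question."""
    context = "\n".join(f"[{i}] {doc}" for i, doc in enumerate(documents, start=1))
    return f"Answer using only the context below.\n\nContext:\n{context}\n\nQuestion: {query}"


def assert_matches_snapshot(value: str, snapshot_file: Path, update: bool = False) -> None:
    """Compare `value` with a stored snapshot; write (or overwrite) it only when update=True."""
    if update:
        snapshot_file.parent.mkdir(parents=True, exist_ok=True)
        snapshot_file.write_text(value, encoding="utf-8")
        return
    if not snapshot_file.exists():
        raise AssertionError(f"No snapshot at {snapshot_file}; rerun with update=True to create it")
    expected = snapshot_file.read_text(encoding="utf-8")
    if value != expected:
        raise AssertionError(f"Output differs from snapshot {snapshot_file}; review, then update if intended")
```

Committing the snapshot files means that any change to prompt wording shows up as a reviewable diff instead of a silent behavior change.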

Testing Streaming Responses

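import pytest
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch

from myapp.llm import client, stream_completion  # hypothetical module under test

def make_chunk(content):
    """Build an object shaped like an SDK stream chunk (attribute access, not dict keys)."""
    return SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content=content))])

@pytest.mark.asyncio
async def test_streaming_response():
    """Test streaming completion handling."""

    async def mock_stream():
        for content in ["Hello", " world", None]:  # the final chunk carries no content
            yield make_chunk(content)

    # AsyncMock makes `await client.chat.completions.create(..., stream=True)` return the generator
    with patch.object(client.chat.completions, "create", new_callable=AsyncMock) as mock:
        mock.return_value = mock_stream()

        tokens = []
        async for token in stream_completion("Hi"):
            tokens.append(token)

    assert "".join(tokens) == "Hello world"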
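The sketch below gives one hypothetical shape for the `stream_completion` helper the test exercises. This version takes the client as a parameter, so a test can pass a simple fake instead of patching. It follows the OpenAI Python SDK's pattern of awaiting `client.chat.completions.create(..., stream=True)` and iterating the chunks with `async for`. The model name is a placeholder, and `FakeCompletions` is an illustrative test double.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator


async def stream_completion(prompt: str, client: Any, model: str = "gpt-4o-mini") -> AsyncIterator[str]:
    """Yield text deltas from a streaming chat completion, skipping chunks with no content."""
    stream = await client.chat.completions.create(
        model=model,  # placeholder model name
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        # Some chunks carry no choices (e.g. a trailing usage chunk) or an empty delta
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content


class FakeCompletions:
    """Test double: `create` is awaitable and returns an async iterator of chunk-shaped objects."""

    def __init__(self, pieces: list):
        self.pieces = pieces
        self.calls: list[dict] = []

    async def create(self, **kwargs):
        self.calls.append(kwargs)

        async def chunks():
            for text in self.pieces:
                yield SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content=text))])

        return chunks()


async def collect(prompt: str, client: Any) -> list[str]:
    return [token async for token in stream_completion(prompt, client)]


fake_client = SimpleNamespace(chat=SimpleNamespace(completions=FakeCompletions(["Hello", " world", None])))
tokens = asyncio.run(collect("Hi", fake_client))
```

Injecting the client trades a little call-site verbosity for tests that need neither `patch` nor knowledge of where the client is imported.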

VCR for Recorded API Interactions

import vcr
from openai import OpenAI

# Reads OPENAI_API_KEY; replayed runs still need it set, but a dummy value works
client = OpenAI()

# VCR records HTTP interactions and replays them.
# filter_headers keeps the API key out of the committed cassette file.
@vcr.use_cassette(
    "tests/cassettes/openai_completion.yaml",
    filter_headers=["authorization"],
)
def test_real_api_call():
    """Test with recorded API response."""
    # First run: makes real API call, records response
    # Subsequent runs: replays recorded response
    result = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Say hello"}]
    )
    assert "hello" in result.choices[0].message.content.lower()
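The record-then-replay cycle that VCR automates is easy to picture with a toy version. This is a hypothetical, stdlib-only sketch meant to show the mechanism, not to replace vcrpy. It stores JSON instead of VCR's YAML format, and it uses a caller-supplied `send` function in place of real HTTP interception.

```python
import json
from pathlib import Path
from typing import Callable


def replay_or_record(cassette: Path, request: dict, send: Callable[[dict], dict]) -> dict:
    """Return the recorded response for `request`; call `send` (the real network) only on a miss."""
    recorded = json.loads(cassette.read_text(encoding="utf-8")) if cassette.exists() else []
    # Match on method, URL and body only; headers (and the API key in them) are never stored
    key = {"method": request["method"], "url": request["url"], "body": request.get("body")}
    for entry in recorded:
        if entry["request"] == key:
            return entry["response"]
    response = send(request)
    recorded.append({"request": key, "response": response})
    cassette.parent.mkdir(parents=True, exist_ok=True)
    cassette.write_text(json.dumps(recorded, indent=2), encoding="utf-8")
    return response
```

Recording only what is needed for matching and replay is the same idea as `filter_headers` in vcrpy: the cassette is safe to commit.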

Docker Configuration

Dockerfile for AI Applications

# Use a specific Python version
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install build tools only if a dependency must compile from source
# (most packages, numpy included, ship prebuilt wheels for this image)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy dependency files first (for layer caching)
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Don't run as root
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

# Environment variables (defaults, overridable)
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Run the application
CMD ["python", "-m", "app.main"]

Multi-Stage Build

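# Build stage: install dependencies into an isolated virtualenv
FROM python:3.11-slim AS builder

WORKDIR /app
RUN python -m venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Runtime stage: same base image, no build tools
FROM python:3.11-slim

WORKDIR /app

# Copy only the installed packages (same path as in the build stage)
COPY --from=builder /opt/venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH

COPY . .

# /opt/venv is world-readable, unlike /root/.local, so a non-root user can use it
RUN useradd -m appuser
USER appuser

CMD ["python", "-m", "app.main"]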

Docker Compose for Development

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/app
    volumes:
      - .:/app  # Mount source so edits apply without a rebuild (auto-reload still needs a reloader, e.g. uvicorn --reload)
    depends_on:
      - db
      - redis

  db:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: app
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  postgres_data:
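`depends_on` only controls start order. Compose starts `db` and `redis` before `app` but does not wait for them to accept connections. The Compose-native fix is a `healthcheck` on each dependency, combined with the long `depends_on` syntax and `condition: service_healthy`. An application-side alternative is a startup check, sketched below with the standard library only. `wait_for_tcp` is a hypothetical helper. An open port shows that the process is listening, not that Postgres is ready to run queries.

```python
import socket
import time


def wait_for_tcp(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to host:port succeeds or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True  # something is accepting connections on that port
        except OSError:  # refused, unreachable, or timed out
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Inside the `app` container, a call such as `wait_for_tcp("db", 5432)` would use the Compose service name, which resolves on the project network.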

VS Code Configuration

settings.json

{
    "python.defaultInterpreterPath": ".venv/bin/python",
    "python.analysis.typeCheckingMode": "basic",
    "editor.formatOnSave": true,
    "[python]": {
        "editor.defaultFormatter": "charliermarsh.ruff"
    },
    "python.analysis.diagnosticMode": "workspace",
    "python.testing.pytestEnabled": true,
    "python.testing.pytestArgs": ["tests"],
    "python.envFile": "${workspaceFolder}/.env"
}
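`python.envFile` loads the variables in `.env` into runs and debug sessions started through the Python extension. The Compose file passes `OPENAI_API_KEY` and `DATABASE_URL` through `environment:`. In both cases, application code can read configuration from `os.environ` the same way. Below is a minimal sketch of that, assuming a hypothetical `load_settings` helper and a `REDIS_URL` variable whose default points at the Compose service name `redis`. With Pydantic already in the stack, the `pydantic-settings` package is a common alternative.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    database_url: str
    redis_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, failing fast if required ones are missing."""
    missing = [name for name in ("OPENAI_API_KEY", "DATABASE_URL") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return Settings(
        openai_api_key=env["OPENAI_API_KEY"],
        database_url=env["DATABASE_URL"],
        # Hypothetical default: assumes a Compose service named "redis"
        redis_url=env.get("REDIS_URL", "redis://redis:6379/0"),
    )
```

Accepting the mapping as a parameter lets tests pass a plain dict instead of mutating the process environment.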

.cursorrules

This is a Python AI engineering project. Key patterns:
- Use async/await for all API calls
- Use Pydantic for request/response validation
- Follow existing error handling patterns in src/errors.py
- Add type hints to all function signatures