Appendix E: Capstone Projects

This appendix provides end-to-end projects that integrate concepts from multiple chapters. Each project is designed to be completed in 2-4 weeks and results in a portfolio-worthy artifact.


Project 1: Production RAG System for Technical Documentation

Difficulty: Intermediate
Time Estimate: 2-3 weeks
Integrates: Chapters 1, 2, 3, 5, 7, 8, 21

Overview

Build a complete RAG system that can answer questions about a technical documentation corpus (e.g., a framework’s docs, API documentation, or internal company docs).

Learning Objectives

  • Implement document processing and chunking pipelines
  • Deploy and optimize vector search
  • Build evaluation frameworks for retrieval and generation
  • Handle production concerns: caching, monitoring, graceful degradation

Architecture

[Figure: Production RAG System Architecture]

Phase 1: Document Processing (Week 1)

Reference: Chapter 7 (Document Processing and Chunking)

Tasks:

  1. Set up document ingestion from your chosen source
  2. Implement multiple chunking strategies:
       • Fixed-size with overlap
       • Semantic (paragraph-based)
       • Hierarchical (section → subsection → paragraph)
  3. Extract and preserve metadata (titles, headers, source URLs)
  4. Build incremental update capability

Starter Code:

# document_processor.py
from dataclasses import dataclass
from pathlib import Path
import hashlib

@dataclass
class Document:
    id: str
    content: str
    metadata: dict
    chunks: list | None = None  # Filled in by DocumentProcessor.process_file

@dataclass
class Chunk:
    id: str
    content: str
    document_id: str
    metadata: dict
    embedding: list[float] | None = None  # Filled in at indexing time

class DocumentProcessor:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def process_directory(self, path: Path) -> list[Document]:
        """Process all documents in a directory."""
        documents = []
        for file_path in path.rglob("*.md"):  # Adjust for your format
            doc = self.process_file(file_path)
            documents.append(doc)
        return documents

    def process_file(self, path: Path) -> Document:
        """Process a single file into a document with chunks."""
        content = path.read_text(encoding="utf-8")
        doc_id = hashlib.md5(str(path).encode()).hexdigest()[:12]

        doc = Document(
            id=doc_id,
            content=content,
            metadata={
                "source": str(path),
                "filename": path.name,
            }
        )
        doc.chunks = self.chunk_document(doc)
        return doc

    def chunk_document(self, doc: Document) -> list[Chunk]:
        """Implement your chunking strategy here."""
        # TODO: Implement semantic chunking
        raise NotImplementedError

# Your task: Complete the chunking implementation
# Try multiple strategies and compare retrieval quality

Deliverable: Document processing pipeline that can ingest and chunk your documentation corpus.
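
As a possible starting point for the fixed-size strategy in the task list, the sketch below splits text into overlapping windows. It measures size in characters, which is an assumption made for simplicity; the chunk_size and chunk_overlap values in the starter code may be intended as token counts, in which case you would tokenize first. The function name chunk_fixed_size is hypothetical.

```python
def chunk_fixed_size(text: str, chunk_size: int = 512, chunk_overlap: int = 50) -> list[str]:
    """Split text into character windows of chunk_size that overlap by chunk_overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # How far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # This window already reaches the end of the text
    return chunks


if __name__ == "__main__":
    print(chunk_fixed_size("abcdefghij", chunk_size=4, chunk_overlap=1))
```

A baseline like this is useful mainly as a point of comparison: the semantic and hierarchical strategies should beat it on your retrieval metrics, and if they do not, that is worth investigating.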

Phase 2: Vector Search Setup (Week 1-2)

Reference: Chapter 7 (Vector Databases in Production), Chapter 9 (Deployment)

Tasks:

  1. Choose and deploy a vector database (recommend Qdrant for learning)
  2. Select an embedding model (start with intfloat/e5-large-v2)
  3. Implement batch embedding pipeline
  4. Add hybrid search (dense + BM25)
  5. Implement reranking with a cross-encoder

Starter Code:

# retriever.py
from document_processor import Chunk  # Defined in Phase 1
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer, CrossEncoder

class HybridRetriever:
    def __init__(
        self,
        embedding_model: str = "intfloat/e5-large-v2",
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L6-v2",
        collection_name: str = "docs"
    ):
        self.embedder = SentenceTransformer(embedding_model)
        self.reranker = CrossEncoder(reranker_model)
        self.client = QdrantClient(":memory:")  # Use url for production
        self.collection_name = collection_name

    def create_collection(self, vector_size: int = 1024):
        """Create vector collection."""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE
            )
        )

    def index_chunks(self, chunks: list[Chunk]):
        """Index chunks with embeddings."""
        # TODO: Implement batch embedding and indexing
        raise NotImplementedError

    def search(
        self,
        query: str,
        top_k: int = 10,
        rerank_top_k: int = 5
    ) -> list[dict]:
        """Search with reranking."""
        # TODO: Implement search + reranking pipeline
        raise NotImplementedError

# Your task: Complete the retriever with hybrid search and reranking

Deliverable: Working retrieval system with hybrid search and reranking.
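
Hybrid search needs a way to merge the dense and BM25 result lists, whose raw scores are not on the same scale. One common option is Reciprocal Rank Fusion (RRF), which uses only the rank positions. The sketch below is one way to do it, not the only one; the function name and the constant k = 60 (a conventional default) are assumptions, and the inputs are plain lists of IDs ordered best-first.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several best-first ID rankings into one using RRF scores."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank); items ranked well in
            # several lists accumulate the highest totals.
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Sort by descending score, breaking ties by ID for determinism
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


if __name__ == "__main__":
    dense_hits = ["a", "b", "c"]
    bm25_hits = ["b", "d", "a"]
    print(reciprocal_rank_fusion([dense_hits, bm25_hits]))
```

The fused list can then be truncated and passed to the cross-encoder for reranking.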

Phase 3: RAG Pipeline (Week 2)

Reference: Chapter 6 (Prompt Engineering), Chapter 7 (Context Assembly)

Tasks:

  1. Build the RAG generation pipeline
  2. Implement prompt templates with proper context formatting
  3. Add citation/source tracking
  4. Implement streaming responses
  5. Add conversation history support

Starter Code:

# rag_pipeline.py
from anthropic import Anthropic
from retriever import HybridRetriever  # Defined in Phase 2

class RAGPipeline:
    SYSTEM_PROMPT = """You are a helpful assistant that answers questions based on
the provided documentation. Always cite your sources using [Source N] notation.
If the documentation doesn't contain the answer, say so clearly."""

    def __init__(self, retriever: HybridRetriever, llm_client: Anthropic):
        self.retriever = retriever
        self.llm = llm_client

    def query(self, question: str, conversation_history: list | None = None) -> dict:
        """Answer a question using RAG."""
        # Retrieve relevant chunks
        chunks = self.retriever.search(question, top_k=5)

        # Build context
        context = self._format_context(chunks)

        # Generate response
        response = self._generate(question, context, conversation_history)

        return {
            "answer": response,
            "sources": chunks,
            "context_used": context
        }

    def _format_context(self, chunks: list[dict]) -> str:
        """Format retrieved chunks as context."""
        # TODO: Implement context formatting with source labels
        raise NotImplementedError

    def _generate(
        self,
        question: str,
        context: str,
        history: list | None = None
    ) -> str:
        """Generate answer using LLM."""
        # TODO: Implement generation with streaming
        raise NotImplementedError

# Your task: Complete the RAG pipeline with proper prompting and streaming

Deliverable: Complete RAG pipeline with citations and conversation support.
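
The system prompt asks the model to cite with [Source N] notation, so the context needs matching labels. The sketch below shows one way to format it. It assumes each retrieved chunk is a dict with a "content" key and an optional "metadata" dict holding "source"; the starter code does not fix the shape of search results, so adjust the keys to whatever your retriever returns.

```python
def format_context(chunks: list[dict]) -> str:
    """Render retrieved chunks as numbered, source-labelled blocks."""
    blocks = []
    for n, chunk in enumerate(chunks, start=1):
        # Fall back to "unknown" when a chunk carries no source metadata
        source = chunk.get("metadata", {}).get("source", "unknown")
        blocks.append(f"[Source {n}] ({source})\n{chunk['content'].strip()}")
    return "\n\n".join(blocks)


if __name__ == "__main__":
    example = [
        {"content": "Install with pip.", "metadata": {"source": "docs/install.md"}},
        {"content": "Run the server.\n", "metadata": {}},
    ]
    print(format_context(example))
```

Because the labels are assigned by position, the N in a cited [Source N] maps back to index N - 1 of the "sources" list returned by query.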

Phase 4: Evaluation (Week 2-3)

Reference: Chapter 11 (Evaluation), Chapter 11 (LLM-as-Judge)

Tasks:

  1. Create a test set of questions with ground truth answers
  2. Implement retrieval metrics (Recall@K, MRR)
  3. Implement generation quality metrics (LLM-as-judge)
  4. Build an evaluation dashboard
  5. Run ablation studies on chunking strategies

Starter Code:

# evaluation.py
from dataclasses import dataclass
from rag_pipeline import RAGPipeline  # Defined in Phase 3

@dataclass
class TestCase:
    question: str
    ground_truth_answer: str
    relevant_doc_ids: list[str]

class RAGEvaluator:
    def __init__(self, rag_pipeline: RAGPipeline, llm_judge):
        self.rag = rag_pipeline
        self.judge = llm_judge

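    def evaluate_retrieval(self, test_cases: list[TestCase], k: int = 10) -> dict:
        """Evaluate retrieval quality with Recall@K and MRR."""
        recalls = []
        mrrs = []

        for case in test_cases:
            # Ask for k results after reranking so the metric matches its label
            # (the default rerank_top_k=5 would make this Recall@5).
            result = self.rag.retriever.search(case.question, top_k=k, rerank_top_k=k)
            # Assumes each result's 'id' is comparable to relevant_doc_ids
            retrieved_ids = [r['id'] for r in result]
            relevant = set(case.relevant_doc_ids)
            if not relevant:
                continue  # Recall is undefined without relevant documents

            # Calculate Recall@K
            recalls.append(len(set(retrieved_ids) & relevant) / len(relevant))

            # Calculate MRR
            mrrs.append(self._calculate_mrr(retrieved_ids, case.relevant_doc_ids))

        if not recalls:
            return {f"recall@{k}": 0.0, "mrr": 0.0}
        return {
            f"recall@{k}": sum(recalls) / len(recalls),
            "mrr": sum(mrrs) / len(mrrs)
        }

    def _calculate_mrr(self, retrieved_ids: list[str], relevant_ids: list[str]) -> float:
        """Reciprocal rank of the first relevant result (0.0 if none retrieved)."""
        relevant = set(relevant_ids)
        for rank, doc_id in enumerate(retrieved_ids, start=1):
            if doc_id in relevant:
                return 1.0 / rank
        return 0.0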

    def evaluate_generation(self, test_cases: list[TestCase]) -> dict:
        """Evaluate generation quality using LLM-as-judge."""
        # TODO: Implement LLM-as-judge evaluation
        raise NotImplementedError

# Your task: Complete the evaluation framework and run experiments

Deliverable: Evaluation framework with metrics dashboard and experiment results.

Phase 5: Production Hardening (Week 3)

Reference: Chapter 25 (Reliability), Chapter 16 (Security)

Tasks:

  1. Add caching (query → response cache)
  2. Implement rate limiting
  3. Add input validation and prompt injection detection
  4. Set up monitoring with Prometheus metrics
  5. Implement graceful degradation

Deliverable: Production-ready RAG system with monitoring and safety measures.
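
For the rate-limiting task, a token bucket is one common choice because it allows short bursts while capping the sustained rate. The sketch below is a minimal single-process version; the class name and the injectable clock parameter (used to make it testable) are assumptions, and a multi-instance deployment would need shared state such as Redis instead.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts up to `capacity` and a sustained `rate` of requests per second."""

    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity  # Start full so initial bursts are allowed
        self._last = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Return True and spend tokens if the request fits, else False."""
        now = self._clock()
        # Refill in proportion to elapsed time, never beyond capacity
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(rate=1.0, capacity=2.0)
    print([bucket.allow() for _ in range(3)])
```

Keeping one bucket per user or API key (for example in a dict keyed by client ID) turns this into per-client limiting.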

Stretch Goals

  • Add GraphRAG for entity relationship queries (Chapter 7)
  • Implement feedback collection and continuous improvement
  • Add multimodal support for images in documentation (Chapter 13)
  • Deploy with Docker and Kubernetes

Project 2: Agentic Coding Assistant

Difficulty: Advanced
Time Estimate: 3-4 weeks
Integrates: Chapters 2, 4, 5, 6, 8

Overview

Build an AI coding assistant that can understand a codebase, answer questions about it, make edits, and run tests. This project teaches agent architecture, tool use, and safety constraints.

Learning Objectives

  • Implement tool-using agents with the ReAct pattern
  • Build safe code execution environments
  • Handle multi-step planning and execution
  • Implement proper safety constraints for code agents

Architecture

[Figure: Coding Assistant Agent Architecture]

Phase 1: Codebase Understanding (Week 1)

Reference: Chapter 7 (RAG), Chapter 8 (Tool Use)

Tasks:

  1. Implement codebase indexing (files, functions, classes)
  2. Build semantic search over code
  3. Create tools: search_code, find_definition, get_file_contents
  4. Add codebase Q&A capability

Starter Code:

# codebase_tools.py
import ast
from pathlib import Path

class CodebaseIndexer:
    """Index a codebase for search and navigation."""

    def __init__(self, repo_path: Path):
        self.repo_path = repo_path
        self.files = {}
        self.symbols = {}  # function/class definitions

    def index(self):
        """Index all Python files in the repository."""
        for py_file in self.repo_path.rglob("*.py"):
            self._index_file(py_file)

    def _index_file(self, path: Path):
        """Extract symbols from a Python file."""
        content = path.read_text(encoding="utf-8")
        self.files[str(path)] = content

        try:
            tree = ast.parse(content)
            for node in ast.walk(tree):
                # Include "async def" functions, which are a separate node type
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    self.symbols[node.name] = {
                        "type": "function",
                        "file": str(path),
                        "line": node.lineno,
                        "code": ast.get_source_segment(content, node)
                    }
                elif isinstance(node, ast.ClassDef):
                    self.symbols[node.name] = {
                        "type": "class",
                        "file": str(path),
                        "line": node.lineno,
                        "code": ast.get_source_segment(content, node)
                    }
        except SyntaxError:
            pass  # Skip files with syntax errors

class CodebaseTools:
    """Tools for the coding assistant."""

    def __init__(self, indexer: CodebaseIndexer):
        self.indexer = indexer

    def search_code(self, query: str, max_results: int = 10) -> list[dict]:
        """Search for code matching a query."""
        # TODO: Implement semantic code search
        raise NotImplementedError

    def find_definition(self, symbol_name: str) -> dict:
        """Find the definition of a function or class."""
        if symbol_name in self.indexer.symbols:
            return self.indexer.symbols[symbol_name]
        return {"error": f"Symbol '{symbol_name}' not found"}

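    def get_file_contents(self, file_path: str) -> str:
        """Get the contents of a file inside the repository."""
        repo_root = self.indexer.repo_path.resolve()
        full_path = (repo_root / file_path).resolve()
        # Refuse paths that escape the repository (e.g. "../../etc/passwd")
        if not full_path.is_relative_to(repo_root):
            return f"Error: File '{file_path}' is outside the repository"
        if full_path.is_file():
            return full_path.read_text(encoding="utf-8")
        return f"Error: File '{file_path}' not found"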

# Your task: Complete semantic code search and integrate with an LLM

Deliverable: Codebase indexer with search and navigation tools.

Phase 2: Code Editing Agent (Week 2)

Reference: Chapter 8 (Agent Architectures), Chapter 6 (Structured Outputs)

Tasks:

  1. Implement the ReAct agent loop
  2. Create file editing tools with proper validation
  3. Add diff generation and application
  4. Implement multi-file editing coordination

Starter Code:

# coding_agent.py
from enum import Enum
from dataclasses import dataclass
from codebase_tools import CodebaseTools  # Defined in Phase 1

class ActionType(Enum):
    SEARCH = "search"
    READ_FILE = "read_file"
    EDIT_FILE = "edit_file"
    RUN_COMMAND = "run_command"
    RESPOND = "respond"

@dataclass
class AgentAction:
    type: ActionType
    parameters: dict
    reasoning: str

class CodingAgent:
    """ReAct-style coding assistant agent."""

    SYSTEM_PROMPT = """You are a coding assistant that can search, read, and edit code.

Available tools:

- search_code(query): Search for relevant code
- read_file(path): Read a file's contents
- edit_file(path, old_content, new_content): Edit a file
- run_command(command): Run a shell command (tests, linting)

For each step:
1. Think about what you need to do
2. Choose an appropriate action
3. Observe the result
4. Continue or respond to the user

Always explain your reasoning before taking actions."""

    def __init__(self, tools: CodebaseTools, llm_client):
        self.tools = tools
        self.llm = llm_client
        self.max_steps = 10

    async def run(self, user_request: str) -> str:
        """Execute a coding task."""
        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": user_request}
        ]

        for step in range(self.max_steps):
            # Get next action from LLM
            action = await self._get_next_action(messages)

            if action.type == ActionType.RESPOND:
                return action.parameters["response"]

            # Execute action
            result = await self._execute_action(action)

            # Add to conversation
            messages.append({
                "role": "assistant",
                "content": f"Reasoning: {action.reasoning}\nAction: {action.type.value}\nParameters: {action.parameters}"
            })
            messages.append({
                "role": "user",
                "content": f"Observation: {result}"
            })

        return "Reached maximum steps without completing the task."

    async def _get_next_action(self, messages: list) -> AgentAction:
        """Get the next action from the LLM."""
        # TODO: Implement action parsing with structured outputs
        raise NotImplementedError

    async def _execute_action(self, action: AgentAction) -> str:
        """Execute an action and return the result."""
        # TODO: Implement action execution with safety checks
        raise NotImplementedError

# Your task: Complete the agent with action parsing and execution

Deliverable: Working ReAct agent that can search and edit code.
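
The edit_file(path, old_content, new_content) tool described in the system prompt is easiest to make safe if it refuses ambiguous edits. The sketch below applies an edit only when old_content matches exactly once and returns a unified diff for review. It works on strings rather than files, and the function name and return shape are assumptions made for illustration.

```python
import difflib


def apply_edit(original: str, old_content: str, new_content: str) -> dict:
    """Replace old_content with new_content if it occurs exactly once."""
    count = original.count(old_content) if old_content else 0
    if count != 1:
        # Zero matches means the model misquoted the file; several matches
        # means the edit location is ambiguous. Reject both.
        return {"ok": False, "error": f"old_content must match exactly once, found {count}"}

    updated = original.replace(old_content, new_content, 1)
    diff = "".join(
        difflib.unified_diff(
            original.splitlines(keepends=True),
            updated.splitlines(keepends=True),
            fromfile="before",
            tofile="after",
        )
    )
    return {"ok": True, "content": updated, "diff": diff}


if __name__ == "__main__":
    source = "def f():\n    return 1\n"
    result = apply_edit(source, "return 1", "return 2")
    print(result["diff"])
```

Returning the error as an observation, instead of raising, lets the agent read it and retry with a longer, unambiguous old_content.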

Phase 3: Safe Code Execution (Week 2-3)

Reference: Chapter 16 (Security for Agentic Systems)

Tasks:

  1. Implement sandboxed code execution (Docker-based)
  2. Add tool call validation with allowlists
  3. Create a confirmation flow for dangerous operations
  4. Implement rollback capability for edits

Starter Code:

# sandbox.py
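import docker
import shlex
import tempfile
from pathlib import Path

class SecureSandbox:
    """Sandboxed environment for code execution."""

    ALLOWED_COMMANDS = [
        "python -m pytest",
        "python -m pylint",
        "python -m mypy",
        "python -m black --check",
    ]

    # Characters that would let a shell chain or redirect commands
    SHELL_METACHARACTERS = set(";|&$`<>")

    def __init__(self, project_path: Path):
        self.project_path = project_path
        self.client = docker.from_env()

    def run_tests(self, test_path: str = "tests/") -> dict:
        """Run tests in sandbox."""
        return self._run_command(f"python -m pytest {test_path} -v")

    def run_linter(self, file_path: str) -> dict:
        """Run linter on a file."""
        return self._run_command(f"python -m pylint {file_path}")

    def _run_command(self, command: str) -> dict:
        """Run a command in the sandbox."""
        # Validate parsed tokens rather than the raw string: a plain prefix
        # check would accept "python -m pytest; rm -rf /".
        try:
            argv = shlex.split(command)
        except ValueError:
            return {"error": f"Command not allowed: {command}"}

        has_metachar = any(ch in self.SHELL_METACHARACTERS for ch in command)
        is_allowed = any(
            argv[:len(prefix)] == prefix
            for prefix in map(shlex.split, self.ALLOWED_COMMANDS)
        )
        if has_metachar or not is_allowed:
            return {"error": f"Command not allowed: {command}"}

        # TODO: Implement Docker-based execution
        # (pass argv as a list so no shell ever interprets the command)
        raise NotImplementedError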

class EditValidator:
    """Validate code edits before applying."""

    DANGEROUS_PATTERNS = [
        r"os\.system",
        r"subprocess\.",
        r"eval\(",
        r"exec\(",
        r"__import__",
    ]

    def validate_edit(self, old_content: str, new_content: str) -> dict:
        """Validate that an edit is safe."""
        # TODO: Implement edit validation
        raise NotImplementedError

# Your task: Complete sandbox execution and edit validation

Deliverable: Safe execution environment with validation and rollback.
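
One way to approach validate_edit is to flag only the dangerous patterns that an edit introduces, so that pre-existing uses in the file do not block every change. The sketch below does this by comparing match counts before and after. The pattern list is copied from the starter code; the function name is hypothetical, and regex matching of this kind is a heuristic (for example, the eval pattern also matches ast.literal_eval), so treat hits as a trigger for confirmation rather than proof of malice.

```python
import re

# Same patterns as EditValidator.DANGEROUS_PATTERNS in the starter code
DANGEROUS_PATTERNS = [
    r"os\.system",
    r"subprocess\.",
    r"eval\(",
    r"exec\(",
    r"__import__",
]


def find_new_dangerous_patterns(old_content: str, new_content: str) -> list[str]:
    """Return the patterns that occur more often after the edit than before."""
    flagged = []
    for pattern in DANGEROUS_PATTERNS:
        before = len(re.findall(pattern, old_content))
        after = len(re.findall(pattern, new_content))
        if after > before:
            flagged.append(pattern)
    return flagged


if __name__ == "__main__":
    old = "x = 1\n"
    new = "import os\nos.system('ls')\n"
    print(find_new_dangerous_patterns(old, new))
```

A non-empty result is a natural place to hook in the confirmation flow from the task list.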

Phase 4: Integration and Polish (Week 3-4)

Tasks:

  1. Build a CLI or web interface
  2. Add conversation memory across sessions
  3. Implement progress indicators for long operations
  4. Add comprehensive logging and debugging

Deliverable: Complete, usable coding assistant.

Stretch Goals

  • Add support for multiple languages (not just Python)
  • Implement autonomous bug fixing with test verification
  • Add code review capabilities
  • Integrate with GitHub for PR creation

Project 3: LLM Serving Platform

Difficulty: Advanced
Time Estimate: 2-3 weeks
Integrates: Chapters 5, 17, 21, 22

Overview

Build a multi-model LLM serving platform that handles routing, caching, rate limiting, and cost optimization. This project teaches production infrastructure at scale.

Learning Objectives

  • Deploy and optimize LLM inference
  • Implement intelligent request routing
  • Build caching strategies for LLM workloads
  • Set up comprehensive monitoring and alerting

Architecture

[Figure: LLM Serving Platform Architecture]

Phase 1: Basic Serving (Week 1)

Reference: Chapter 9 (LLM Deployment)

Tasks:

  1. Deploy vLLM with an open-source model
  2. Create an OpenAI-compatible API wrapper
  3. Implement basic request handling with async
  4. Add request/response logging

Starter Code:

# server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from vllm import LLM, SamplingParams
import asyncio

app = FastAPI()

class CompletionRequest(BaseModel):
    model: str
    messages: list[dict]
    max_tokens: int = 1024
    temperature: float = 0.7

class CompletionResponse(BaseModel):
    id: str
    choices: list[dict]
    usage: dict

class LLMServer:
    def __init__(self):
        self.models = {}

    def load_model(self, model_name: str, model_path: str):
        """Load a model into memory."""
        self.models[model_name] = LLM(model=model_path)

    async def generate(self, request: CompletionRequest) -> CompletionResponse:
        """Generate a completion."""
        if request.model not in self.models:
            raise HTTPException(404, f"Model {request.model} not found")

        # TODO: Implement generation with proper formatting
        raise NotImplementedError

server = LLMServer()

@app.post("/v1/chat/completions")
async def create_completion(request: CompletionRequest):
    return await server.generate(request)

# Your task: Complete the server with proper error handling and logging

Deliverable: Basic LLM serving API with vLLM backend.
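
To make the wrapper OpenAI-compatible, the response body should follow the shape that OpenAI client libraries expect from the chat completions endpoint. The sketch below builds such a body from already-generated text. The helper name is hypothetical, and the token counts are passed in by the caller because how you obtain them depends on your backend.

```python
import time
import uuid


def format_chat_completion(
    model: str,
    text: str,
    prompt_tokens: int,
    completion_tokens: int,
    finish_reason: str = "stop",
) -> dict:
    """Build a chat-completion style response body for one generated message."""
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex}",
        "object": "chat.completion",
        "created": int(time.time()),  # Unix timestamp in seconds
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": text},
                "finish_reason": finish_reason,
            }
        ],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }


if __name__ == "__main__":
    body = format_chat_completion("my-model", "Hello!", prompt_tokens=5, completion_tokens=2)
    print(body["choices"][0]["message"], body["usage"])
```

The "id", "choices", and "usage" keys line up with the CompletionResponse model in the starter code; the extra keys would need matching fields there if you validate responses with Pydantic.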

Phase 2: Caching and Routing (Week 1-2)

Reference: Chapter 9 (Caching), Chapter 26 (Cost Optimization)

Tasks:

  1. Implement semantic caching with embeddings
  2. Build model router based on query complexity
  3. Add fallback logic for API failures
  4. Implement request coalescing for identical queries

Starter Code:

# cache.py
import hashlib
import numpy as np
from redis import Redis

class SemanticCache:
    """Cache LLM responses with semantic similarity matching."""

    def __init__(self, redis_client: Redis, embedder, similarity_threshold: float = 0.95):
        self.redis = redis_client
        self.embedder = embedder
        self.threshold = similarity_threshold

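    def get(self, query: str) -> str | None:
        """Check cache for similar query."""
        query_embedding = np.asarray(self.embedder.encode(query), dtype=np.float32)
        query_norm = np.linalg.norm(query_embedding)

        # Linear scan over cached embeddings: fine for learning, but switch to
        # a vector index as the cache grows. scan_iter iterates incrementally
        # instead of blocking Redis the way KEYS does.
        for key in self.redis.scan_iter("cache:emb:*"):
            raw = self.redis.get(key)
            if raw is None:
                continue  # Entry expired between the scan and the read
            cached_embedding = np.frombuffer(raw, dtype=np.float32)

            # Cosine similarity; a bare dot product is only valid for unit vectors
            denom = query_norm * np.linalg.norm(cached_embedding)
            if denom == 0:
                continue
            similarity = float(np.dot(query_embedding, cached_embedding) / denom)

            if similarity >= self.threshold:
                response_key = key.decode().replace("emb:", "resp:", 1)
                response = self.redis.get(response_key)
                if response is not None:
                    return response.decode()

        return None

    def set(self, query: str, response: str, ttl: int = 3600):
        """Cache a response."""
        query_hash = hashlib.md5(query.encode()).hexdigest()
        # Store as float32 so get() can decode the bytes with the same dtype
        embedding = np.asarray(self.embedder.encode(query), dtype=np.float32)

        self.redis.setex(f"cache:emb:{query_hash}", ttl, embedding.tobytes())
        self.redis.setex(f"cache:resp:{query_hash}", ttl, response)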

# router.py
class ModelRouter:
    """Route requests to appropriate models based on complexity."""

    MODEL_TIERS = {
        "simple": "llama-7b",          # Simple queries
        "medium": "llama-70b",         # Moderately complex queries
        "complex": "claude-opus-4-8",  # Very complex queries
    }

    def __init__(self, complexity_classifier):
        self.classifier = complexity_classifier

    def route(self, query: str) -> str:
        """Determine which model should handle this query."""
        # TODO: Implement complexity-based routing
        raise NotImplementedError

# Your task: Complete caching and routing with proper fallbacks

Deliverable: Caching layer and intelligent routing.
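
Request coalescing means that when several identical queries arrive while the first is still being processed, they all wait on that one in-flight call instead of each hitting the model. The sketch below shows one way to do this with asyncio inside a single process; the class name and its interface are assumptions, and the key would typically be a hash of the model name and prompt.

```python
import asyncio
from typing import Awaitable, Callable


class RequestCoalescer:
    """Share one in-flight call among concurrent requests with the same key."""

    def __init__(self) -> None:
        self._in_flight: dict[str, asyncio.Future] = {}

    async def run(self, key: str, fn: Callable[[], Awaitable[str]]) -> str:
        task = self._in_flight.get(key)
        if task is None:
            # First caller for this key starts the real work
            task = asyncio.ensure_future(fn())
            self._in_flight[key] = task
            # Forget the entry once finished so later requests start fresh
            task.add_done_callback(lambda _t: self._in_flight.pop(key, None))
        # shield() keeps one cancelled waiter from cancelling the shared call
        return await asyncio.shield(task)


async def _demo() -> None:
    calls = []

    async def slow_llm_call() -> str:
        calls.append(1)
        await asyncio.sleep(0.01)  # Stand-in for model latency
        return "answer"

    coalescer = RequestCoalescer()
    results = await asyncio.gather(
        *(coalescer.run("same-query", slow_llm_call) for _ in range(5))
    )
    print(results, "backend calls:", len(calls))


if __name__ == "__main__":
    asyncio.run(_demo())
```

Coalescing complements the cache: the cache handles repeats over time, while coalescing handles repeats that arrive before the first response exists to be cached.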

Phase 3: Monitoring and Observability (Week 2)

Reference: Chapter 25 (Reliability), Chapter 11 (Observability)

Tasks:

  1. Set up Prometheus metrics collection
  2. Create Grafana dashboards for key metrics
  3. Implement alerting for latency and error rate
  4. Add request tracing with OpenTelemetry

Starter Code:

# metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps

# Define metrics
REQUEST_COUNT = Counter(
    'llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)

REQUEST_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'LLM request latency',
    ['model'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30]
)

TOKENS_GENERATED = Counter(
    'llm_tokens_generated_total',
    'Total tokens generated',
    ['model']
)

CACHE_HITS = Counter(
    'llm_cache_hits_total',
    'Cache hits',
    ['cache_type']
)

def track_request(model: str):
    """Decorator to track request metrics."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()  # Monotonic clock suited to measuring durations
            status = "success"
            try:
                return await func(*args, **kwargs)
            except Exception:
                status = "error"
                raise
            finally:
                # Runs on both success and failure, so every request is counted
                REQUEST_COUNT.labels(model=model, status=status).inc()
                REQUEST_LATENCY.labels(model=model).observe(time.perf_counter() - start)
        return wrapper
    return decorator

# Your task: Integrate metrics and create Grafana dashboards

Deliverable: Complete observability setup with dashboards and alerts.

Phase 4: Cost Optimization (Week 3)

Reference: Chapter 26 (Cost Engineering)

Tasks:

  1. Implement per-tenant cost tracking
  2. Add budget enforcement and alerts
  3. Optimize batch processing for throughput
  4. Create cost reports and projections

Deliverable: Cost management system with tracking and optimization.
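
Per-tenant cost tracking and budget enforcement can start from a very small in-memory model before moving to a database. The sketch below is one such starting point. All names are hypothetical, and the prices are placeholder inputs supplied by the caller, not real rates; it also uses a single price per 1,000 tokens per model, whereas real pricing usually distinguishes input and output tokens.

```python
from collections import defaultdict


class BudgetExceeded(Exception):
    """Raised when a request would push a tenant past its budget."""


class CostTracker:
    def __init__(self, price_per_1k_tokens: dict[str, float], budgets: dict[str, float]):
        self.prices = price_per_1k_tokens
        self.budgets = budgets  # Tenants without an entry are unlimited
        self.spend: dict[str, float] = defaultdict(float)

    def estimate(self, model: str, tokens: int) -> float:
        """Cost of `tokens` tokens on `model`, in the same currency as the prices."""
        return self.prices[model] * tokens / 1000

    def record(self, tenant: str, model: str, tokens: int) -> float:
        """Add a request's cost to the tenant's spend, enforcing the budget."""
        cost = self.estimate(model, tokens)
        budget = self.budgets.get(tenant)
        if budget is not None and self.spend[tenant] + cost > budget:
            raise BudgetExceeded(f"{tenant} would exceed budget of {budget}")
        self.spend[tenant] += cost
        return self.spend[tenant]


if __name__ == "__main__":
    tracker = CostTracker({"small-model": 0.5}, {"acme": 1.0})
    print(tracker.record("acme", "small-model", 1000))
```

Calling estimate before generation, with the prompt length plus max_tokens, lets you reject a request up front instead of discovering the overrun afterwards.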

Stretch Goals

  • Add GPU autoscaling based on queue depth
  • Implement A/B testing for model versions
  • Add support for streaming responses
  • Build a management UI

Project 4: ML Platform Feature Store

Difficulty: Advanced
Time Estimate: 3-4 weeks
Integrates: Chapters 15, 20, 21

Overview

Build a feature store component that handles feature serving, point-in-time correctness, and training-serving consistency. This project teaches data architecture for ML at scale.

Learning Objectives

  • Implement feature serving with low latency
  • Handle point-in-time correctness for training
  • Detect and prevent training-serving skew
  • Build monitoring for feature health

Architecture

[Figure: Feature Store Architecture]

Phases

  1. Feature Registry (Week 1): Implement feature definitions, versioning, and metadata
  2. Online Store (Week 1-2): Build low-latency feature serving with Redis
  3. Offline Store (Week 2): Implement point-in-time correct feature retrieval
  4. Monitoring (Week 3): Add data quality checks and skew detection
  5. Integration (Week 3-4): Connect to training and serving pipelines

See Chapter 24 for detailed implementation patterns and starter code.


Evaluation Rubric

For each project, evaluate yourself on:

| Criterion             | Points | Description                                      |
|-----------------------|--------|--------------------------------------------------|
| Functionality         | 25     | Does it work as specified?                       |
| Code Quality          | 20     | Clean, well-organized, documented code           |
| Testing               | 15     | Comprehensive tests, edge case handling          |
| Production Readiness  | 15     | Monitoring, error handling, graceful degradation |
| Documentation         | 10     | README, architecture diagrams, API docs          |
| Stretch Goals         | 10     | Going beyond requirements                        |
| Learning Demonstrated | 5      | Can explain design decisions                     |

Total: 100 points

  • 90+: Exceptional, portfolio-highlight quality
  • 80-89: Strong, demonstrates senior-level skills
  • 70-79: Good, demonstrates mid-level skills
  • 60-69: Adequate, demonstrates foundational skills
  • <60: Needs more work

Project Submission Checklist

Before considering a project complete:


Getting Help

If you get stuck:

  1. Re-read relevant chapters: The answers are often in the text
  2. Check the paper references: Appendix C has foundational papers
  3. Use the debugging guide: Appendix F covers common issues
  4. Ask in communities: r/MachineLearning, HuggingFace forums, Discord servers
  5. Iterate: Start simple, add complexity incrementally