Your First LLM Application - Code Reference
Complete application code from Chapter 4: Your First LLM Application.
This reference contains the full implementation of a Document Q&A Assistant using RAG (Retrieval-Augmented Generation).
Table of Contents
- Project Structure
- Configuration (config.py)
- Document Indexer (indexer.py)
- Vector Retriever (retriever.py)
- Answer Generator (generator.py)
- Main Application (qa_assistant.py)
- Evaluation Utilities (evaluation.py)
- Test Scripts
- Sample Documents
- Dependencies
- Usage Examples
- Architecture Overview
Project Structure
document_qa/
├── qa_assistant.py # Main application entry point
├── indexer.py # Document processing and indexing
├── retriever.py # Vector search and retrieval
├── generator.py # LLM interaction for answers
├── evaluation.py # Evaluation utilities
├── config.py # Configuration and constants
├── requirements.txt # Dependencies
├── eval_cases.json # Evaluation test cases
├── sample_docs/ # Sample documents to test with
│ ├── hr_policy.md
│ └── tech_guide.md
└── qa_index/ # Where the vector index is stored
Configuration (config.py)
"""Configuration for the Document Q&A Assistant."""
import logging
import os
from dataclasses import dataclass, field
@dataclass
class Config:
    """Application configuration with sensible defaults.

    The API keys are looked up in the environment each time a ``Config`` is
    instantiated (not once when the class is defined), so instances created
    after the environment changes see the new values.
    """

    # API keys (from environment, resolved at instantiation time)
    anthropic_api_key: str = field(
        default_factory=lambda: os.getenv("ANTHROPIC_API_KEY", "")
    )
    openai_api_key: str = field(
        default_factory=lambda: os.getenv("OPENAI_API_KEY", "")
    )

    # Model settings
    llm_model: str = "claude-sonnet-4-20250514"
    embedding_model: str = "text-embedding-3-small"
    embedding_dimensions: int = 1536

    # Chunking settings
    chunk_size: int = 500  # tokens
    chunk_overlap: int = 50  # tokens

    # Retrieval settings
    top_k: int = 5  # number of chunks to retrieve

    # Generation settings
    max_tokens: int = 1024
    temperature: float = 0.0  # deterministic for Q&A

    # Paths
    index_path: str = "./qa_index"

    def validate(self) -> list[str]:
        """Check the configuration and return a list of error messages.

        Returns:
            Human-readable problems; an empty list means the configuration
            is usable.
        """
        errors = []
        if not self.anthropic_api_key:
            errors.append("ANTHROPIC_API_KEY environment variable not set")
        if not self.openai_api_key:
            errors.append("OPENAI_API_KEY environment variable not set")
        # Numeric sanity checks: catch nonsensical values before they reach
        # the chunker or the vector store.
        if self.chunk_size <= 0:
            errors.append("chunk_size must be a positive number of tokens")
        elif not 0 <= self.chunk_overlap < self.chunk_size:
            errors.append(
                "chunk_overlap must be at least 0 and smaller than chunk_size"
            )
        if self.top_k < 1:
            errors.append("top_k must be at least 1")
        return errors
def setup_logging(level: str = "INFO") -> logging.Logger:
    """Set up root logging and hand back the application logger.

    Args:
        level: Name of a ``logging`` level (case-insensitive), e.g. "debug".

    Returns:
        The logger named "qa_assistant".

    Raises:
        AttributeError: If ``level`` does not name an attribute of ``logging``.
    """
    # Resolve the textual level name to the constant on the logging module.
    numeric_level = getattr(logging, level.upper())
    logging.basicConfig(
        level=numeric_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    app_logger = logging.getLogger("qa_assistant")
    return app_logger
@dataclass
class CostTracker:
    """Accumulate token counts and turn them into an estimated USD cost."""

    # Per-token prices in USD (class-level constants, not dataclass fields).
    # Adjust these when provider pricing changes.
    CLAUDE_INPUT_PRICE = 3.00 / 1_000_000  # $3 per 1M input tokens
    CLAUDE_OUTPUT_PRICE = 15.00 / 1_000_000  # $15 per 1M output tokens
    EMBEDDING_PRICE = 0.02 / 1_000_000  # $0.02 per 1M tokens

    # Running totals
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_embedding_tokens: int = 0

    def add_generation(self, input_tokens: int, output_tokens: int):
        """Add the token usage of one generation call to the totals."""
        self.total_input_tokens = self.total_input_tokens + input_tokens
        self.total_output_tokens = self.total_output_tokens + output_tokens

    def add_embedding(self, tokens: int):
        """Add the token usage of one embedding call to the totals."""
        self.total_embedding_tokens = self.total_embedding_tokens + tokens

    @property
    def total_cost(self) -> float:
        """Estimated cost in USD of everything recorded so far."""
        input_cost = self.total_input_tokens * self.CLAUDE_INPUT_PRICE
        output_cost = self.total_output_tokens * self.CLAUDE_OUTPUT_PRICE
        embedding_cost = self.total_embedding_tokens * self.EMBEDDING_PRICE
        return (input_cost + output_cost) + embedding_cost

    def summary(self) -> str:
        """Render the totals and estimated cost as a two-line string."""
        token_line = (
            f"Tokens: {self.total_input_tokens:,} input, "
            f"{self.total_output_tokens:,} output, "
            f"{self.total_embedding_tokens:,} embedding"
        )
        cost_line = f"Estimated cost: ${self.total_cost:.4f}"
        return token_line + "\n" + cost_line
# Global instances: created once at import time and shared by the other
# modules, which pull them in with `from config import ...`.
config = Config()
logger = setup_logging()
cost_tracker = CostTracker()

# Document Indexer (indexer.py)
"""Document processing and indexing."""
import os
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Iterator
import tiktoken
import openai
from config import config, logger
@dataclass
class Chunk:
    """One piece of a source document plus the metadata stored with it."""

    text: str  # chunk content
    source_file: str  # identifier of the originating file
    chunk_index: int  # position of the chunk within its file
    token_count: int  # number of tokens in ``text``

    @property
    def id(self) -> str:
        """Identifier built from file, index and a short content digest."""
        digest = hashlib.md5(self.text.encode()).hexdigest()
        short_digest = digest[:8]
        parts = (self.source_file, self.chunk_index, short_digest)
        return "::".join(str(part) for part in parts)
class Chunker:
    """Split documents into overlapping chunks sized in tokens.

    Paragraphs (separated by blank lines) are packed together up to roughly
    ``chunk_size`` tokens. A paragraph that is too large on its own is split
    on sentence boundaries, and a sentence that is still too large is split
    on whitespace.
    """

    def __init__(
        self,
        chunk_size: "int | None" = None,
        chunk_overlap: "int | None" = None
    ):
        """Create a chunker.

        Args:
            chunk_size: Target maximum tokens per chunk. ``None`` uses
                ``config.chunk_size`` as it is at call time.
            chunk_overlap: Maximum tokens of trailing paragraphs repeated at
                the start of the next chunk. ``None`` uses
                ``config.chunk_overlap`` as it is at call time.
        """
        # Resolve defaults here rather than in the signature so that changes
        # made to ``config`` after import are honoured.
        self.chunk_size = config.chunk_size if chunk_size is None else chunk_size
        self.chunk_overlap = (
            config.chunk_overlap if chunk_overlap is None else chunk_overlap
        )
        # Use the tokenizer for our embedding model
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Count tokens in text."""
        # disallowed_special=() makes the tokenizer treat strings such as
        # "<|endoftext|>" as ordinary text; with the default setting tiktoken
        # raises ValueError when a document happens to contain one.
        return len(self.tokenizer.encode(text, disallowed_special=()))

    def chunk_text(self, text: str, source_file: str) -> "list[Chunk]":
        """Split text into overlapping chunks.

        Strategy: split on paragraph boundaries when possible, fall back to
        sentence boundaries, then word boundaries.

        Args:
            text: Full document text.
            source_file: Identifier recorded on every produced chunk.

        Returns:
            Chunks in document order; empty if ``text`` is blank.
        """
        text = text.strip()
        if not text:
            return []

        chunks = []
        current = []  # paragraphs of the chunk being assembled
        current_tokens = 0

        for para in text.split('\n\n'):
            para = para.strip()
            if not para:
                continue
            para_tokens = self.count_tokens(para)

            # A paragraph too large for one chunk is split on its own; no
            # overlap is carried across it.
            if para_tokens > self.chunk_size:
                self._flush(current, source_file, chunks)
                current, current_tokens = [], 0
                chunks.extend(
                    self._split_large_text(para, source_file, len(chunks))
                )
                continue

            # Adding this paragraph would overflow: emit what we have and
            # seed the next chunk with the overlapping tail.
            if current and current_tokens + para_tokens > self.chunk_size:
                self._flush(current, source_file, chunks)
                current, current_tokens = self._overlap_tail(current)

            current.append(para)
            current_tokens += para_tokens

        # Don't forget the last chunk
        self._flush(current, source_file, chunks)
        return chunks

    def _flush(self, paragraphs: "list[str]", source_file: str, chunks: list) -> None:
        """Append the buffered paragraphs to ``chunks`` as one chunk, if any."""
        if paragraphs:
            chunks.append(self._create_chunk(
                '\n\n'.join(paragraphs), source_file, len(chunks)
            ))

    def _overlap_tail(self, paragraphs: "list[str]") -> "tuple[list[str], int]":
        """Return the trailing paragraphs that fit in ``chunk_overlap`` tokens.

        Returns:
            ``(paragraphs, token_total)`` in original order.
        """
        tail = []
        tail_tokens = 0
        for para in reversed(paragraphs):
            para_tokens = self.count_tokens(para)
            if tail_tokens + para_tokens > self.chunk_overlap:
                break
            tail.insert(0, para)
            tail_tokens += para_tokens
        return tail, tail_tokens

    def _split_by_words(self, sentence: str) -> "list[str]":
        """Split one oversized sentence into whitespace-joined word groups.

        Words are counted individually, so the token count of a joined group
        is an estimate. A single word larger than ``chunk_size`` is kept
        whole. Runs of whitespace inside the sentence become single spaces.
        """
        groups = []
        current = []
        current_tokens = 0
        for word in sentence.split():
            word_tokens = self.count_tokens(word)
            if current and current_tokens + word_tokens > self.chunk_size:
                groups.append(' '.join(current))
                current = []
                current_tokens = 0
            current.append(word)
            current_tokens += word_tokens
        if current:
            groups.append(' '.join(current))
        return groups

    def _split_large_text(
        self, text: str, source_file: str, start_index: int
    ) -> "list[Chunk]":
        """Split text that exceeds chunk size using sentences, then words.

        Args:
            text: The oversized paragraph.
            source_file: Identifier recorded on every produced chunk.
            start_index: ``chunk_index`` assigned to the first produced chunk.
        """
        import re

        # Split on sentence boundaries, then break up any sentence that is
        # still too large for a single chunk.
        pieces = []
        for sentence in re.split(r'(?<=[.!?])\s+', text):
            if self.count_tokens(sentence) > self.chunk_size:
                pieces.extend(self._split_by_words(sentence))
            else:
                pieces.append(sentence)

        chunks = []
        current = []
        current_tokens = 0
        for piece in pieces:
            piece_tokens = self.count_tokens(piece)
            if current and current_tokens + piece_tokens > self.chunk_size:
                chunks.append(self._create_chunk(
                    ' '.join(current), source_file, start_index + len(chunks)
                ))
                current = []
                current_tokens = 0
            current.append(piece)
            current_tokens += piece_tokens
        if current:
            chunks.append(self._create_chunk(
                ' '.join(current), source_file, start_index + len(chunks)
            ))
        return chunks

    def _create_chunk(self, text: str, source_file: str, index: int) -> "Chunk":
        """Create a Chunk object, computing its token count from ``text``."""
        return Chunk(
            text=text,
            source_file=source_file,
            chunk_index=index,
            token_count=self.count_tokens(text)
        )
def load_documents(directory: str) -> Iterator[tuple[str, str]]:
    """Load all supported documents from a directory tree.

    Files are visited in sorted path order so repeated runs yield documents
    in the same sequence. Unreadable files are logged and skipped.

    Args:
        directory: Root directory to search recursively.

    Yields:
        ``(relative_path, content)`` tuples for every ``.txt``, ``.md`` and
        ``.markdown`` file (extension match is case-insensitive).

    Raises:
        FileNotFoundError: If ``directory`` does not exist. Because this is
            a generator, the error surfaces on first iteration.
    """
    supported_extensions = {'.txt', '.md', '.markdown'}
    root = Path(directory)
    if not root.exists():
        raise FileNotFoundError(f"Directory not found: {root}")

    for file_path in sorted(root.rglob('*')):
        if file_path.suffix.lower() not in supported_extensions:
            continue
        # A directory can carry a matching suffix (e.g. "notes.md/"); only
        # regular files can be read.
        if not file_path.is_file():
            continue
        try:
            content = file_path.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            logger.warning(f"Could not read {file_path}: {e}")
            continue
        # Use relative path as identifier
        yield str(file_path.relative_to(root)), content
def create_embeddings(texts: list[str]) -> list[list[float]]:
    """Create embeddings for a list of texts.

    Uses OpenAI's embedding API, sending the texts in batches, and records
    the reported token usage on the shared cost tracker.

    Args:
        texts: Strings to embed.

    Returns:
        One embedding per input text, in input order. Empty input returns
        an empty list without contacting the API.
    """
    if not texts:
        return []

    # Imported here because this module's top-level import from config only
    # brings in ``config`` and ``logger``.
    from config import cost_tracker

    client = openai.OpenAI()
    # OpenAI's embedding API accepts batches
    # but has a token limit per request
    batch_size = 100
    all_embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = client.embeddings.create(
            model=config.embedding_model,
            input=batch
        )
        # Record usage so cost summaries include embedding calls.
        usage = getattr(response, "usage", None)
        if usage is not None:
            cost_tracker.add_embedding(usage.total_tokens)
        # Sort by index to maintain order
        ordered = sorted(response.data, key=lambda item: item.index)
        all_embeddings.extend(item.embedding for item in ordered)
    return all_embeddings
def index_documents(directory: str) -> list[Chunk]:
    """Read every supported document under ``directory`` and chunk it.

    This is the main entry point for document indexing.

    Returns:
        All chunks from all documents, in the order the documents were loaded.
    """
    splitter = Chunker()
    collected = []
    for name, text in load_documents(directory):
        file_chunks = splitter.chunk_text(text, name)
        collected += file_chunks
        logger.info(f" {name}: {len(file_chunks)} chunks")
    return collected


# Vector Retriever (retriever.py)
"""Vector search and retrieval."""
import chromadb
from chromadb.config import Settings
from config import config, logger
from indexer import Chunk, create_embeddings
class VectorStore:
    """Simple vector store using ChromaDB."""

    _COLLECTION_NAME = "documents"

    def __init__(self, persist_directory: str = config.index_path):
        """Initialize or load the existing vector store.

        Args:
            persist_directory: Directory where ChromaDB keeps its data.
        """
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        self.collection = self._open_collection()

    def _open_collection(self):
        """Get or create the documents collection with cosine distance."""
        return self.client.get_or_create_collection(
            name=self._COLLECTION_NAME,
            metadata={"hnsw:space": "cosine"}  # Use cosine similarity
        )

    def add_chunks(self, chunks: list[Chunk]) -> None:
        """Embed the chunks and write them to the vector store.

        Chunks are upserted by ID, so indexing the same content again
        replaces the existing entries instead of colliding with them.
        """
        if not chunks:
            return

        texts = [chunk.text for chunk in chunks]
        ids = [chunk.id for chunk in chunks]
        metadatas = [
            {
                "source_file": chunk.source_file,
                "chunk_index": chunk.chunk_index,
                "token_count": chunk.token_count
            }
            for chunk in chunks
        ]

        logger.info("Generating embeddings...")
        embeddings = create_embeddings(texts)

        self.collection.upsert(
            ids=ids,
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas
        )
        logger.info(f"Added {len(chunks)} chunks to index")

    def search(self, query: str, top_k: int = config.top_k) -> list[dict]:
        """Search for chunks similar to the query.

        Args:
            query: Natural-language query text.
            top_k: Maximum number of results to return.

        Returns:
            List of dicts with 'text', 'metadata', and 'score' keys, in the
            order ChromaDB returns them. Empty when the index is empty or
            ``top_k`` is not positive.
        """
        available = self.collection.count()
        if top_k <= 0 or available == 0:
            # Nothing to search; skip the embedding API call entirely.
            return []

        query_embedding = create_embeddings([query])[0]
        results = self.collection.query(
            query_embeddings=[query_embedding],
            # Never ask for more neighbours than the index holds.
            n_results=min(top_k, available),
            include=["documents", "metadatas", "distances"]
        )

        # Results are nested per query; we sent exactly one query.
        formatted = [
            {
                'text': document,
                'metadata': metadata,
                'score': 1 - distance  # Convert distance to similarity
            }
            for document, metadata, distance in zip(
                results['documents'][0],
                results['metadatas'][0],
                results['distances'][0],
            )
        ]
        if formatted:
            logger.debug(f"Found {len(formatted)} results, top score: {formatted[0]['score']:.3f}")
        return formatted

    def count(self) -> int:
        """Return number of chunks in the index."""
        return self.collection.count()

    def clear(self) -> None:
        """Clear all data from the index by recreating the collection."""
        self.client.delete_collection(self._COLLECTION_NAME)
        self.collection = self._open_collection()


# Answer Generator (generator.py)
"""LLM interaction for generating answers."""
import anthropic
import time
from typing import Optional
from dataclasses import dataclass
from config import config, logger, cost_tracker
class GenerationError(Exception):
    """Raised when an answer could not be generated."""
@dataclass
class GenerationResult:
    """Outcome of one generation request: the answer plus usage metadata."""

    text: str  # generated answer text
    input_tokens: int  # prompt tokens reported for the request
    output_tokens: int  # completion tokens reported for the request
    model: str  # model identifier reported with the response
# The RAG prompt template.
# Contains two str.format placeholders, {context} and {question}, which
# generate_answer fills in before sending the prompt to the model.
RAG_PROMPT_TEMPLATE = """You are a helpful assistant that answers questions based on the provided documentation.
## Instructions
- Answer the question using ONLY the information in the provided context
- If the context doesn't contain enough information to answer, say "I don't have enough information to answer that question based on the available documentation"
- Always cite your sources by mentioning which document the information comes from
- Be concise but complete
- If multiple documents contain relevant information, synthesize them
## Context
{context}
## Question
{question}
## Answer"""
def format_context(search_results: list[dict]) -> str:
    """Render search results as the context section of the prompt.

    Each result becomes a header line naming its source, chunk index and
    relevance score, followed by the chunk text; entries are separated by a
    horizontal-rule line.
    """
    def render(position: int, hit: dict) -> str:
        # One labelled entry: "[Source N: file (chunk i, relevance: s)]" + text.
        source = hit['metadata']['source_file']
        chunk_idx = hit['metadata']['chunk_index']
        body = hit['text']
        relevance = hit['score']
        header = f"[Source {position}: {source} (chunk {chunk_idx}, relevance: {relevance:.2f})]"
        return header + "\n" + body

    separator = "\n\n---\n\n"
    return separator.join(
        render(position, hit)
        for position, hit in enumerate(search_results, start=1)
    )
def generate_answer(
    question: str,
    search_results: list[dict],
    max_retries: int = 3
) -> GenerationResult:
    """Generate an answer using retrieved context.

    This is the core RAG generation function. Rate-limit, connection and
    5xx errors are retried with exponential backoff (1s, 2s, 4s, ...);
    other API status errors fail immediately.

    Args:
        question: The user's question.
        search_results: Retrieved chunks in the shape ``format_context``
            expects.
        max_retries: Total number of attempts before giving up.

    Returns:
        The generated text together with token usage and model name.

    Raises:
        GenerationError: On a non-retryable API error, when all attempts
            fail, or when the response contains no text.
    """
    client = anthropic.Anthropic()
    logger.info(f"Generating answer for question: {question[:50]}...")
    logger.debug(f"Using {len(search_results)} context chunks")

    # Construct the full prompt from the retrieved context
    prompt = RAG_PROMPT_TEMPLATE.format(
        context=format_context(search_results),
        question=question
    )

    delay = 1.0
    last_error: Optional[Exception] = None
    for attempt in range(max_retries):
        try:
            message = client.messages.create(
                model=config.llm_model,
                max_tokens=config.max_tokens,
                temperature=config.temperature,
                messages=[{"role": "user", "content": prompt}]
            )
        except (anthropic.RateLimitError, anthropic.APIConnectionError) as e:
            last_error = e
            logger.warning(f"API error on attempt {attempt + 1}: {e}")
        except anthropic.APIStatusError as e:
            if e.status_code < 500:
                # Client-side errors will not succeed on retry.
                raise GenerationError(f"API error: {e.message}") from e
            last_error = e
            logger.warning(f"Server error on attempt {attempt + 1}: {e}")
        else:
            # Collect every text block instead of assuming content[0] exists.
            text_parts = [
                block.text
                for block in message.content
                if getattr(block, "type", None) == "text"
            ]
            if not text_parts:
                raise GenerationError("Model response contained no text")
            result = GenerationResult(
                text="".join(text_parts),
                input_tokens=message.usage.input_tokens,
                output_tokens=message.usage.output_tokens,
                model=message.model
            )
            # Track costs
            cost_tracker.add_generation(
                result.input_tokens,
                result.output_tokens
            )
            logger.info(f"Generated answer: {result.input_tokens} in, {result.output_tokens} out")
            return result

        # Back off only if another attempt follows; sleeping after the last
        # failure would just delay the error.
        if attempt + 1 < max_retries:
            time.sleep(delay)
            delay *= 2

    raise GenerationError(
        f"Failed after {max_retries} attempts: {last_error}"
    ) from last_error
def generate_streaming(prompt: str) -> str:
    """Stream a response from Claude, printing text as it arrives.

    Uses the same model, token limit and temperature settings from
    ``config`` as ``generate_answer``.

    Args:
        prompt: The complete prompt to send as a single user message.

    Returns:
        The complete response text once the stream has finished.
    """
    client = anthropic.Anthropic()
    pieces = []
    with client.messages.stream(
        model=config.llm_model,
        max_tokens=config.max_tokens,
        temperature=config.temperature,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
            pieces.append(text)
    print()  # Newline at the end
    # Join once at the end rather than growing a string inside the loop.
    return "".join(pieces)


# Main Application (qa_assistant.py)
"""Document Q&A Assistant - Main application."""
import argparse
import sys
from pathlib import Path
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from config import config, cost_tracker
from indexer import index_documents
from retriever import VectorStore
from generator import generate_answer, GenerationError
# Shared Rich console used by the command handlers below for terminal output.
console = Console()
def validate_question(question: str) -> tuple[bool, str]:
    """Check a user question before it is processed.

    Returns:
        ``(True, "")`` when the question is acceptable, otherwise
        ``(False, reason)``. Checks run in order: blank, longer than 1000
        characters, fewer than two whitespace-separated words.
    """
    is_blank = not (question and question.strip())
    if is_blank:
        return False, "Question cannot be empty"

    max_chars = 1000
    if len(question) > max_chars:
        return False, "Question is too long (max 1000 characters)"

    word_count = len(question.split())
    if word_count < 2:
        return False, "Please ask a complete question"

    return True, ""
def cmd_index(args):
    """Index documents from a directory.

    Chunks every supported document, embeds the chunks and stores them in
    the vector index. Exits with status 1 when the path is not a directory,
    the OpenAI key needed for embeddings is missing, or no chunks were
    produced.

    Args:
        args: Parsed arguments providing ``directory`` and ``clear``.
    """
    # Paths may contain "[...]", which Rich would otherwise parse as markup.
    from rich.markup import escape

    directory = Path(args.directory)
    if not directory.is_dir():
        console.print(f"[red]Error: Directory not found: {escape(str(directory))}[/red]")
        sys.exit(1)

    # Embedding the chunks calls the OpenAI API; fail with a clear message
    # up front instead of an API exception after chunking.
    if not config.openai_api_key:
        console.print("[red]Error: OPENAI_API_KEY environment variable not set[/red]")
        sys.exit(1)

    console.print(f"[bold]Indexing documents from {escape(str(directory))}...[/bold]")

    # Load and chunk documents
    chunks = index_documents(str(directory))
    if not chunks:
        console.print("[yellow]No documents found to index[/yellow]")
        sys.exit(1)
    console.print(f"Created {len(chunks)} chunks from documents")

    # Store in vector database
    store = VectorStore()
    if args.clear:
        console.print("Clearing existing index...")
        store.clear()
    store.add_chunks(chunks)
    console.print(f"[green]Index saved. Total chunks: {store.count()}[/green]")
def cmd_query(args):
    """Answer a single question against the document index.

    Validates the configuration and the question, retrieves the best
    matching chunks, generates an answer and prints it. Exits with status 1
    on any validation, retrieval or generation failure.

    Args:
        args: Parsed arguments providing ``question`` and ``verbose``.
    """
    # User questions, file names and error text may contain "[...]", which
    # Rich would parse as markup (and can reject with MarkupError).
    from rich.markup import escape

    # Validate configuration
    errors = config.validate()
    if errors:
        console.print("[red]Configuration errors:[/red]")
        for error in errors:
            console.print(f" - {error}")
        sys.exit(1)

    # Validate question
    is_valid, error_msg = validate_question(args.question)
    if not is_valid:
        console.print(f"[red]Invalid question: {error_msg}[/red]")
        sys.exit(1)

    # Load vector store
    store = VectorStore()
    if store.count() == 0:
        console.print("[yellow]Index is empty. Run 'index' command first.[/yellow]")
        sys.exit(1)

    question = args.question
    console.print(f"\n[bold]Question:[/bold] {escape(question)}\n")

    # Retrieve relevant chunks
    with console.status("Searching documents..."):
        results = store.search(question, top_k=config.top_k)
    if not results:
        console.print("[yellow]No relevant documents found.[/yellow]")
        sys.exit(1)

    # Show retrieved sources if verbose
    if args.verbose:
        console.print("[dim]Retrieved sources:[/dim]")
        for i, result in enumerate(results, 1):
            source = escape(str(result['metadata']['source_file']))
            score = result['score']
            console.print(f" {i}. {source} (relevance: {score:.2f})")
        console.print()

    # Generate answer
    with console.status("Generating answer..."):
        try:
            result = generate_answer(question, results)
        except GenerationError as e:
            console.print(f"[red]Error generating answer: {escape(str(e))}[/red]")
            sys.exit(1)

    # Display answer
    console.print(Panel(
        Markdown(result.text),
        title="Answer",
        border_style="green"
    ))

    # Show token usage if verbose
    if args.verbose:
        console.print(f"\n[dim]Tokens: {result.input_tokens} input, {result.output_tokens} output[/dim]")
        # Use the shared per-token prices so this estimate cannot drift from
        # the cost tracker's figures.
        input_cost = result.input_tokens * cost_tracker.CLAUDE_INPUT_PRICE
        output_cost = result.output_tokens * cost_tracker.CLAUDE_OUTPUT_PRICE
        console.print(f"[dim]Estimated cost: ${input_cost + output_cost:.4f}[/dim]")
def cmd_interactive(args):
    """Run an interactive question-and-answer loop.

    Exits with status 1 if the configuration is invalid or the index is
    empty. Inside the loop, invalid questions and retrieval or generation
    failures are reported and the session continues.

    Args:
        args: Parsed arguments (no fields are read).
    """
    # Error text may contain "[...]", which Rich would parse as markup.
    from rich.markup import escape

    errors = config.validate()
    if errors:
        console.print("[red]Configuration errors:[/red]")
        for error in errors:
            console.print(f" - {error}")
        sys.exit(1)

    store = VectorStore()
    if store.count() == 0:
        console.print("[yellow]Index is empty. Run 'index' command first.[/yellow]")
        sys.exit(1)

    console.print("[bold]Document Q&A Assistant[/bold]")
    console.print(f"Index contains {store.count()} chunks")
    console.print("Type 'quit' or 'exit' to stop.\n")

    while True:
        try:
            question = console.input("[bold cyan]Question:[/bold cyan] ").strip()
        except (KeyboardInterrupt, EOFError):
            console.print("\nGoodbye!")
            break
        if not question:
            continue
        if question.lower() in ('quit', 'exit', 'q'):
            console.print("Goodbye!")
            break

        # Validate
        is_valid, error_msg = validate_question(question)
        if not is_valid:
            console.print(f"[yellow]{error_msg}[/yellow]\n")
            continue

        # Retrieve. This loop is the top-level boundary of the session, so a
        # failed lookup (e.g. a network error) is reported, not fatal.
        try:
            results = store.search(question, top_k=config.top_k)
        except Exception as e:
            console.print(f"[red]Error: {escape(str(e))}[/red]\n")
            continue
        if not results:
            console.print("[yellow]No relevant documents found.[/yellow]\n")
            continue

        # Generate
        try:
            result = generate_answer(question, results)
        except GenerationError as e:
            console.print(f"[red]Error: {escape(str(e))}[/red]\n")
            continue

        # Display
        console.print()
        console.print(Panel(
            Markdown(result.text),
            title="Answer",
            border_style="green"
        ))
        console.print()
def main():
    """Parse the command line and run the selected sub-command.

    Prints the help text when no sub-command is given.
    """
    usage_examples = """
Examples:
Index documents:
python qa_assistant.py index ./docs
Query the index:
python qa_assistant.py query "What is the vacation policy?"
Interactive mode:
python qa_assistant.py interactive
"""
    parser = argparse.ArgumentParser(
        description="Document Q&A Assistant",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    commands = parser.add_subparsers(dest="command", help="Available commands")

    # index <directory> [--clear]
    index_cmd = commands.add_parser("index", help="Index documents")
    index_cmd.add_argument("directory", help="Directory containing documents")
    index_cmd.add_argument(
        "--clear",
        action="store_true",
        help="Clear existing index before adding",
    )

    # query <question> [-v]
    query_cmd = commands.add_parser("query", help="Query the index")
    query_cmd.add_argument("question", help="Question to ask")
    query_cmd.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Show additional details",
    )

    # interactive
    commands.add_parser("interactive", help="Interactive query mode")

    args = parser.parse_args()
    if args.command not in ("index", "query", "interactive"):
        parser.print_help()
        return

    # Route the chosen sub-command to its handler.
    handlers = {
        "index": cmd_index,
        "query": cmd_query,
        "interactive": cmd_interactive,
    }
    handlers[args.command](args)


if __name__ == "__main__":
    main()

# Evaluation Utilities (evaluation.py)
"""Basic evaluation utilities for the Q&A assistant."""
from dataclasses import dataclass
from typing import Optional
import json
from pathlib import Path
from retriever import VectorStore
from generator import generate_answer
@dataclass
class EvalCase:
    """One evaluation example: a question and what a good result looks like."""

    question: str  # question posed to the assistant
    expected_answer: str  # text the generated answer is expected to contain
    expected_sources: list[str]  # source files expected among the retrieved chunks
@dataclass
class EvalResult:
    """Outcome of running one evaluation case through the pipeline."""

    question: str  # the question that was asked
    generated_answer: str  # text produced by the generator
    expected_answer: str  # text the answer was checked against
    retrieved_sources: list[str]  # source files of the retrieved chunks
    expected_sources: list[str]  # source files the case expected
    answer_contains_expected: bool  # expected text found in the generated answer
    correct_source_retrieved: bool  # an expected source was among those retrieved
def load_eval_set(path: str) -> list[EvalCase]:
"""Load evaluation cases from a JSON file."""
with open(path) as f:
data = json.load(f)
return [
EvalCase(
question=case['question'],
expected_answer=case['expected_answer'],
expected_sources=case.get('expected_sources', [])
)
for case in data
]
def evaluate_case(case: EvalCase, store: VectorStore) -> EvalResult:
    """Run one case through retrieval and generation and score it.

    The answer check is a case-insensitive substring match. The source
    check passes when any expected source was retrieved, or when the case
    lists no expected sources.
    """
    # Retrieval
    hits = store.search(case.question)
    sources = [hit['metadata']['source_file'] for hit in hits]

    # Generation
    generation = generate_answer(case.question, hits)

    # Scoring
    needle = case.expected_answer.lower()
    answer_ok = needle in generation.text.lower()
    if case.expected_sources:
        source_ok = any(wanted in sources for wanted in case.expected_sources)
    else:
        source_ok = True

    return EvalResult(
        question=case.question,
        generated_answer=generation.text,
        expected_answer=case.expected_answer,
        retrieved_sources=sources,
        expected_sources=case.expected_sources,
        answer_contains_expected=answer_ok,
        correct_source_retrieved=source_ok,
    )
def run_evaluation(eval_path: str, store: VectorStore) -> dict:
    """Evaluate every case in the file and summarise the outcome.

    Returns:
        Dict with 'total_cases', 'answer_accuracy', 'retrieval_accuracy'
        (both 0 when there are no cases) and the per-case 'results'.
    """
    outcomes = [
        evaluate_case(case, store)
        for case in load_eval_set(eval_path)
    ]

    total = len(outcomes)
    answer_hits = sum(1 for outcome in outcomes if outcome.answer_contains_expected)
    source_hits = sum(1 for outcome in outcomes if outcome.correct_source_retrieved)

    # Avoid dividing by zero when the evaluation set is empty.
    if total > 0:
        answer_accuracy = answer_hits / total
        retrieval_accuracy = source_hits / total
    else:
        answer_accuracy = 0
        retrieval_accuracy = 0

    return {
        'total_cases': total,
        'answer_accuracy': answer_accuracy,
        'retrieval_accuracy': retrieval_accuracy,
        'results': outcomes,
    }
# Script entry point: python evaluation.py <eval_cases.json>
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python evaluation.py <eval_cases.json>")
        sys.exit(1)

    store = VectorStore()
    if store.count() == 0:
        print("Error: Index is empty. Run 'index' command first.")
        sys.exit(1)

    metrics = run_evaluation(cli_args[0], store)
    report = [
        "\nEvaluation Results:",
        f" Total cases: {metrics['total_cases']}",
        f" Answer accuracy: {metrics['answer_accuracy']:.1%}",
        f" Retrieval accuracy: {metrics['retrieval_accuracy']:.1%}",
    ]
    print("\n".join(report))

# Test Scripts
test_setup.py
"""Verify the development environment is correctly configured."""
from config import config
def main():
    """Verify configuration, package imports and both API connections.

    Each check prints its outcome; the first failure stops the run.

    Returns:
        True when every check passed, False otherwise.
    """
    errors = config.validate()
    if errors:
        print("Configuration errors:")
        for error in errors:
            print(f" - {error}")
        return False

    # Test imports (the imported names are reused by the checks below)
    try:
        import anthropic
        import openai
        import chromadb
        import tiktoken
        print("All packages imported successfully")
    except ImportError as e:
        print(f"Import error: {e}")
        return False

    # Test Anthropic connection. This is a diagnostic script, so any failure
    # is reported instead of propagated.
    try:
        client = anthropic.Anthropic()
        # Make a minimal API call
        response = client.messages.create(
            model=config.llm_model,
            max_tokens=10,
            messages=[{"role": "user", "content": "Say 'hello' and nothing else."}]
        )
        print(f"Anthropic API working: {response.content[0].text}")
    except Exception as e:
        print(f"Anthropic API error: {e}")
        return False

    # Test OpenAI connection
    try:
        client = openai.OpenAI()
        response = client.embeddings.create(
            model=config.embedding_model,
            input="test"
        )
        print(f"OpenAI API working: got embedding with {len(response.data[0].embedding)} dimensions")
    except Exception as e:
        print(f"OpenAI API error: {e}")
        return False

    print("\nSetup complete! Ready to build.")
    return True


if __name__ == "__main__":
    # Turn the boolean result into the process exit status so shells and CI
    # can detect a failed setup check.
    raise SystemExit(0 if main() else 1)

# test_retrieval.py
"""Test the retrieval pipeline."""
from pathlib import Path
from indexer import index_documents
from retriever import VectorStore
def create_sample_docs():
    """Create sample documents for testing.

    Writes ``hr_policy.md`` and ``tech_guide.md`` into ``./sample_docs``
    (created if missing), overwriting existing files.

    Returns:
        Path to the ``sample_docs`` directory.
    """
    docs_dir = Path("sample_docs")
    docs_dir.mkdir(exist_ok=True)

    # HR Policy document. The headings are plain Markdown: stray
    # "{.unnumbered}" attributes would end up in the indexed text.
    (docs_dir / "hr_policy.md").write_text("""
# HR Policies
## Vacation Policy
All full-time employees receive 20 days of paid time off (PTO) per year.
PTO accrues at a rate of 1.67 days per month.
Unused PTO can roll over to the next year, up to a maximum of 5 days.
PTO requests should be submitted at least 2 weeks in advance.
## Remote Work Policy
Employees may work remotely up to 3 days per week with manager approval.
Full remote arrangements require VP approval and must be reviewed quarterly.
## Expense Policy
Business expenses over $50 require receipt documentation.
Expenses over $500 require pre-approval from your manager.
Submit expense reports within 30 days of the expense.
""", encoding="utf-8")

    # Technical documentation
    (docs_dir / "tech_guide.md").write_text("""
# Technical Guidelines
## Code Review Process
All code changes require at least one approval before merging.
Security-sensitive changes require review from the security team.
Reviews should be completed within 2 business days.
## Deployment Process
Deployments to production happen on Tuesdays and Thursdays.
Emergency deployments require on-call approval and a rollback plan.
All deployments must pass automated tests and staging validation.
## On-Call Rotation
Engineers participate in on-call rotation after 3 months on the team.
On-call shifts are one week long, starting Monday at 9 AM.
On-call engineers receive a $500 stipend per week.
""", encoding="utf-8")

    print("Sample documents created in ./sample_docs")
    return docs_dir
def main():
    """Build a fresh index from the sample documents and print search hits."""
    # Prepare input documents
    sample_dir = create_sample_docs()

    # Chunk them
    print("\nIndexing documents...")
    doc_chunks = index_documents(str(sample_dir))
    print(f"Created {len(doc_chunks)} chunks")

    # Rebuild the vector store from scratch
    store = VectorStore()
    store.clear()
    store.add_chunks(doc_chunks)

    queries = (
        "How many vacation days do I get?",
        "Can I work from home?",
        "How do I submit an expense report?",
        "What is the deployment schedule?",
        "How much is the on-call stipend?",
    )

    banner = "=" * 50
    print("\n" + banner)
    print("Testing retrieval:")
    print(banner)

    for query in queries:
        print(f"\nQuery: {query}")
        hits = store.search(query, top_k=2)
        for rank, hit in enumerate(hits, start=1):
            print(f" Result {rank} (score: {hit['score']:.3f}):")
            print(f" Source: {hit['metadata']['source_file']}")
            # Only the first 100 characters, flattened onto one line
            preview = hit['text'][:100].replace('\n', ' ')
            print(f" Preview: {preview}...")


if __name__ == "__main__":
    main()

# Sample Documents
eval_cases.json
[
{
"question": "How many vacation days do employees get?",
"expected_answer": "20 days",
"expected_sources": ["hr_policy.md"]
},
{
"question": "What days can we deploy to production?",
"expected_answer": "Tuesdays and Thursdays",
"expected_sources": ["tech_guide.md"]
},
{
"question": "How much is the on-call stipend?",
"expected_answer": "$500",
"expected_sources": ["tech_guide.md"]
},
{
"question": "How many days can I work remotely?",
"expected_answer": "3 days",
"expected_sources": ["hr_policy.md"]
},
{
"question": "What is the expense pre-approval threshold?",
"expected_answer": "$500",
"expected_sources": ["hr_policy.md"]
}
]
Dependencies
requirements.txt
anthropic>=0.18.0
openai>=1.12.0
chromadb>=0.4.22
tiktoken>=0.6.0
rich>=13.7.0
Usage Examples
Index Documents
# Index documents from a directory
python qa_assistant.py index ./docs
# Clear existing index and re-index
python qa_assistant.py index --clear ./docs
Query the Index
# Simple query
python qa_assistant.py query "What is the vacation policy?"
# Query with verbose output (shows sources and costs)
python qa_assistant.py query -v "What is the deployment schedule?"
Interactive Mode
python qa_assistant.py interactive
Run Evaluation
python evaluation.py eval_cases.json
Test Retrieval in Isolation
python test_retrieval.py
Architecture Overview
┌──────────────────────────────────────────────────────────────────┐
│ Document Q&A Architecture │
├──────────────────────────────────────────────────────────────────┤
│ │
│ INDEXING (offline, once per document update) │
│ ┌─────────┐ ┌─────────┐ ┌──────────┐ ┌─────────────┐ │
│ │Documents│───▶│ Chunker │───▶│Embeddings│───▶│Vector Store │ │
│ └─────────┘ └─────────┘ └──────────┘ └─────────────┘ │
│ │
│ QUERY (online, per user request) │
│ ┌───────┐ ┌──────────┐ ┌────────┐ ┌─────────────────┐ │
│ │ Query │───▶│ Embed & │───▶│Retrieve│───▶│ Generate Answer │ │
│ └───────┘ │ Search │ │Top K │ │ with Context │ │
│ └──────────┘ └────────┘ └─────────────────┘ │
│ │
│ COMPONENTS │
│ • indexer.py: Document loading, chunking, embedding creation │
│ • retriever.py: Vector store, similarity search │
│ • generator.py: Prompt construction, LLM API, error handling │
│ • config.py: Settings, logging, cost tracking │
│ │
└──────────────────────────────────────────────────────────────────┘