Chapter 19: Video & Multimodal RAG
video, multimodal, temporal, scene understanding, video embeddings, cross-modal retrieval, streaming
Introduction
A manufacturing company faced a quality assurance challenge. Their assembly lines produced thousands of units daily, but defect detection relied on periodic human inspections. By the time a problem was caught, hundreds of defective units had already been produced.
They deployed a video analysis system that continuously monitored assembly operations, detecting anomalies in real time: a misaligned component, an unusual movement pattern, or a missing step in the assembly sequence. Defect detection time dropped from hours to seconds, reducing waste by 60%.
This illustrates why video understanding matters. While images capture moments, video captures processes—the before, during, and after that reveals how things happen, not just what exists.
Why Video Understanding Matters
Temporal context reveals meaning: A person raising their hand is ambiguous in a single frame—are they waving, reaching, or signaling? Video shows the complete gesture and its purpose.
Video is everywhere: Security footage, instructional content, meetings, product demos, medical procedures—vast amounts of organizational knowledge exist only in video form.
Multimodal search unlocks archives: Organizations have terabytes of video that’s effectively unsearchable. Combining visual, audio, and text understanding makes this content accessible.
What You’ll Learn
This chapter focuses on video understanding and multimodal retrieval:
- Frame sampling strategies that balance coverage with cost
- Temporal understanding: tracking objects and recognizing actions across time
- Vision-language models (VLMs) for video description and question answering
- Multimodal embeddings: CLIP and unified embedding spaces
- Building multimodal RAG systems that search across text, images, audio, and video
- Production architectures for video processing at scale
For audio processing fundamentals and speech-to-text systems, see Chapter 18.
Prerequisites
- Understanding of transformer architecture and attention (Chapter 5)
- Familiarity with embeddings and vector search (Chapter 7)
- RAG system design (Chapter 8)
- Audio processing basics (Chapter 18)
Video Understanding
Video adds a temporal dimension to vision. A single frame is an image; a video is a sequence of images with temporal relationships and often synchronized audio.
The Temporal Dimension
Understanding video requires processing that single images cannot provide:
- Object tracking: Following entities across frames as they move, appear, and disappear
- Action recognition: Understanding what’s happening over time (waving, running, assembling)
- Temporal reasoning: Understanding before/after relationships, cause and effect, sequences
- Change detection: Identifying what’s different between time points, detecting anomalies
- Event detection: Recognizing when specific events occur in long videos
Unlike static images, video has:
- Motion: Information encoded in how things change between frames
- Temporal context: Current frame’s meaning depends on what came before
- Audio alignment: Speech, music, and sound effects synchronized with visuals
- Narrative structure: Videos often have intentional story arcs or informational progression
Frame Sampling Strategies
Vision-language models can’t process thousands of frames (a 1-minute video at 30fps is 1800 frames). Intelligent sampling is essential for balancing coverage, cost, and accuracy.
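To make the arithmetic concrete, here is a minimal sketch of a frame budget calculation. The helper name `sampling_plan` and its return keys are illustrative assumptions, not part of any library.

```python
import math


def sampling_plan(duration_s: float, fps: float, max_frames: int) -> dict:
    """Compute how sparsely a video must be sampled to fit a frame budget."""
    if fps <= 0 or max_frames <= 0:
        raise ValueError("fps and max_frames must be positive")
    total_frames = int(duration_s * fps)
    # Smallest stride (in frames) that yields at most max_frames samples
    stride = max(1, math.ceil(total_frames / max_frames))
    sampled = len(range(0, total_frames, stride))
    return {
        "total_frames": total_frames,
        "stride": stride,
        "sampled_frames": sampled,
        "seconds_between_samples": stride / fps,
    }


# The one-minute, 30 fps example with a 20-frame budget
plan = sampling_plan(duration_s=60, fps=30, max_frames=20)
print(plan)
```

For this example the budget works out to one frame every 90 frames, or one every 3 seconds, which is why the choice of which frames to keep matters so much.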
import cv2
import numpy as np
from typing import List, Dict
from scenedetect import detect, ContentDetector

class VideoSampler:
    """Sample frames from video using various strategies."""

    def __init__(self):
        self.scene_detector = ContentDetector()

    def sample(
        self,
        video_path: str,
        strategy: str = 'uniform',
        max_frames: int = 20
    ) -> List[Dict]:
        """
        Sample frames using specified strategy.

        Args:
            video_path: Path to video file
            strategy: 'uniform', 'scene_based', 'motion_based', or 'adaptive'
            max_frames: Maximum number of frames to sample

        Returns:
            List of sampled frames with metadata
        """
        if strategy == 'uniform':
            return self.uniform_sample(video_path, max_frames)
        elif strategy == 'scene_based':
            return self.scene_sample(video_path, max_frames)
        elif strategy == 'motion_based':
            return self.motion_sample(video_path, max_frames)
        elif strategy == 'adaptive':
            return self.adaptive_sample(video_path, max_frames)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
    def uniform_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """Sample frames at evenly spaced positions across the whole video."""
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Spread the indices over the full duration. A floor-divided stride
        # would cover only the start of the video when total_frames is not
        # much larger than max_frames, and would divide by zero for
        # max_frames == 0.
        if total_frames > 0 and max_frames > 0:
            count = min(max_frames, total_frames)
            indices = np.linspace(0, total_frames - 1, num=count, dtype=int)
        else:
            indices = []
        frames = []
        for frame_idx in indices:
            frame_idx = int(frame_idx)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if ret:
                frames.append({
                    'frame': frame,
                    'frame_idx': frame_idx,
                    'timestamp': frame_idx / fps,
                    'strategy': 'uniform'
                })
        cap.release()
        return frames
    def scene_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """
        Sample one frame per scene change.

        Scene detection identifies significant visual changes (cuts, transitions).
        This is ideal for videos with distinct scenes (movies, presentations).
        """
        if max_frames <= 0:
            return []
        # Detect scenes using PySceneDetect. start_in_scene=True (PySceneDetect
        # 0.6+) yields one scene spanning the whole video when no cuts are
        # found, instead of an empty list.
        scene_list = detect(video_path, ContentDetector(), start_in_scene=True)
        # If more scenes than max_frames, sample uniformly from scenes
        if len(scene_list) > max_frames:
            scene_interval = len(scene_list) // max_frames
            selected_scenes = scene_list[::scene_interval][:max_frames]
        else:
            selected_scenes = scene_list
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frames = []
        for idx, scene in enumerate(selected_scenes):
            # Sample from middle of scene for representative frame
            start_frame = int(scene[0].get_frames())
            end_frame = int(scene[1].get_frames())
            mid_frame = (start_frame + end_frame) // 2
            cap.set(cv2.CAP_PROP_POS_FRAMES, mid_frame)
            ret, frame = cap.read()
            if ret:
                frames.append({
                    'frame': frame,
                    'frame_idx': mid_frame,
                    'timestamp': mid_frame / fps,
                    'scene_idx': idx,
                    'strategy': 'scene_based'
                })
        cap.release()
        return frames
    def motion_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """
        Sample more frames during high-motion periods.

        Useful for action videos, sports, or surveillance where interesting
        events correlate with motion.
        """
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Compute motion scores for segments
        segment_size = 30  # ~1 second at 30fps
        motion_scores = []
        prev_frame = None
        for i in range(0, total_frames, segment_size):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret and prev_frame is not None:
                # Calculate frame difference as motion proxy
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
                diff = cv2.absdiff(gray, prev_gray)
                motion_score = np.mean(diff)
                motion_scores.append((i, motion_score))
            prev_frame = frame
        # Sort segments by motion score
        motion_scores.sort(key=lambda x: x[1], reverse=True)
        # Sample from high-motion segments
        selected_indices = [idx for idx, _ in motion_scores[:max_frames]]
        selected_indices.sort()  # Maintain temporal order
        frames = []
        for frame_idx in selected_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if ret:
                frames.append({
                    'frame': frame,
                    'frame_idx': frame_idx,
                    'timestamp': frame_idx / fps,
                    'strategy': 'motion_based'
                })
        cap.release()
        return frames
    def adaptive_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """
        Adaptive sampling: uniform baseline + scene changes.

        This hybrid approach works well for unknown video types.
        """
        if max_frames <= 0:
            return []
        # Use 60% of budget for uniform sampling (at least one frame)
        uniform_count = max(1, int(max_frames * 0.6))
        uniform_frames = self.uniform_sample(video_path, uniform_count)
        # Use the remaining budget (about 40%) for scene changes
        scene_count = max_frames - uniform_count
        scene_frames = (
            self.scene_sample(video_path, scene_count) if scene_count > 0 else []
        )
        # Combine and deduplicate by frame index
        all_frames = uniform_frames + scene_frames
        seen_indices = set()
        unique_frames = []
        for frame_data in sorted(all_frames, key=lambda x: x['frame_idx']):
            if frame_data['frame_idx'] not in seen_indices:
                frame_data['strategy'] = 'adaptive'
                unique_frames.append(frame_data)
                seen_indices.add(frame_data['frame_idx'])
        return unique_frames[:max_frames]

What people do: Sample every Nth frame from all videos (e.g., 1 frame every 10 seconds), regardless of whether it’s a static presentation or fast-paced action footage.
Why it fails: Fixed sampling wastes resources on static content (10 identical frames from a slide that didn’t change) while missing critical moments in dynamic content (the 2-second action sequence between sample points).
Fix: Match sampling strategy to video type. Use scene detection for presentations and edited content. Use motion-based sampling for surveillance or action videos. For unknown content, use adaptive sampling that combines uniform baseline with scene-change detection.
Choosing a Sampling Strategy
| Video Type | Recommended Strategy | Rationale |
|---|---|---|
| Surveillance footage | Motion-based | Most frames are static; focus on activity |
| Lectures/Presentations | Scene-based | Each slide is a scene; sample once per slide |
| Sports highlights | High framerate uniform | Continuous action; need temporal resolution |
| Movies/TV | Scene-based | Natural scene structure; each scene tells story |
| Product demos | Uniform or adaptive | Steady progression through features |
| Tutorials | Scene-based | Each step is a distinct visual scene |
| Nature documentaries | Adaptive | Mix of static scenes and action |
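The table can be encoded as a simple lookup so that an ingestion pipeline picks a strategy from a content label. This is a minimal sketch: the label strings and the `choose_strategy` helper are hypothetical, and the returned values are the strategy names accepted by `VideoSampler.sample`. Where the table lists two options (product demos) or a variant (high-framerate uniform for sports), the sketch picks one; a larger `max_frames` would supply the extra temporal resolution.

```python
from typing import Optional

# Hypothetical content labels mapped to VideoSampler strategy names
STRATEGY_BY_VIDEO_TYPE = {
    "surveillance": "motion_based",
    "lecture": "scene_based",
    "presentation": "scene_based",
    "sports": "uniform",        # pair with a larger max_frames
    "movie": "scene_based",
    "product_demo": "uniform",  # the table also allows 'adaptive'
    "tutorial": "scene_based",
    "documentary": "adaptive",
}


def choose_strategy(video_type: Optional[str]) -> str:
    """Return a sampling strategy, falling back to 'adaptive' for unknown types."""
    if video_type is None:
        return "adaptive"
    return STRATEGY_BY_VIDEO_TYPE.get(video_type.strip().lower(), "adaptive")


print(choose_strategy("Surveillance"))
print(choose_strategy("home video"))
```

Falling back to adaptive sampling for unlabeled content follows the guidance above for unknown video types.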
Video Analysis with VLMs
Modern vision-language models can analyze sequences of frames to understand video content:
import base64
from io import BytesIO
from typing import List, Dict, Optional

import cv2
import numpy as np
from PIL import Image

class VideoAnalyzer:
    """Analyze video content using vision-language models."""

    def __init__(self, vlm_client):
        self.vlm = vlm_client
        self.sampler = VideoSampler()

    async def analyze(
        self,
        video_path: str,
        analysis_type: str = 'summary',
        max_frames: int = 20
    ) -> Dict:
        """
        Analyze video with appropriate sampling and prompting.

        Args:
            video_path: Path to video file
            analysis_type: 'summary', 'action_recognition', 'scene_description', or 'qa'
            max_frames: Maximum frames to sample

        Returns:
            Analysis results with metadata
        """
        # Choose sampling strategy based on analysis type
        strategy = {
            'summary': 'adaptive',
            'action_recognition': 'motion_based',
            'scene_description': 'scene_based',
            'qa': 'uniform'
        }.get(analysis_type, 'adaptive')
        frames = self.sampler.sample(video_path, strategy, max_frames)
        # Build multi-image prompt
        content = [
            {
                "type": "text",
                "text": self._get_prompt(analysis_type, len(frames))
            }
        ]
        for frame_data in frames:
            # Add timestamp annotation
            content.append({
                "type": "text",
                "text": f"\n[Frame at {frame_data['timestamp']:.1f}s]"
            })
            # Encode frame as base64 image
            content.append(self._encode_frame(frame_data['frame']))
        response = await self.vlm.analyze(content)
        return {
            'analysis': response,
            'frame_count': len(frames),
            'strategy': strategy,
            # Approximation: timestamp of the last sampled frame,
            # which can be shorter than the true duration
            'video_duration': frames[-1]['timestamp'] if frames else 0
        }
    def _get_prompt(self, analysis_type: str, frame_count: int) -> str:
        """Generate appropriate prompt for analysis type."""
        prompts = {
            'summary': f"""You are viewing {frame_count} frames sampled from a video.
Provide a comprehensive summary including:
1. Overall content and topic
2. Key events or changes observed across frames
3. Main subjects or objects and how they change
4. Setting and context
5. Any text visible in the video
6. Temporal progression (what happens over time)""",
            'action_recognition': f"""Analyze these {frame_count} frames for actions and events.
For each significant action observed:
- Describe the action clearly
- Note approximate timestamp from frame labels
- Identify actors/subjects involved
- Describe any objects or tools used
- Indicate the start and end of each action""",
            'scene_description': f"""Describe each scene in this video based on {frame_count} frames.
For each distinct scene:
- Describe the visual setting and environment
- Note key visual elements and composition
- Identify any text, graphics, or UI elements
- Describe the mood, tone, or atmosphere
- Note transitions between scenes""",
            'qa': f"""You are viewing {frame_count} frames from a video.
Answer questions about the video content based on what you observe.
Reference specific frames by timestamp when relevant.
If you cannot answer based on the frames shown, say so."""
        }
        return prompts.get(analysis_type, prompts['summary'])
    def _encode_frame(self, frame: np.ndarray) -> Dict:
        """Encode frame as base64 for VLM input."""
        # Convert BGR (OpenCV) to RGB (PIL)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(rgb_frame)
        # Resize if too large (save on token cost)
        max_dim = 1024
        if max(pil_image.size) > max_dim:
            ratio = max_dim / max(pil_image.size)
            new_size = tuple(int(dim * ratio) for dim in pil_image.size)
            pil_image = pil_image.resize(new_size, Image.LANCZOS)
        # Encode as JPEG base64
        buffer = BytesIO()
        pil_image.save(buffer, format='JPEG', quality=85)
        img_base64 = base64.b64encode(buffer.getvalue()).decode()
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{img_base64}"
            }
        }

What people do: Process videos purely as sequences of images, discarding the audio entirely, then wonder why the analysis misses obvious context.
Why it fails: In a product review video, the reviewer might show a product while saying “I was disappointed”—the visual shows the product, but the sentiment is in the audio. Meeting recordings have content primarily in speech, not visuals.
Fix: Always extract and transcribe the audio track. Fuse visual and audio analysis for comprehensive understanding. For videos with significant speech, the transcript often provides more valuable search content than the visual frames.
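One simple way to fuse the two streams is to attach to each sampled frame the speech that overlaps its timestamp. The sketch below assumes frames are dicts with a `timestamp` key (as produced by the sampler above) and transcript turns are dicts with `start`, `end`, and `text` keys; the function name and the two-second window are illustrative choices, not a prescribed method.

```python
from typing import Dict, List


def attach_speech_to_frames(
    frames: List[Dict],
    turns: List[Dict],
    window_s: float = 2.0,
) -> List[Dict]:
    """Pair each frame with transcript turns overlapping [t - window, t + window]."""
    aligned = []
    for frame in frames:
        t = frame["timestamp"]
        lo, hi = t - window_s, t + window_s
        # A turn overlaps the window if it starts before the window ends
        # and ends after the window starts
        spoken = [
            turn["text"]
            for turn in turns
            if turn["start"] <= hi and turn["end"] >= lo
        ]
        aligned.append({"timestamp": t, "speech": " ".join(spoken)})
    return aligned


frames = [{"timestamp": 1.0}, {"timestamp": 10.0}]
turns = [
    {"start": 0.0, "end": 2.5, "text": "I was disappointed"},
    {"start": 9.0, "end": 11.0, "text": "by the battery life"},
]
for row in attach_speech_to_frames(frames, turns):
    print(row)
```

Each frame description can then be indexed together with what was being said at that moment, so a search for the reviewer's opinion lands on the right part of the video.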
Combining Video and Audio
Most videos have audio that provides crucial context. Speech, music, and sound effects carry information that visuals alone miss. Combining modalities creates richer understanding:
import asyncio
import os
import subprocess
from typing import Dict, List

class MultimodalVideoProcessor:
    """Process video with both visual and audio analysis."""

    def __init__(self, vlm_client, llm_client, hf_token: str):
        # Clients and token are passed in rather than read from globals
        self.video_analyzer = VideoAnalyzer(vlm_client)
        self.llm = llm_client
        self.audio_transcriber = TranscriptionService()
        self.diarizer = DiarizedTranscription(hf_token)

    async def process(self, video_path: str) -> Dict:
        """
        Process video and audio together for comprehensive understanding.

        Returns:
            Combined analysis with visual description, transcript, and fusion
        """
        # Extract audio track from video
        audio_path = self._extract_audio(video_path)
        # Process visual and audio in parallel
        video_task = asyncio.create_task(
            self.video_analyzer.analyze(video_path, 'summary')
        )
        audio_task = asyncio.create_task(
            self.diarizer.process(audio_path)
        )
        video_result, audio_result = await asyncio.gather(video_task, audio_task)
        # Fuse analyses with LLM
        fused = await self._fuse_analyses(video_result, audio_result)
        return {
            'video_analysis': video_result,
            'transcript': audio_result,
            'fused_summary': fused,
            'video_path': video_path
        }

    def _extract_audio(self, video_path: str) -> str:
        """Extract audio track from video file."""
        # Derive the output name from the stem so that non-.mp4 inputs
        # never resolve to the input path itself
        base, _ = os.path.splitext(video_path)
        audio_path = f"{base}_audio.mp3"
        # Use ffmpeg to extract audio
        subprocess.run([
            'ffmpeg', '-i', video_path,
            '-vn',  # No video
            '-acodec', 'mp3',
            '-y',  # Overwrite
            audio_path
        ], check=True, capture_output=True)
        return audio_path

    async def _fuse_analyses(
        self,
        video: Dict,
        audio: Dict
    ) -> str:
        """Combine visual and audio understanding into unified summary."""
        # Format transcript with speaker labels and timestamps
        transcript_text = self._format_transcript(audio['turns'])
        prompt = f"""Combine these analyses of a video into a unified summary:
VISUAL ANALYSIS:
{video['analysis']}
AUDIO TRANSCRIPT:
{transcript_text}
Provide a coherent summary that:
1. Integrates what was seen and heard into a single narrative
2. Notes when visual and audio provide complementary information
3. Identifies speakers and associates them with visual subjects when possible
4. Creates a timeline of key events combining both modalities
5. Highlights any discrepancies between visual and audio content
Be specific about timestamps and reference both visual frames and audio timing."""
        response = await self.llm.generate(prompt, max_tokens=1000)
        return response

    def _format_transcript(self, turns: List[Dict]) -> str:
        """Format speaker turns for readable transcript."""
        lines = []
        for turn in turns:
            timestamp = f"[{turn['start']:.1f}s - {turn['end']:.1f}s]"
            speaker = turn['speaker']
            text = turn['text']
            lines.append(f"{timestamp} {speaker}: {text}")
        return "\n".join(lines)

Use Cases for Multimodal Video Analysis
- Educational Content: Match lecture slides (visual) with professor’s explanation (audio) to create searchable, navigable content
- Product Reviews: Analyze what reviewer shows (visual) vs. what they say (audio) to detect enthusiasm vs. criticism
- Surveillance: Detect activities (visual) and speech (audio) for comprehensive security monitoring
- Meeting Analysis: Understand presentations (slides), discussion (speech), and reactions (body language)
- Content Moderation: Detect prohibited visual content and spoken content together for comprehensive moderation
Complete code: See Video Processing for production-ready video analysis with frame sampling, quality optimization, and batch processing.
Multimodal Embeddings and Retrieval
Standard RAG retrieves text using text queries. But what if you want to search across modalities?
Beyond Text Search: Cross-Modal Retrieval
Cross-modal retrieval enables queries like:
- “Find product demo videos where someone mentions battery life” (text → video)
- “Find podcast episodes discussing charts about revenue growth” (text → audio + visual)
- “Find documents containing diagrams similar to this sketch” (image → documents)
- “Locate meetings where team discussed the Q3 roadmap slide” (text → video + slides)
This requires embeddings that align different modalities in a shared space.
CLIP Embeddings: Theory and Practice
CLIP (Contrastive Language-Image Pre-training) creates a shared embedding space where images and text that are semantically related map to nearby points. This enables text-to-image and image-to-text retrieval.
How CLIP Aligns Modalities
CLIP was trained on 400 million image-text pairs from the internet using contrastive learning:
- For a batch of N image-text pairs, encode all N images and all N texts
- We have N correct pairs (image[i] matches text[i])
- We have N*(N-1) incorrect pairs (all other combinations)
- Train to maximize similarity for correct pairs, minimize for incorrect pairs
Through billions of such comparisons, CLIP learns:
- Semantic understanding: “dog” and “canine” are similar
- Visual concepts: “sunset” embeds near orange/red images
- Compositionality: “red car” vs. “blue car” differ in predictable ways
- Abstraction: “chaotic scene” has meaningful visual correlates
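The training objective described above can be sketched in a few lines. This is an illustration of a symmetric contrastive loss over a batch, written with only the standard library so the arithmetic stays visible; it is not CLIP's actual training code. The function names are hypothetical and the temperature of 0.07 is used here only as an illustrative default.

```python
import math
from typing import List, Sequence


def _normalize(v: Sequence[float]) -> List[float]:
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def _diagonal_cross_entropy(logits: List[List[float]]) -> float:
    """Mean cross-entropy where the correct class for row i is column i."""
    total = 0.0
    for i, row in enumerate(logits):
        m = max(row)  # subtract the max for numerical stability
        log_sum_exp = m + math.log(sum(math.exp(x - m) for x in row))
        total += log_sum_exp - row[i]
    return total / len(logits)


def contrastive_loss(
    image_emb: List[Sequence[float]],
    text_emb: List[Sequence[float]],
    temperature: float = 0.07,
) -> float:
    """Symmetric loss: image i should match text i and vice versa."""
    images = [_normalize(v) for v in image_emb]
    texts = [_normalize(v) for v in text_emb]
    # logits[i][j] = cosine similarity of image i and text j, scaled
    logits = [
        [sum(a * b for a, b in zip(img, txt)) / temperature for txt in texts]
        for img in images
    ]
    logits_t = [list(col) for col in zip(*logits)]  # text-to-image direction
    return 0.5 * (_diagonal_cross_entropy(logits) + _diagonal_cross_entropy(logits_t))


aligned = contrastive_loss([[1, 0], [0, 1]], [[1, 0], [0, 1]])
swapped = contrastive_loss([[1, 0], [0, 1]], [[0, 1], [1, 0]])
print(f"aligned pairs: {aligned:.6f}, swapped pairs: {swapped:.2f}")
```

The loss is near zero when each image sits closest to its own caption and large when the pairs are swapped, which is the pressure that pulls matching images and texts together in the shared space.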
Using CLIP for Cross-Modal Search
import torch
import numpy as np
from transformers import CLIPProcessor, CLIPModel
from typing import List, Tuple

class CLIPSearchEngine:
    """Cross-modal search using CLIP embeddings."""

    def __init__(self, model_name: str = "openai/clip-vit-large-patch14"):
        self.model = CLIPModel.from_pretrained(model_name)
        self.processor = CLIPProcessor.from_pretrained(model_name)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()  # Inference mode

    def embed_images(self, images: List) -> np.ndarray:
        """
        Embed multiple images into CLIP space.

        Args:
            images: List of PIL Images (open file paths with PIL.Image.open
                before calling; the processor does not load files itself)

        Returns:
            Array of shape (num_images, embedding_dim)
        """
        inputs = self.processor(images=images, return_tensors="pt", padding=True)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            features = self.model.get_image_features(**inputs)
        # Normalize for cosine similarity
        features = features / features.norm(dim=-1, keepdim=True)
        return features.cpu().numpy()
    def embed_text(self, texts: List[str]) -> np.ndarray:
        """
        Embed text queries into CLIP space.

        Args:
            texts: List of text queries

        Returns:
            Array of shape (num_texts, embedding_dim)
        """
        # CLIP's text encoder has a 77-token context window; truncate longer
        # inputs instead of failing on them
        inputs = self.processor(
            text=texts, return_tensors="pt", padding=True, truncation=True
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            features = self.model.get_text_features(**inputs)
        features = features / features.norm(dim=-1, keepdim=True)
        return features.cpu().numpy()
    def search_images_with_text(
        self,
        query: str,
        image_embeddings: np.ndarray,
        top_k: int = 10
    ) -> List[Tuple[int, float]]:
        """
        Search image collection using text query.

        Args:
            query: Text description to search for
            image_embeddings: Pre-computed image embeddings
            top_k: Number of results to return

        Returns:
            List of (index, similarity_score) tuples
        """
        # Embed query text
        query_embedding = self.embed_text([query])[0]
        # Compute similarities (cosine similarity, since normalized)
        similarities = image_embeddings @ query_embedding
        # Get top-k matches
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [(int(idx), float(similarities[idx])) for idx in top_indices]
        return results

    def search_text_with_image(
        self,
        query_image,
        text_embeddings: np.ndarray,
        top_k: int = 10
    ) -> List[Tuple[int, float]]:
        """Search text collection using image query."""
        query_embedding = self.embed_images([query_image])[0]
        similarities = text_embeddings @ query_embedding
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return [(int(idx), float(similarities[idx])) for idx in top_indices]

Practical Considerations for CLIP
1. Prompt Engineering for Better Retrieval
CLIP was trained on web captions, which tend to follow patterns. Better queries match training distribution:
# Less effective
query = "cat"

# More effective (matches caption style)
query = "a photo of a cat"
query = "a photograph showing a cat"

# Even more specific
query = "a photograph of a gray tabby cat sitting on a windowsill"

2. The Modality Gap
Despite alignment, CLIP’s image and text embeddings cluster in different regions of space. You can tell whether an embedding came from an image or text. This affects retrieval:
def adjust_for_modality_gap(
    text_embedding: np.ndarray,
    image_embeddings: np.ndarray,
    image_centroid: np.ndarray,
    text_centroid: np.ndarray,
) -> np.ndarray:
    """
    Reduce the modality gap before ranking image results for a text query.

    A common mistake is to "calibrate" by dividing similarities by a
    temperature. That is a *monotonic* transform: it rescales the scores
    but leaves their order unchanged, so it does nothing for retrieval rank.
    The gap is essentially a translation between the image and text clusters,
    so the effective fix is to mean-center each modality (subtract that
    modality's centroid, computed over your corpus) and re-normalize before
    comparing.
    """
    img = image_embeddings - image_centroid
    txt = text_embedding - text_centroid
    img /= np.linalg.norm(img, axis=1, keepdims=True)
    txt /= np.linalg.norm(txt)
    return img @ txt

3. Domain Adaptation
CLIP was trained on natural web images. Specialized domains (medical imaging, satellite imagery, technical diagrams) may embed poorly without fine-tuning:
# Fine-tune CLIP on domain-specific data
class DomainCLIP(CLIPSearchEngine):
    """CLIP fine-tuned for specific domain."""

    def fine_tune(self, image_text_pairs: List[Tuple], epochs: int = 5):
        """
        Fine-tune CLIP on domain-specific image-text pairs.

        This adapts the model to domain vocabulary and visual patterns.
        """
        # Contrastive fine-tuning is omitted for brevity (see CLIP training
        # tutorials). Raise rather than silently return an untuned model.
        raise NotImplementedError("Contrastive fine-tuning is not implemented here")

Multimodal RAG: Documents with Images
Documents containing both text and images need multimodal indexing to support queries that reference either modality:
from typing import Dict, List, Optional
import hashlib

class MultimodalDocumentRAG:
    """RAG system for documents containing text, images, tables, and charts."""

    def __init__(self):
        self.text_embedder = TextEmbeddingModel()  # e.g., text-embedding-3-large
        self.clip_embedder = CLIPSearchEngine()
        self.vlm = VisionLanguageModel()
        self.vector_store = VectorDatabase()
    def index_document(self, doc_path: str, doc_id: str):
        """
        Index all modalities of a document.

        Strategy:
        - Text chunks → text embeddings (for semantic text search)
        - Images → visual embeddings (for image similarity search)
        - Images → VLM descriptions → text embeddings (for text-based image search)
        """
        pages = self._extract_pages(doc_path)
        for page_num, page in enumerate(pages):
            # Index text chunks
            text_chunks = self._chunk_text(page['text'])
            for chunk_idx, chunk in enumerate(text_chunks):
                embedding = self.text_embedder.encode(chunk['text'])
                self.vector_store.upsert(
                    id=f"{doc_id}_p{page_num}_text_{chunk_idx}",
                    vector=embedding,
                    metadata={
                        'doc_id': doc_id,
                        'page': page_num,
                        'type': 'text',
                        'content': chunk['text'],
                        'chunk_index': chunk_idx
                    }
                )
            # Index images with dual embeddings
            for img_idx, image in enumerate(page['images']):
                # Visual embedding for image-similarity search
                visual_embedding = self.clip_embedder.embed_images([image['data']])[0]
                # Generate description with VLM for text-based search
                description = self.vlm.describe_image(
                    image['data'],
                    context=page['text'][:500]  # Include surrounding text
                )
                semantic_embedding = self.text_embedder.encode(description)
                image_id = f"{doc_id}_p{page_num}_img_{img_idx}"
                # Store visual embedding (for "find similar images" queries)
                self.vector_store.upsert(
                    id=f"{image_id}_visual",
                    vector=visual_embedding,
                    metadata={
                        'doc_id': doc_id,
                        'page': page_num,
                        'type': 'image_visual',
                        'description': description,
                        'image_ref': image['path']
                    }
                )
                # Store semantic embedding (for "find images about X" queries)
                self.vector_store.upsert(
                    id=f"{image_id}_semantic",
                    vector=semantic_embedding,
                    metadata={
                        'doc_id': doc_id,
                        'page': page_num,
                        'type': 'image_semantic',
                        'description': description,
                        'image_ref': image['path']
                    }
                )
    def query(
        self,
        question: str,
        query_image: Optional[object] = None,  # e.g., a PIL Image
        top_k: int = 5
    ) -> Dict:
        """
        Query across all modalities.

        Supports:
        - Text query → finds text and image descriptions
        - Image query → finds similar images
        - Combined text+image query
        """
        results = []
        # Text-based search
        if question:
            # Two query vectors: one in the text-embedding space, one in
            # CLIP space (the space the 'image_visual' vectors live in)
            text_query_embedding = self.text_embedder.encode(question)
            clip_text_embedding = self.clip_embedder.embed_text([question])[0]
            # Search text chunks
            text_results = self.vector_store.search(
                vector=text_query_embedding,
                filter={'type': 'text'},
                top_k=top_k
            )
            results.extend(text_results)
            # Search image descriptions (semantic)
            semantic_results = self.vector_store.search(
                vector=text_query_embedding,
                filter={'type': 'image_semantic'},
                top_k=top_k
            )
            results.extend(semantic_results)
            # Search images by visual similarity to text query
            visual_results = self.vector_store.search(
                vector=clip_text_embedding,
                filter={'type': 'image_visual'},
                top_k=top_k
            )
            results.extend(visual_results)
        # Image-based search
        if query_image is not None:
            image_embedding = self.clip_embedder.embed_images([query_image])[0]
            image_results = self.vector_store.search(
                vector=image_embedding,
                filter={'type': 'image_visual'},
                top_k=top_k
            )
            results.extend(image_results)
        # Deduplicate and rank
        unique_results = self._deduplicate_results(results, top_k)
        # Build context for generation
        context = self._build_multimodal_context(unique_results)
        # Generate answer with VLM (can reference images)
        answer = self._generate_answer(question, context, unique_results)
        return {
            'answer': answer,
            'sources': unique_results,
            'source_count': len(unique_results)
        }
    def _deduplicate_results(
        self,
        results: List[Dict],
        top_k: int
    ) -> List[Dict]:
        """
        Deduplicate results by document+page+content.

        If both visual and semantic embeddings of same image appear,
        keep the higher-scoring one.

        Note: scores come from two different embedding models, so comparing
        them directly is a rough heuristic rather than a calibrated ranking.
        """
        seen = {}
        for result in results:
            # Create key from doc, page, and image reference (or text content)
            key = (
                result['metadata']['doc_id'],
                result['metadata']['page'],
                result['metadata'].get('image_ref', result['metadata'].get('content', ''))
            )
            if key not in seen or result['score'] > seen[key]['score']:
                seen[key] = result
        # Sort by score and return top_k
        unique = sorted(seen.values(), key=lambda x: x['score'], reverse=True)
        return unique[:top_k]
    def _build_multimodal_context(self, results: List[Dict]) -> str:
        """Format retrieved content as context."""
        context_parts = []
        for i, result in enumerate(results, 1):
            meta = result['metadata']
            if meta['type'] == 'text':
                context_parts.append(
                    f"[{i}] Text from page {meta['page']}: {meta['content']}"
                )
            elif 'image' in meta['type']:
                context_parts.append(
                    f"[{i}] Image from page {meta['page']}: {meta['description']}"
                )
        return "\n\n".join(context_parts)
    def _generate_answer(
        self,
        question: str,
        text_context: str,
        results: List[Dict]
    ) -> str:
        """Generate answer using VLM with both text and image context."""
        # Collect images from results
        images = [
            r['metadata']['image_ref']
            for r in results
            if 'image' in r['metadata']['type']
        ]
        # Build prompt with text context and images
        prompt = f"""Answer this question using the provided context and images:
Question: {question}
Text Context:
{text_context}
The images referenced in the context are provided below.
"""
        # Call VLM with text prompt + images
        answer = self.vlm.generate(
            prompt=prompt,
            images=images[:5]  # Limit images to avoid token limits
        )
        return answer

What people do: Generate text descriptions of images with a VLM, embed those descriptions with a text model, and call it “multimodal search.”
Why it fails: Text descriptions lose visual information. “A bar chart showing quarterly revenue” doesn’t capture the specific colors, layout, or that Q3 has a red declining bar. When a user searches “charts with downward trends,” the text embedding might miss visually obvious patterns.
Fix: Use dual embeddings—CLIP visual embeddings for visual similarity (“find charts that look like this”) AND text embeddings of descriptions for semantic search (“find charts about revenue”). Different query types need different embedding strategies.
Why Dual Embeddings for Images?
Indexing images with both CLIP (visual) and text (semantic) embeddings serves different query types:
- Visual query: “Find images similar to this chart” → uses CLIP visual embedding
- Semantic query: “Find images showing revenue growth” → uses text embedding of VLM-generated description
Indexing both improves recall across query types, at the cost of storing and searching two vectors per image.
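Because the visual and semantic searches run in different embedding spaces, their raw similarity scores are not on the same scale. One common way to merge such lists is reciprocal rank fusion (RRF), which uses only each item's rank. The sketch below is an alternative to sorting by raw score as `_deduplicate_results` does, offered as an assumption rather than as the method used in this chapter's code; the function name and the example IDs are hypothetical, and `k=60` is the conventional default constant.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def reciprocal_rank_fusion(
    ranked_lists: List[List[str]],
    k: int = 60,
) -> List[Tuple[str, float]]:
    """Merge ranked ID lists; each appearance contributes 1 / (k + rank)."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, item_id in enumerate(ranking, start=1):
            scores[item_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)


# Hypothetical result IDs from the two image searches
visual_hits = ["img_a", "img_b", "img_c"]    # CLIP visual index
semantic_hits = ["img_b", "img_d", "img_a"]  # description text index
for item_id, score in reciprocal_rank_fusion([visual_hits, semantic_hits]):
    print(item_id, round(score, 5))
```

Images that rank well in both lists rise to the top, while an image found by only one index still appears further down.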
Complete code: See Multimodal Document RAG and Multimodal Embeddings and Indexing for production implementations with dual embeddings and chunking strategies.
Late Fusion: When Separate Indexes Are Better
Sometimes searching modalities separately and combining results works better than a single unified index:
class LateFusionMultimodalSearch:
    """Search modalities separately, combine with learned weights."""

    def __init__(self, fusion_weights: Optional[Dict] = None):
        self.text_index = TextVectorIndex()
        self.image_index = CLIPVectorIndex()
        self.audio_index = AudioEmbeddingIndex()
        # Default weights (can be learned from data)
        self.weights = fusion_weights or {
            'text': 0.5,
            'image': 0.3,
            'audio': 0.2
        }
    def search(
        self,
        query: str,
        modalities: Optional[List[str]] = None,
        top_k: int = 10
    ) -> List[Dict]:
        """
        Search each modality and combine results.

        Args:
            query: Text query
            modalities: Which modalities to search (default: all three)
            top_k: Number of final results

        Returns:
            Combined ranked results
        """
        # Avoid a mutable default argument
        if modalities is None:
            modalities = ['text', 'image', 'audio']
        indexes = {
            'text': self.text_index,
            'image': self.image_index,
            'audio': self.audio_index
        }
        all_results = []
        # Search each requested modality, over-fetching to leave room for fusion
        for modality in modalities:
            if modality not in indexes:
                continue
            results = indexes[modality].search(query, top_k=top_k * 2)
            results = self._normalize_scores(results)
            all_results.extend(
                {**r, 'modality': modality, 'weight': self.weights[modality]}
                for r in results
            )
        # Combine scores by document
        doc_scores = {}
        doc_results = {}
        for result in all_results:
            doc_id = result['doc_id']
            weighted_score = result['score'] * result['weight']
            if doc_id not in doc_scores:
                doc_scores[doc_id] = 0
                doc_results[doc_id] = []
            doc_scores[doc_id] += weighted_score
            doc_results[doc_id].append(result)
        # Rank documents by combined score
        ranked_docs = sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]
        # Return results with all matching modalities
        final_results = []
        for doc_id, score in ranked_docs:
            final_results.append({
                'doc_id': doc_id,
                'combined_score': score,
                'matches': doc_results[doc_id]
            })
        return final_results
def _normalize_scores(self, results: List[Dict]) -> List[Dict]:
"""Normalize scores to [0, 1] range."""
if not results:
return results
scores = [r['score'] for r in results]
min_score = min(scores)
max_score = max(scores)
score_range = max_score - min_score
if score_range == 0:
return results
normalized = []
for r in results:
normalized.append({
**r,
'score': (r['score'] - min_score) / score_range
})
return normalized
def learn_fusion_weights(
self,
queries: List[str],
relevance_labels: List[Dict]
) -> Dict:
"""
Learn optimal fusion weights from labeled data.
Args:
queries: List of query strings
relevance_labels: List of {doc_id: relevance_score} dicts
Returns:
Optimal weight dictionary
"""
best_weights = self.weights
best_ndcg = 0
# Grid search over weight combinations
for text_weight in np.arange(0.2, 0.8, 0.1):
for image_weight in np.arange(0.1, 0.6, 0.1):
audio_weight = 1.0 - text_weight - image_weight
if audio_weight < 0 or audio_weight > 1:
continue
self.weights = {
'text': text_weight,
'image': image_weight,
'audio': audio_weight
}
# Evaluate on labeled data
ndcg = self._evaluate_ndcg(queries, relevance_labels)
if ndcg > best_ndcg:
best_ndcg = ndcg
best_weights = self.weights.copy()
self.weights = best_weights
return best_weights
def _evaluate_ndcg(
self,
queries: List[str],
relevance_labels: List[Dict]
) -> float:
"""Compute average NDCG across queries."""
ndcgs = []
for query, labels in zip(queries, relevance_labels):
results = self.search(query, top_k=10)
ndcg = self._compute_ndcg(results, labels)
ndcgs.append(ndcg)
return np.mean(ndcgs)
def _compute_ndcg(self, results: List[Dict], labels: Dict) -> float:
"""Compute NDCG for single query."""
# Implementation of NDCG calculation
# (omitted for brevity - standard IR metric)
passWhen to Use Late Fusion
Late fusion is beneficial when:
- Different retrieval characteristics: Text search uses semantic similarity, image search uses visual similarity—they measure different things
- Interpretability needed: You want to see which modality contributed to each result
- Variable query patterns: Some queries are more visual (“find the org chart”), others more textual (“find discussion of hiring plans”)
- You have labeled data: Fusion weights can be optimized for your specific use case
- Modality-specific tuning: You want to use different models/parameters for each modality
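The "variable query patterns" point can be illustrated with a small heuristic that shifts fusion weight toward the modality a query appears to target. This is only a sketch: the cue-word sets, the `adjust_weights` function, and the `boost` value are hypothetical choices made for illustration, and a production system would more likely use a trained query classifier.

```python
import re
from typing import Dict

# Hypothetical cue words, chosen only for this example.
VISUAL_CUES = {"chart", "diagram", "photo", "image", "screenshot", "slide", "logo"}
AUDIO_CUES = {"said", "mentioned", "spoke", "recording", "podcast"}


def adjust_weights(query: str, base: Dict[str, float], boost: float = 0.3) -> Dict[str, float]:
    """Shift fusion weight toward the modality the query seems to target.

    Adds `boost` to a modality when the query contains one of its cue
    words, then renormalizes so the weights still sum to 1.
    """
    tokens = set(re.findall(r"[a-z]+", query.lower()))
    adjusted = dict(base)
    if tokens & VISUAL_CUES:
        adjusted["image"] = adjusted.get("image", 0.0) + boost
    if tokens & AUDIO_CUES:
        adjusted["audio"] = adjusted.get("audio", 0.0) + boost
    total = sum(adjusted.values())
    return {name: weight / total for name, weight in adjusted.items()}


base_weights = {"text": 0.5, "image": 0.3, "audio": 0.2}
visual_weights = adjust_weights("find the org chart", base_weights)
textual_weights = adjust_weights("find discussion of hiring plans", base_weights)
```

With these assumed cue words, the org-chart query ends up weighting image results above text results, while the hiring-plans query leaves the base weights unchanged.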
Early vs. Late Fusion Comparison
| Aspect | Early Fusion | Late Fusion |
|---|---|---|
| Architecture | Single unified embedding space | Separate indexes per modality |
| Cross-modal reasoning | Built-in (shared space) | Requires explicit design |
| Interpretability | Black box | Clear modality contributions |
| Optimization | End-to-end | Per-modality + fusion weights |
| Flexibility | Limited | High (different models per modality) |
| Complexity | Lower | Higher (multiple systems) |
Complete code: See Late Fusion Search for a complete implementation with learnable fusion weights and confidence scoring.
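Learning fusion weights depends on a ranking metric, and the class above leaves its NDCG helper as a stub. The following is a minimal sketch of NDCG@k with linear gains, assuming relevance labels are given as a `{doc_id: relevance}` dictionary and that unlabeled documents count as relevance 0. The function names `dcg` and `ndcg_at_k` are illustrative, not part of the referenced implementation.

```python
import math
from typing import Dict, List


def dcg(gains: List[float]) -> float:
    """Discounted cumulative gain: the gain at rank i is divided by log2(i + 1)."""
    return sum(g / math.log2(rank + 1) for rank, g in enumerate(gains, start=1))


def ndcg_at_k(ranked_doc_ids: List[str], labels: Dict[str, float], k: int = 10) -> float:
    """NDCG@k with linear gains; unlabeled documents count as relevance 0."""
    gains = [labels.get(doc_id, 0.0) for doc_id in ranked_doc_ids[:k]]
    ideal = sorted(labels.values(), reverse=True)[:k]
    ideal_dcg = dcg(ideal)
    return dcg(gains) / ideal_dcg if ideal_dcg > 0 else 0.0


labels = {"doc_a": 3.0, "doc_b": 1.0}
perfect = ndcg_at_k(["doc_a", "doc_b", "doc_c"], labels)
swapped = ndcg_at_k(["doc_b", "doc_a", "doc_c"], labels)
```

To score the output of a fused search, pass `[r['doc_id'] for r in results]` as the ranked list. Some formulations use exponential gains (2^rel - 1) instead of linear gains; either works for weight tuning as long as it is applied consistently.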
Production Architecture Patterns
Cost Management for Audio/Video Processing
Audio and video processing is expensive due to:
- Large file sizes requiring significant bandwidth
- Compute-intensive processing (transcription, frame analysis)
- High token costs for video frame analysis (images are expensive tokens)
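A back-of-the-envelope calculation makes these cost drivers concrete. The sketch below uses the same illustrative unit prices as the processor class in the next section. They are assumptions, not quotes from any provider, and `processing_cost` is a hypothetical helper.

```python
# Illustrative unit prices in dollars (assumptions; substitute your provider's rates).
PRICE_PER_AUDIO_MINUTE = 0.006   # transcription
PRICE_PER_VLM_FRAME = 0.01       # vision-language analysis of one frame
PRICE_PER_EMBEDDING = 0.0001     # one image embedding


def processing_cost(duration_minutes: float, frames: int) -> float:
    """One-time processing cost for a single video (storage excluded)."""
    transcription = duration_minutes * PRICE_PER_AUDIO_MINUTE
    frame_analysis = frames * PRICE_PER_VLM_FRAME
    embeddings = frames * PRICE_PER_EMBEDDING
    return transcription + frame_analysis + embeddings


# A 60-minute video, analyzed two ways.
capped = processing_cost(60, frames=20)        # 20 sampled frames
one_fps = processing_cost(60, frames=60 * 60)  # one frame per second
```

Under these assumed prices the capped plan costs about $0.56 and the one-frame-per-second plan about $36.72. Transcription costs the same in both cases, so frame analysis accounts for nearly all of the difference.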
Cost-Aware Processing Strategy
from typing import Dict


class CostOptimizedVideoProcessor:
    """Process video with cost controls and optimization."""

    def __init__(self, monthly_budget: float):
        self.budget = monthly_budget
        self.spent_this_month = 0.0
        # Cost estimates (adjust based on your providers)
        self.costs = {
            'whisper_per_minute': 0.006,
            'video_frame_analysis': 0.01,  # Per frame with VLM
            'clip_embedding': 0.0001,  # Per image
            'storage_per_gb_month': 0.02
        }

    def estimate_video_cost(
        self,
        video_path: str,
        frames_to_sample: int = 20
    ) -> Dict:
        """Estimate processing cost before committing."""
        # Get video metadata
        video_info = self._get_video_info(video_path)
        duration_minutes = video_info['duration'] / 60
        costs = {}
        # Audio transcription cost
        costs['transcription'] = duration_minutes * self.costs['whisper_per_minute']
        # Video frame analysis cost
        costs['frame_analysis'] = frames_to_sample * self.costs['video_frame_analysis']
        # CLIP embeddings (if using)
        costs['embeddings'] = frames_to_sample * self.costs['clip_embedding']
        # Storage cost (monthly, reported separately from one-time processing)
        file_size_gb = video_info['size_bytes'] / (1024**3)
        costs['storage_monthly'] = file_size_gb * self.costs['storage_per_gb_month']
        costs['total_processing'] = sum(
            v for k, v in costs.items() if k != 'storage_monthly'
        )
        return costs

    async def process_within_budget(
        self,
        video_path: str,
        priority: str = 'balanced'
    ) -> Dict:
        """
        Process video with automatic cost optimization.

        Args:
            video_path: Path to video file
            priority: 'speed', 'cost', or 'balanced'

        Returns:
            Processing results with actual cost
        """
        estimated_cost = self.estimate_video_cost(video_path)
        if self.spent_this_month + estimated_cost['total_processing'] > self.budget:
            # Try to reduce cost
            optimized_plan = self._optimize_processing_plan(
                video_path,
                available_budget=self.budget - self.spent_this_month,
                priority=priority
            )
        else:
            optimized_plan = self._standard_processing_plan(video_path)
        # Execute plan
        result = await self._execute_plan(video_path, optimized_plan)
        # Track spending
        self.spent_this_month += result['actual_cost']
        return result

    def _optimize_processing_plan(
        self,
        video_path: str,
        available_budget: float,
        priority: str
    ) -> Dict:
        """Create cost-optimized processing plan."""
        plan = {}
        if priority == 'cost':
            # Minimize cost
            plan['frame_count'] = 10  # Fewer frames
            plan['frame_resolution'] = 512  # Lower resolution
            plan['audio_only'] = False
            plan['use_scene_detection'] = True  # Smart sampling
        elif priority == 'speed':
            # Maximize speed (which often means lower cost too)
            plan['frame_count'] = 15
            plan['frame_resolution'] = 768
            plan['audio_only'] = False
            plan['parallel_processing'] = True
            plan['use_scene_detection'] = True
        else:  # balanced
            plan['frame_count'] = 20
            plan['frame_resolution'] = 1024
            plan['audio_only'] = False
            plan['use_scene_detection'] = True
        # If still over budget, escalate reductions step by step
        if self._estimate_plan_cost(plan) > available_budget:
            plan['frame_count'] = max(5, plan['frame_count'] - 10)
            if self._estimate_plan_cost(plan) > available_budget:
                plan['audio_only'] = True  # Last resort: skip video analysis
        return plan
Error Handling for Audio Systems
Audio processing has unique failure modes that require specific error handling:
import subprocess
from pathlib import Path
from typing import Dict


class RobustAudioProcessor:
    """Audio processing with comprehensive error handling."""

    def __init__(self):
        self.transcriber = TranscriptionService()
        self.max_retries = 3
        self.timeout_seconds = 300

    async def process_with_fallbacks(
        self,
        audio_path: str
    ) -> Dict:
        """Process audio with multiple fallback strategies."""
        errors = []
        # Try standard processing
        try:
            result = await self._process_standard(audio_path)
            return {'status': 'success', 'result': result, 'method': 'standard'}
        except FileTooLargeError as e:
            errors.append(f"File too large: {e}")
            # Fallback: compress audio
            try:
                compressed_path = self._compress_audio(audio_path)
                result = await self._process_standard(compressed_path)
                return {
                    'status': 'success',
                    'result': result,
                    'method': 'compressed',
                    'warning': 'Audio was compressed, may affect quality'
                }
            except Exception as e2:
                errors.append(f"Compression failed: {e2}")
        except AudioQualityError as e:
            errors.append(f"Low quality audio: {e}")
            # Fallback: enhance audio
            try:
                enhanced_path = self._enhance_audio(audio_path)
                result = await self._process_standard(enhanced_path)
                return {
                    'status': 'success',
                    'result': result,
                    'method': 'enhanced'
                }
            except Exception as e2:
                errors.append(f"Enhancement failed: {e2}")
        except TimeoutError as e:
            errors.append(f"Processing timeout: {e}")
            # Fallback: chunk and process
            try:
                result = await self._process_chunked(audio_path)
                return {
                    'status': 'success',
                    'result': result,
                    'method': 'chunked'
                }
            except Exception as e2:
                errors.append(f"Chunked processing failed: {e2}")
        # All methods failed
        return {
            'status': 'failed',
            'errors': errors,
            'suggestion': 'Manual review required'
        }

    def _compress_audio(self, audio_path: str) -> str:
        """Compress audio to reduce file size."""
        # Derive the output name from the stem so non-.mp3 inputs
        # never resolve to the same path as the input file
        source = Path(audio_path)
        output_path = str(source.with_name(f"{source.stem}_compressed.mp3"))
        subprocess.run([
            'ffmpeg', '-y',  # Overwrite a stale output from an earlier attempt
            '-i', audio_path,
            '-b:a', '64k',  # Lower bitrate
            '-ar', '16000',  # Lower sample rate
            output_path
        ], check=True)
        return output_path

    def _enhance_audio(self, audio_path: str) -> str:
        """Enhance audio quality (noise reduction, normalization)."""
        # Use audio enhancement tools (e.g., noisereduce, pydub)
        raise NotImplementedError("Audio enhancement omitted for brevity")
Streaming Architecture for Real-Time Processing
Real-time audio/video processing requires careful architectural design:
┌─────────────────────────────────────────────────────────────────────────────┐
│                     STREAMING AUDIO/VIDEO ARCHITECTURE                      │
└─────────────────────────────────────────────────────────────────────────────┘

Client (Browser/App)
        │
        │ WebSocket/WebRTC
        ▼
┌──────────────────┐
│    Ingestion     │  Accept audio/video stream
│     Service      │  Buffer management
└──────────────────┘
        │
        ▼
┌──────────────────┐
│  Message Queue   │  Kafka/RabbitMQ
│  (Audio chunks)  │  Decouples ingestion from processing
└──────────────────┘
        │
        ├────────────────────┬────────────────────┐
        ▼                    ▼                    ▼
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Streaming   │     │ Diarization  │     │  Embedding   │
│   Whisper    │     │   Service    │     │   Service    │
│   Worker     │     │   Worker     │     │   Worker     │
└──────────────┘     └──────────────┘     └──────────────┘
        │                    │                    │
        └────────────────────┴────────────────────┘
                             ▼
                   ┌──────────────────┐
                   │     Results      │
                   │   Aggregation    │
                   └──────────────────┘
                             │
                             ▼
                   ┌──────────────────┐
                   │    WebSocket     │  Push results to client
                   │      Server      │  Real-time updates
                   └──────────────────┘
                             │
                             ▼
                       Client Display
Key Design Principles:
- Buffering: Collect audio in small chunks (2-5 seconds) with overlap
- Async processing: Workers process chunks in parallel
- Message queue: Decouples ingestion from processing, handles backpressure
- Progressive results: Send partial results immediately, corrections later
- State management: Track session state across chunks
- Error recovery: Handle worker failures without losing chunks
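The buffering principle can be sketched as a window generator. This minimal example assumes a known total duration, which makes it an offline illustration. A live system would apply the same stride to a rolling buffer as audio arrives. The function name `chunk_windows` and its default values are hypothetical, chosen to fall inside the 2-5 second range mentioned above.

```python
from typing import Iterator, List, Tuple


def chunk_windows(
    total_seconds: float,
    chunk_seconds: float = 3.0,
    overlap_seconds: float = 0.5,
) -> Iterator[Tuple[float, float]]:
    """Yield (start, end) windows that cover the audio with a fixed overlap."""
    if not 0 <= overlap_seconds < chunk_seconds:
        raise ValueError("overlap must be non-negative and smaller than the chunk")
    step = chunk_seconds - overlap_seconds  # how far each window advances
    start = 0.0
    while start < total_seconds:
        end = min(start + chunk_seconds, total_seconds)
        yield (start, end)
        if end >= total_seconds:
            break  # the last window reached the end of the audio
        start += step


windows: List[Tuple[float, float]] = list(chunk_windows(10.0))
```

Each window starts `overlap_seconds` before the previous one ends, so a word that straddles a boundary is fully contained in at least one chunk. The aggregation step then has to deduplicate words that appear in both overlapping windows.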
Evaluation and Quality Assurance
Transcription Accuracy: Word Error Rate
The standard metric for speech recognition is Word Error Rate (WER):
WER = (Substitutions + Deletions + Insertions) / Total Words in Reference
from typing import Dict, List, Tuple
import difflib

import numpy as np


def calculate_wer(reference: str, hypothesis: str) -> Dict:
    """
    Calculate Word Error Rate and related metrics.

    Args:
        reference: Ground truth transcription
        hypothesis: Model's transcription

    Returns:
        Dictionary with WER, insertions, deletions, substitutions
    """
    ref_words = reference.lower().split()
    hyp_words = hypothesis.lower().split()
    # Use difflib to compute edit operations. SequenceMatcher does not
    # guarantee a minimal edit script, so this can overestimate WER
    # slightly compared with a Levenshtein alignment. autojunk=False
    # stops frequent words from being ignored in long transcripts.
    matcher = difflib.SequenceMatcher(None, ref_words, hyp_words, autojunk=False)
    substitutions = 0
    deletions = 0
    insertions = 0
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'replace':
            ref_len, hyp_len = i2 - i1, j2 - j1
            # Paired words are substitutions; any surplus is a deletion or insertion
            substitutions += min(ref_len, hyp_len)
            deletions += max(0, ref_len - hyp_len)
            insertions += max(0, hyp_len - ref_len)
        elif tag == 'delete':
            deletions += i2 - i1
        elif tag == 'insert':
            insertions += j2 - j1
    total_words = len(ref_words)
    wer = (substitutions + deletions + insertions) / total_words if total_words > 0 else 0
    return {
        'wer': wer,
        'substitutions': substitutions,
        'deletions': deletions,
        'insertions': insertions,
        'total_words': total_words,
        # WER can exceed 1.0 when there are many insertions, so clamp at 0
        'accuracy': max(0.0, 1 - wer)
    }


def evaluate_transcription_system(
    test_cases: List[Tuple[str, str]]
) -> Dict:
    """
    Evaluate transcription system on test set.

    Args:
        test_cases: List of (audio_path, reference_transcript) tuples

    Returns:
        Aggregate metrics across test set
    """
    transcriber = TranscriptionService()
    results = []
    for audio_path, reference in test_cases:
        hypothesis = transcriber.transcribe(audio_path)['text']
        wer_metrics = calculate_wer(reference, hypothesis)
        results.append({
            'audio': audio_path,
            **wer_metrics
        })
    # Aggregate metrics
    avg_wer = np.mean([r['wer'] for r in results])
    avg_accuracy = 1 - avg_wer
    return {
        'average_wer': avg_wer,
        'average_accuracy': avg_accuracy,
        'median_wer': np.median([r['wer'] for r in results]),
        'per_file_results': results
    }
Interpreting WER:
- < 5%: Excellent, near-human performance
- 5-10%: Good, usable for most applications
- 10-20%: Acceptable for some uses, may need review
- > 20%: Poor, likely unusable without significant post-processing
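Because these thresholds are only a few percentage points apart, it can matter that WER is computed from a true minimum-edit alignment. Python's `difflib.SequenceMatcher` is documented as not producing minimal edit sequences, so the following is a minimal sketch of a Levenshtein-based alternative over words. The function name `word_error_rate` is hypothetical, and text normalization is limited to lowercasing and whitespace splitting, which is an assumption you would likely extend (punctuation, numerals) in practice.

```python
from typing import Dict, List


def word_error_rate(reference: str, hypothesis: str) -> Dict[str, float]:
    """WER from a minimum-edit-distance alignment over words."""
    ref: List[str] = reference.lower().split()
    hyp: List[str] = hypothesis.lower().split()

    # cost[i][j] = (edits, subs, dels, ins) needed to turn ref[:i] into hyp[:j]
    cost = [[(0, 0, 0, 0)] * (len(hyp) + 1) for _ in range(len(ref) + 1)]
    for i in range(1, len(ref) + 1):
        cost[i][0] = (i, 0, i, 0)   # delete every reference word
    for j in range(1, len(hyp) + 1):
        cost[0][j] = (j, 0, 0, j)   # insert every hypothesis word

    for i in range(1, len(ref) + 1):
        for j in range(1, len(hyp) + 1):
            if ref[i - 1] == hyp[j - 1]:
                cost[i][j] = cost[i - 1][j - 1]  # match: no new edit
                continue
            e, s, d, n = cost[i - 1][j - 1]
            substitute = (e + 1, s + 1, d, n)
            e, s, d, n = cost[i - 1][j]
            delete = (e + 1, s, d + 1, n)
            e, s, d, n = cost[i][j - 1]
            insert = (e + 1, s, d, n + 1)
            # Tuples compare on total edit count first
            cost[i][j] = min(substitute, delete, insert)

    edits, subs, dels, ins = cost[len(ref)][len(hyp)]
    return {
        "wer": edits / len(ref) if ref else 0.0,
        "substitutions": subs,
        "deletions": dels,
        "insertions": ins,
    }


example = word_error_rate("the cat sat on the mat", "the cat sit on mat")
```

In the example, "sat" is substituted and one "the" is deleted, so two errors over six reference words give a WER of about 33%.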
Speaker Diarization Evaluation
Speaker diarization is evaluated using Diarization Error Rate (DER):
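from typing import Dict, List, Tuple

from pyannote.core import Annotation, Segment
from pyannote.metrics.diarization import DiarizationErrorRate


def _to_annotation(segments: List[Tuple[float, float, str]]) -> Annotation:
    """Convert (start, end, speaker_id) tuples to a pyannote Annotation."""
    annotation = Annotation()
    for start, end, speaker in segments:
        annotation[Segment(start, end)] = speaker
    return annotation


def calculate_der(
    reference_segments: List[Tuple[float, float, str]],
    hypothesis_segments: List[Tuple[float, float, str]],
    collar: float = 0.25
) -> Dict:
    """
    Calculate Diarization Error Rate.

    DER = (False Alarm + Missed Speech + Speaker Error) / Total Speech Time

    Args:
        reference_segments: List of (start, end, speaker_id) tuples
        hypothesis_segments: List of (start, end, speaker_id) tuples
        collar: Forgiveness collar in seconds around boundaries

    Returns:
        DER metrics
    """
    # pyannote.metrics handles the optimal speaker mapping and the timing comparison.
    # The component key names below should be checked against your installed version.
    metric = DiarizationErrorRate(collar=collar)
    components = metric(
        _to_annotation(reference_segments),
        _to_annotation(hypothesis_segments),
        detailed=True
    )
    # Component values are durations in seconds; 'der' is the ratio
    return {
        'der': components['diarization error rate'],
        'false_alarm': components['false alarm'],
        'missed_speech': components['missed detection'],
        'speaker_confusion': components['confusion']
    }
Video Content Quality Metrics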
Video understanding has no single metric like WER. Use task-specific evaluation:
from typing import Dict, List

import numpy as np


class VideoQualityEvaluator:
    """Evaluate video understanding quality across multiple dimensions."""

    def evaluate_video_summary(
        self,
        video_path: str,
        generated_summary: str,
        reference_summary: str
    ) -> Dict:
        """Evaluate summary quality."""
        # ROUGE scores for summary overlap
        rouge_scores = self._compute_rouge(generated_summary, reference_summary)
        # Semantic similarity
        semantic_sim = self._semantic_similarity(generated_summary, reference_summary)
        # Factual consistency (check for hallucinations)
        frames = extract_keyframes(video_path)
        consistency_score = self._check_factual_consistency(
            generated_summary,
            frames
        )
        return {
            'rouge_1': rouge_scores['rouge1'],
            'rouge_l': rouge_scores['rougeL'],
            'semantic_similarity': semantic_sim,
            'factual_consistency': consistency_score,
            # Unweighted mean; assumes all three scores are on a 0-1 scale
            'overall_quality': np.mean([
                rouge_scores['rougeL'],
                semantic_sim,
                consistency_score
            ])
        }

    def evaluate_temporal_understanding(
        self,
        video_path: str,
        questions: List[str],
        answers: List[str],
        ground_truth: List[str]
    ) -> Dict:
        """Evaluate temporal reasoning (before/after, cause/effect)."""
        correct = 0
        for answer, truth in zip(answers, ground_truth):
            if self._answers_match(answer, truth):
                correct += 1
        return {
            # Guard against an empty question list
            'temporal_accuracy': correct / len(questions) if questions else 0.0,
            'total_questions': len(questions),
            'correct': correct
        }
Key Takeaways
Audio processing is more than transcription: Modern systems combine transcription, speaker diarization, and temporal understanding. Vocabulary prompting dramatically improves domain-specific accuracy.
Streaming vs. batch is a fundamental tradeoff: Real-time applications sacrifice accuracy for latency. Hybrid approaches (streaming + correction) provide the best user experience.
Video understanding requires intelligent sampling: Not every frame needs analysis. Scene detection, motion-based sampling, and adaptive strategies balance cost against coverage.
Multimodal retrieval needs aligned embeddings: CLIP enables cross-modal search. For production systems, dual embeddings (visual + semantic) maximize recall across query types.
Late fusion provides flexibility: Separate indexes per modality enable modality-specific optimization and interpretable results. Fusion weights can be learned from data.
Cost management is critical: Audio and especially video processing are expensive. Budget-aware processing, intelligent sampling, and fallback strategies prevent cost overruns.
Complete Reference: For full implementations of all components discussed in this chapter, see Multimodal Systems - Code Reference. This includes vision-language integration, audio transcription, speaker diarization, video processing, CLIP embeddings, and multimodal RAG patterns.
Summary
Multimodal AI is where engineering judgment matters most, because the design space explodes. Text RAG forces a few hard choices (chunk size, retriever, reranker). Video and audio retrieval force dozens — sampling rate, modality fusion, segmentation strategy, embedding pair, cost ceiling — and most of those choices have order-of-magnitude consequences for both quality and bill.
Three threads run through the whole chapter:
Time is a first-class input, not a free dimension. Audio and video carry meaning across seconds. Whisper-style spectrogram processing, scene-based frame sampling, and timestamp-aware indexing exist because pretending time doesn’t matter is the fastest way to ship a system that returns plausible but wrong answers. When in doubt, sample more sparsely but keep timestamps — a sparse-but-aligned index beats a dense-but-untimed one for every retrieval task we care about.
Cross-modal alignment is learned, not assumed. CLIP works because it was contrastively trained to put matching image-text pairs near each other. The corollary: image embeddings from a vision-only model and text embeddings from a language-only model live in different spaces and cannot be compared with cosine similarity, no matter how convenient that would be. Either use a CLIP-family model end-to-end, or fuse late (search each modality separately, then combine).
Cost is the design constraint that bites in production. A video question-answering system that uniformly samples 1 frame per second at 4K resolution will bankrupt you. Adaptive sampling, resolution tiering, audio-first transcription with selective frame inspection, and explicit per-request budgets are the difference between a demo and a service. Build the cost guardrails before you tune for quality, not after.
Audio, video, and cross-modal retrieval aren’t just “more modalities” — they’re a forcing function for everything Part III taught you: deployment thinking (Ch09), cost engineering (Ch32), evaluation discipline (Ch15), and reliability under spiky workloads (Ch31). If you can ship multimodal RAG to production, you can ship anything.
Practical Exercises
Audio Transcription Pipeline: Build a production audio processing system that handles long files (>1 hour), includes speaker diarization, and supports domain-specific vocabulary. Measure WER on a test set and optimize for cost.
Video Analysis System: Implement a video understanding system with multiple sampling strategies. Compare uniform vs. scene-based vs. adaptive sampling on 10 diverse videos. Measure quality (summary accuracy) vs. cost tradeoffs.
Multimodal RAG: Build a RAG system for a corpus containing text documents, images, and videos. Index with both CLIP and text embeddings. Implement queries that search across all modalities and evaluate retrieval quality.
Meeting Transcription Service: Create an end-to-end meeting transcription service with real-time display, speaker identification, and post-meeting summary/action item extraction. Deploy with WebSocket streaming.
Podcast Search Engine: Build a searchable podcast database. Transcribe episodes, create embeddings of segments, enable queries like “episodes discussing remote work challenges” that return timestamped clips. Measure search quality with user studies.
Cost Optimization Challenge: Given a fixed monthly budget, process the maximum number of videos possible while maintaining minimum quality thresholds. Implement adaptive sampling, resolution adjustment, and priority queuing.
Self-Assessment Checkpoint
Conceptual Questions
Q1. [IC2] Explain why mel spectrograms are used for speech recognition instead of raw waveforms. What properties of human hearing do they capture?
Answer
Raw waveforms have extremely high dimensionality (16,000 samples/second) and don’t align with how humans perceive sound. Mel spectrograms: (1) Apply a Fourier transform to convert the time-domain signal to the frequency domain, revealing what frequencies are present at each moment. (2) Use mel-scale filterbanks that mimic human hearing: we’re more sensitive to differences in low frequencies (100Hz vs 200Hz is perceptually large) than high frequencies (10kHz vs 10.1kHz is barely noticeable). (3) Compress frequency information logarithmically, allocating more resolution to perceptually important ranges. (4) Create a 2D representation (time × frequency) that transformers can process as sequences. Result: 80-dimensional vectors per 10ms frame vs. 160 raw samples per 10ms, a 2x compression with perceptually relevant features.
Q2. [IC2] What is speaker diarization and why is it challenging? How does it differ from speaker identification?
Answer
Speaker diarization answers “when did different speakers speak?” without knowing who they are. It segments audio by speaker and labels segments (SPEAKER_00, SPEAKER_01, etc.). Challenges: (1) Unknown number of speakers in advance. (2) Similar voices (same gender, accent) are hard to distinguish. (3) Speaker overlap (simultaneous speech) creates ambiguity. (4) Short utterances lack sufficient voice characteristics. Speaker identification (a different task) answers “who is speaking?” by matching voices to known identities, which requires a database of voice samples. Diarization is unsupervised clustering; identification is supervised classification. Production systems often combine both: diarize first, then match clusters to known users.
Q3. [Senior] Compare different video frame sampling strategies (uniform, scene-based, motion-based). When would you choose each, and what are the tradeoffs?
Answer
Uniform sampling: Fixed intervals (every Nth frame or fixed FPS). Pros: simple, predictable cost, works for continuous content. Cons: wastes frames on static scenes, may miss brief important events. Use for sports, tutorials, unknown content. Scene-based: One frame per scene change (detected via visual similarity). Pros: captures each distinct scene once, efficient for structured content. Cons: requires scene detection (adds complexity), may oversample rapidly cut footage and undersample long scenes with a lot of internal action. Use for presentations, movies, edited content. Motion-based: More frames during high-motion periods. Pros: focuses on activity, efficient for surveillance. Cons: may miss important static content (e.g., a sign), requires motion computation. Use for security footage, action videos. Adaptive (hybrid): Combines strategies (e.g., 60% uniform + 40% scene-based). Pros: robust to diverse content types. Cons: more complex. Use when video type is unknown or varies. Key tradeoff: coverage (sample enough to capture content) vs. cost (each frame analyzed costs money/time).
Q4. [Senior] How do you build a multimodal RAG system that searches across text documents, images, and audio? What indexing strategy would you use?
Answer
Strategy: Index each modality with appropriate embeddings. (1) Text: Standard text embeddings (e.g., text-embedding-3-large) for semantic search. (2) Images: Dual embeddings, meaning CLIP visual embeddings for image similarity plus text embeddings of VLM-generated descriptions for text search. (3) Audio: Transcribe to text, embed transcriptions, also store timestamps for retrieval. (4) Cross-modal links: Maintain relationships (e.g., image on page 5 of document X, audio clip at 3:45 in video Y). Query processing: (1) Text query → embed with text model, search text chunks. (2) Text query → embed with CLIP text encoder, search image visual embeddings. (3) Text query → search audio transcripts. (4) Image query → embed with CLIP image encoder, search image database. Combine results with late fusion (weighted combination per modality) or early fusion (single unified index). Late fusion is preferred for interpretability and modality-specific optimization. Return context with references to original modalities (text snippets, image paths with descriptions, audio clips with timestamps). Generate the answer with a VLM that can reference retrieved images.
Q5. [Staff] Design a real-time video streaming transcription system for live events. What are the key architectural components and how do you handle latency vs. accuracy tradeoffs?
Answer
Architecture: (1) Ingestion layer: WebSocket/WebRTC to receive live video stream. (2) Audio extraction: Separate audio track in real-time. (3) Buffering: Collect 2-5 second audio chunks with 0.5-1s overlap. (4) Message queue (Kafka/Redis): Decouple ingestion from processing, handle backpressure. (5) Streaming transcription workers: Process chunks with streaming Whisper (optimized for low latency). (6) Batch correction workers: Every 30s, re-process buffered audio with full Whisper for accuracy. (7) Aggregation service: Merge streaming results, apply corrections, manage speaker turns. (8) WebSocket push: Send results to clients with type tags (streaming vs corrected). Latency vs accuracy: Streaming gives 200-500ms latency but 92-95% accuracy (limited context, no look-ahead). Batch correction gives 95-98% accuracy but 2-5s latency. Solution: Display streaming results immediately for engagement, update with corrections when ready. UI shows provisional text (gray) that becomes finalized (black) after correction. State management: Track session context across chunks (vocabulary, speaker embeddings). Error handling: Worker failures shouldn’t lose chunks (queue persistence), degraded mode if correction fails (streaming only). Scaling: Horizontal scaling of workers, load balancing across processing nodes. Cost: Streaming is more expensive than batch (maintained connections, more processing) but required for real-time UX.
Spot the Problem
Problem 1. [IC2] Audio chunking code:
def transcribe_long_audio(audio_path: str) -> str:
    chunks = split_audio(audio_path, chunk_size=30000)  # 30 seconds
    transcripts = [transcribe(chunk) for chunk in chunks]
    return ' '.join(transcripts)
Answer
Problems: (1) No overlap between chunks: words cut in half at boundaries will be lost or garbled. (2) Joining with single spaces ignores sentence boundaries: a sentence split across two chunks is transcribed as two unrelated fragments, often with inconsistent punctuation and capitalization. (3) No timestamp alignment: can’t know when each word was spoken. (4) No deduplication: once overlap is added, words in the overlap region appear twice unless they are merged. (5) Speaker turns may be split: the middle of a sentence becomes two separate segments. Better: Use overlapping chunks (e.g., 30s chunks, 5s overlap), merge at word level based on timestamps, detect and remove duplicates in overlap regions, preserve speaker context across chunks.
Problem 2. [Senior] Video sampling:
def sample_video(video_path: str) -> list:
    frames = extract_all_frames(video_path)
    return frames[::300]  # Every 300th frame (~10 seconds at 30fps)
Answer
Issues: (1) Fixed interval ignores content: 10 seconds of static scene gets the same sampling as 10 seconds of action. (2) No consideration of video characteristics: a 2-minute video and a 2-hour video are both sampled at the same rate. (3) May miss scene transitions that fall between samples. (4) Extracts all frames first (memory intensive, slow); should sample during extraction. (5) No frame quality check: might sample blurry frames during camera motion. Better: Use scene detection to sample key frames, implement adaptive sampling based on motion, cap total frames based on budget/requirements (e.g., max 20 frames regardless of duration), sample directly without extracting all frames, consider video duration (longer videos need sparser sampling).
Problem 3. [Staff] Multimodal search implementation:
def search(query: str) -> list:
    text_results = text_index.search(query)
    image_results = image_index.search(query)
    return text_results + image_results  # Concatenate
Answer
Critical issues: (1) No score normalization: text and image similarity scores are on different scales (e.g., 0-1 vs 0-100). Combining raw scores is meaningless. (2) No weighting: treats text and image results equally even though they have different retrieval characteristics and relevance distributions. (3) No deduplication: the same document might appear in both results (text match + image match). (4) Simple concatenation doesn’t rank across modalities: all text results come before any image results. (5) No consideration of query type: visual queries (“find the org chart”) should weight image results higher. (6) No top-k limit: might return hundreds of results. Better: Normalize scores to a common scale [0, 1], weight by modality (learned or heuristic), combine scores by document ID, re-rank combined results, apply top-k, consider query classification to adjust weights dynamically.
Design Exercises
Exercise 1. [Senior] Design a searchable video lecture platform for a university. Students should be able to search across thousands of recorded lectures to find specific topics, see relevant slides, and jump to exact timestamps. Consider indexing strategy, query processing, and UI for results.
Guidance
Indexing pipeline: (1) Extract audio, transcribe with Whisper, get word-level timestamps. (2) Use scene detection to identify slide transitions (lectures typically show slides). (3) OCR each unique slide for text content. (4) Sample 1-2 frames per slide, generate VLM descriptions. (5) Create embeddings: (a) Transcript segments (30-60s chunks) with text embeddings, (b) Slide OCR text with text embeddings, (c) Slide images with CLIP embeddings. (6) Store in vector DB with metadata (lecture ID, timestamp, slide number, course). Query processing: (1) Text query → embed and search transcript segments + slide text. (2) Rank by relevance + recency + popularity. (3) Group results by lecture, deduplicate. (4) For each result, retrieve: transcript context, slide image, timestamp. UI: (1) Search bar with autocomplete from indexed topics. (2) Results show: lecture title, relevant slide thumbnail, transcript snippet, timestamp. (3) Click result → video player jumps to timestamp, shows slide, highlights transcript. (4) Sidebar shows all slides from lecture for navigation. (5) “Similar lectures” based on embedding similarity. Advanced features: (1) Temporal search: “second half of the semester only”. (2) Visual search: upload diagram, find similar slides. (3) Concept clustering: “show all lectures covering dynamic programming”. (4) Personalization: track what student has watched, recommend gaps.
Exercise 2. [Staff] Design a content moderation system for a video streaming platform that detects prohibited content (violence, hate speech, explicit material) in real-time. Must process live streams with <2 second latency. Consider accuracy requirements, cost constraints, and different types of violations.
Guidance
Multi-tier detection: Tier 1 (fast, high recall): (1) Audio: VAD (voice activity detection) + streaming transcription + keyword matching for hate speech. (2) Video: Sample 2-3 fps, run lightweight classifier (e.g., fine-tuned ResNet) for NSFW/violence. (3) Latency: 200-500ms. (4) Action: Flag for Tier 2 if suspicious. Tier 2 (slower, high precision): (1) VLM analysis of flagged segments with detailed prompts. (2) Audio segment sent to hate speech classifier. (3) Latency: 1-2s. (4) Action: Auto-block if high confidence, human review if medium confidence. Tier 3 (human review): (1) Queue of medium-confidence flags. (2) Moderators view context (before/after clip). (3) Decision: allow, block, warn creator. Architecture: (1) Stream ingestion → buffer. (2) Parallel processing lanes for audio and video. (3) Message queue to handle spike loads. (4) Priority processing for high-view-count streams. (5) Appeal system for false positives. Challenges: (1) Context matters: violence in news vs gratuitous violence. (2) Cultural differences: what’s acceptable varies by region. (3) Evolving violations: new hate symbols, coded language. (4) Adversarial evasion: bad actors deliberately evade detection. Cost: Tier 1 cheap (simple models), Tier 2 expensive (VLM per flagged segment), Tier 3 most expensive (human labor). Optimize by minimizing false positives from Tier 1. Metrics: Precision (% of blocks that are true violations), Recall (% of violations caught), False positive rate (wrongly blocked content, which hurts creators), Latency (time to block).
Further Reading
Essential
- Radford et al. (2021), “CLIP” - Contrastive multimodal learning foundation.
- Radford et al. (2022), “Whisper” - The paper that transformed speech-to-text.
- Girdhar et al. (2023), “ImageBind” - Unified embeddings across six modalities.
Deep Dives
- Arnab et al. (2021), “ViViT” - Applying transformers to video understanding.
- Bredin et al. (2020), “pyannote.audio” - Modern neural speaker diarization.
- Kim et al. (2024), “Multimodal RAG for Long-Form Video QA” - RAG for video content.
Connections to Other Chapters
- Chapter 7 (RAG Systems): Vector search and retrieval strategies extend to multimodal content
- Chapter 7 (Advanced RAG): Chunking strategies apply to temporal segmentation of audio/video
- Chapter 17 (Vision & Document AI): Vision foundations and VLM architectures underlie video understanding
- Chapter 18 (Audio & Speech Systems): Audio processing and transcription for video soundtracks
- Chapter 15 (MLOps & Evaluation): Metrics and evaluation frameworks for production ML systems