Chapter 19: Video & Multimodal RAG

Keywords

video, multimodal, temporal, scene understanding, video embeddings, cross-modal retrieval, streaming

Introduction

A manufacturing company faced a quality assurance challenge. Their assembly lines produced thousands of units daily, but defect detection relied on periodic human inspections. By the time a problem was caught, hundreds of defective units had already been produced.

They deployed a video analysis system that continuously monitored assembly operations, detecting anomalies in real-time: a misaligned component, an unusual movement pattern, or a missing step in the assembly sequence. Defect detection time dropped from hours to seconds, reducing waste by 60%.

This illustrates why video understanding matters. While images capture moments, video captures processes: the before, during, and after that reveal how things happen, not just what exists.

Why Video Understanding Matters

Temporal context reveals meaning: A person raising their hand is ambiguous in a single frame—are they waving, reaching, or signaling? Video shows the complete gesture and its purpose.

Video is everywhere: Security footage, instructional content, meetings, product demos, medical procedures—vast amounts of organizational knowledge exist only in video form.

Multimodal search unlocks archives: Organizations have terabytes of video that’s effectively unsearchable. Combining visual, audio, and text understanding makes this content accessible.

What You’ll Learn

This chapter focuses on video understanding and multimodal retrieval:

  • Frame sampling strategies that balance coverage with cost
  • Temporal understanding: tracking objects and recognizing actions across time
  • Vision-language models (VLMs) for video description and question answering
  • Multimodal embeddings: CLIP and unified embedding spaces
  • Building multimodal RAG systems that search across text, images, audio, and video
  • Production architectures for video processing at scale

For audio processing fundamentals and speech-to-text systems, see Chapter 18.

Prerequisites

  • Understanding of transformer architecture and attention (Chapter 5)
  • Familiarity with embeddings and vector search (Chapter 7)
  • RAG system design (Chapter 8)
  • Audio processing basics (Chapter 18)

Video Understanding

Video adds a temporal dimension to vision. A single frame is an image; a video is a sequence of images with temporal relationships and often synchronized audio.

The Temporal Dimension

Understanding video requires processing that single images cannot provide:

  • Object tracking: Following entities across frames as they move, appear, and disappear
  • Action recognition: Understanding what’s happening over time (waving, running, assembling)
  • Temporal reasoning: Understanding before/after relationships, cause and effect, sequences
  • Change detection: Identifying what’s different between time points, detecting anomalies
  • Event detection: Recognizing when specific events occur in long videos

Unlike static images, video has:

  • Motion: Information encoded in how things change between frames
  • Temporal context: Current frame’s meaning depends on what came before
  • Audio alignment: Speech, music, and sound effects synchronized with visuals
  • Narrative structure: Videos often have intentional story arcs or informational progression

Frame Sampling Strategies

Vision-language models can’t process thousands of frames (a 1-minute video at 30fps is 1800 frames). Intelligent sampling is essential for balancing coverage, cost, and accuracy.

import cv2
import numpy as np
from typing import List, Dict
from scenedetect import detect, ContentDetector

class VideoSampler:
    """Sample frames from video using various strategies."""

    def __init__(self):
        self.scene_detector = ContentDetector()

    def sample(
        self,
        video_path: str,
        strategy: str = 'uniform',
        max_frames: int = 20
    ) -> List[Dict]:
        """
        Sample frames using specified strategy.

        Args:
            video_path: Path to video file
            strategy: 'uniform', 'scene_based', 'motion_based', or 'adaptive'
            max_frames: Maximum number of frames to sample

        Returns:
            List of sampled frames with metadata
        """
        if strategy == 'uniform':
            return self.uniform_sample(video_path, max_frames)
        elif strategy == 'scene_based':
            return self.scene_sample(video_path, max_frames)
        elif strategy == 'motion_based':
            return self.motion_sample(video_path, max_frames)
        elif strategy == 'adaptive':
            return self.adaptive_sample(video_path, max_frames)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

    def uniform_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """Sample frames at uniform intervals."""
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)

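        # Spacing between samples; short videos yield fewer than max_frames.
        # max(1, max_frames) avoids a ZeroDivisionError when the budget is 0.
        interval = max(1, total_frames // max(1, max_frames))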

        frames = []
        for i in range(max_frames):
            frame_idx = i * interval
            if frame_idx >= total_frames:
                break

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()

            if ret:
                frames.append({
                    'frame': frame,
                    'frame_idx': frame_idx,
                    'timestamp': frame_idx / fps,
                    'strategy': 'uniform'
                })

        cap.release()
        return frames

    def scene_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """
        Sample one frame per scene change.

        Scene detection identifies significant visual changes (cuts, transitions).
        This is ideal for videos with distinct scenes (movies, presentations).
        """
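        # Detect scenes using PySceneDetect. start_in_scene=True treats a video
        # with no detected cuts as one scene instead of returning an empty list.
        scene_list = detect(video_path, ContentDetector(), start_in_scene=True)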

        # If more scenes than max_frames, sample uniformly from scenes
        if len(scene_list) > max_frames:
            scene_interval = len(scene_list) // max_frames
            selected_scenes = scene_list[::scene_interval][:max_frames]
        else:
            selected_scenes = scene_list

        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)

        frames = []
        for idx, scene in enumerate(selected_scenes):
            # Sample from middle of scene for representative frame
            start_frame = int(scene[0].get_frames())
            end_frame = int(scene[1].get_frames())
            mid_frame = (start_frame + end_frame) // 2

            cap.set(cv2.CAP_PROP_POS_FRAMES, mid_frame)
            ret, frame = cap.read()

            if ret:
                frames.append({
                    'frame': frame,
                    'frame_idx': mid_frame,
                    'timestamp': mid_frame / fps,
                    'scene_idx': idx,
                    'strategy': 'scene_based'
                })

        cap.release()
        return frames

    def motion_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """
        Sample more frames during high-motion periods.

        Useful for action videos, sports, or surveillance where interesting
        events correlate with motion.
        """
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Compute motion scores for segments
        segment_size = 30  # ~1 second at 30fps
        motion_scores = []
        prev_frame = None

        for i in range(0, total_frames, segment_size):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()

            if ret and prev_frame is not None:
                # Calculate frame difference as motion proxy
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
                diff = cv2.absdiff(gray, prev_gray)
                motion_score = np.mean(diff)
                motion_scores.append((i, motion_score))

            prev_frame = frame

        # Sort segments by motion score
        motion_scores.sort(key=lambda x: x[1], reverse=True)

        # Sample from high-motion segments
        selected_indices = [idx for idx, _ in motion_scores[:max_frames]]
        selected_indices.sort()  # Maintain temporal order

        frames = []
        for frame_idx in selected_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()

            if ret:
                frames.append({
                    'frame': frame,
                    'frame_idx': frame_idx,
                    'timestamp': frame_idx / fps,
                    'strategy': 'motion_based'
                })

        cap.release()
        return frames

    def adaptive_sample(self, video_path: str, max_frames: int) -> List[Dict]:
        """
        Adaptive sampling: uniform baseline + scene changes.

        This hybrid approach works well for unknown video types.
        """
        # Use 60% of budget for uniform sampling
        uniform_count = int(max_frames * 0.6)
        uniform_frames = self.uniform_sample(video_path, uniform_count)

        # Use 40% for scene changes
        scene_count = max_frames - uniform_count
        scene_frames = self.scene_sample(video_path, scene_count)

        # Combine and deduplicate by frame index
        all_frames = uniform_frames + scene_frames
        seen_indices = set()
        unique_frames = []

        for frame_data in sorted(all_frames, key=lambda x: x['frame_idx']):
            if frame_data['frame_idx'] not in seen_indices:
                frame_data['strategy'] = 'adaptive'
                unique_frames.append(frame_data)
                seen_indices.add(frame_data['frame_idx'])

        return unique_frames[:max_frames]

Common Mistake: Fixed Frame Sampling Regardless of Content

What people do: Sample every Nth frame from all videos (e.g., 1 frame every 10 seconds), regardless of whether it’s a static presentation or fast-paced action footage.

Why it fails: Fixed sampling wastes resources on static content (10 identical frames from a slide that didn’t change) while missing critical moments in dynamic content (the 2-second action sequence between sample points).

Fix: Match sampling strategy to video type. Use scene detection for presentations and edited content. Use motion-based sampling for surveillance or action videos. For unknown content, use adaptive sampling that combines uniform baseline with scene-change detection.
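One way to avoid paying for repeated frames on static content is to drop near-duplicates after sampling. The sketch below is an illustration rather than part of the VideoSampler class: `drop_near_duplicates` and its `threshold` default are hypothetical, and it assumes every item carries a same-shaped NumPy array under the 'frame' key, as the sampler output does.

```python
import numpy as np
from typing import Dict, List


def drop_near_duplicates(frames: List[Dict], threshold: float = 2.0) -> List[Dict]:
    """Keep a frame only if it differs enough from the last kept frame.

    `threshold` is the mean absolute pixel difference on a 0-255 scale.
    The default is an illustrative guess, not a tuned value.
    """
    kept: List[Dict] = []
    last = None
    for item in frames:
        # Widen the dtype so uint8 subtraction cannot wrap around
        current = item['frame'].astype(np.int16)
        if last is None or np.abs(current - last).mean() > threshold:
            kept.append(item)
            last = current
    return kept
```

Comparing against the last kept frame, rather than the immediately preceding one, prevents a slow fade from slipping through as a long run of small changes. The threshold should be tuned per source, since compression noise alone can produce small differences.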

Choosing a Sampling Strategy

| Video Type             | Recommended Strategy   | Rationale                                        |
|------------------------|------------------------|--------------------------------------------------|
| Surveillance footage   | Motion-based           | Most frames are static; focus on activity        |
| Lectures/Presentations | Scene-based            | Each slide is a scene; sample once per slide     |
| Sports highlights      | High framerate uniform | Continuous action; need temporal resolution      |
| Movies/TV              | Scene-based            | Natural scene structure; each scene tells a story |
| Product demos          | Uniform or adaptive    | Steady progression through features              |
| Tutorials              | Scene-based            | Each step is a distinct visual scene             |
| Nature documentaries   | Adaptive               | Mix of static scenes and action                  |
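The recommendations above can be encoded as a small lookup so that a pipeline picks a strategy from a content label. This is a minimal sketch: the `STRATEGY_BY_VIDEO_TYPE` keys, the `choose_sampling` helper, and the frame budgets are all hypothetical, and "high framerate uniform" is approximated here by a larger uniform budget.

```python
from typing import Dict, Tuple

# (strategy, max_frames) per video type. Frame budgets are illustrative only.
STRATEGY_BY_VIDEO_TYPE: Dict[str, Tuple[str, int]] = {
    'surveillance': ('motion_based', 20),
    'lecture': ('scene_based', 20),
    'sports': ('uniform', 60),  # denser uniform sampling for continuous action
    'movie': ('scene_based', 20),
    'product_demo': ('adaptive', 20),
    'tutorial': ('scene_based', 20),
    'documentary': ('adaptive', 20),
}


def choose_sampling(video_type: str) -> Tuple[str, int]:
    """Return (strategy, max_frames), falling back to adaptive sampling."""
    return STRATEGY_BY_VIDEO_TYPE.get(video_type.lower(), ('adaptive', 20))
```

The returned pair is meant to be passed as the `strategy` and `max_frames` arguments of a sampler such as the one shown earlier. Unknown labels fall back to adaptive sampling, matching the guidance for unknown content.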

Video Analysis with VLMs

Modern vision-language models can analyze sequences of frames to understand video content:

import base64
from io import BytesIO
from typing import List, Dict, Optional

import cv2
import numpy as np
from PIL import Image

class VideoAnalyzer:
    """Analyze video content using vision-language models."""

    def __init__(self, vlm_client):
        self.vlm = vlm_client
        self.sampler = VideoSampler()

    async def analyze(
        self,
        video_path: str,
        analysis_type: str = 'summary',
        max_frames: int = 20
    ) -> Dict:
        """
        Analyze video with appropriate sampling and prompting.

        Args:
            video_path: Path to video file
            analysis_type: 'summary', 'action_recognition', 'scene_description', or 'qa'
            max_frames: Maximum frames to sample

        Returns:
            Analysis results with metadata
        """
        # Choose sampling strategy based on analysis type
        strategy = {
            'summary': 'adaptive',
            'action_recognition': 'motion_based',
            'scene_description': 'scene_based',
            'qa': 'uniform'
        }.get(analysis_type, 'adaptive')

        frames = self.sampler.sample(video_path, strategy, max_frames)

        # Build multi-image prompt
        content = [
            {
                "type": "text",
                "text": self._get_prompt(analysis_type, len(frames))
            }
        ]

        for frame_data in frames:
            # Add timestamp annotation
            content.append({
                "type": "text",
                "text": f"\n[Frame at {frame_data['timestamp']:.1f}s]"
            })

            # Encode frame as base64 image
            content.append(self._encode_frame(frame_data['frame']))

        response = await self.vlm.analyze(content)

        return {
            'analysis': response,
            'frame_count': len(frames),
            'strategy': strategy,
            # Timestamp of the last sampled frame: a lower bound on the true duration
            'video_duration': frames[-1]['timestamp'] if frames else 0
        }

    def _get_prompt(self, analysis_type: str, frame_count: int) -> str:
        """Generate appropriate prompt for analysis type."""
        prompts = {
            'summary': f"""You are viewing {frame_count} frames sampled from a video.
                Provide a comprehensive summary including:
                1. Overall content and topic
                2. Key events or changes observed across frames
                3. Main subjects or objects and how they change
                4. Setting and context
                5. Any text visible in the video
                6. Temporal progression (what happens over time)""",

            'action_recognition': f"""Analyze these {frame_count} frames for actions and events.
                For each significant action observed:
                - Describe the action clearly
                - Note approximate timestamp from frame labels
                - Identify actors/subjects involved
                - Describe any objects or tools used
                - Indicate the start and end of each action""",

            'scene_description': f"""Describe each scene in this video based on {frame_count} frames.
                For each distinct scene:
                - Describe the visual setting and environment
                - Note key visual elements and composition
                - Identify any text, graphics, or UI elements
                - Describe the mood, tone, or atmosphere
                - Note transitions between scenes""",

            'qa': f"""You are viewing {frame_count} frames from a video.
                Answer questions about the video content based on what you observe.
                Reference specific frames by timestamp when relevant.
                If you cannot answer based on the frames shown, say so."""
        }
        return prompts.get(analysis_type, prompts['summary'])

    def _encode_frame(self, frame: np.ndarray) -> Dict:
        """Encode frame as base64 for VLM input."""
        # Convert BGR (OpenCV) to RGB (PIL)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(rgb_frame)

        # Resize if too large (save on token cost)
        max_dim = 1024
        if max(pil_image.size) > max_dim:
            ratio = max_dim / max(pil_image.size)
            new_size = tuple(int(dim * ratio) for dim in pil_image.size)
            pil_image = pil_image.resize(new_size, Image.LANCZOS)

        # Encode as JPEG base64
        buffer = BytesIO()
        pil_image.save(buffer, format='JPEG', quality=85)
        img_base64 = base64.b64encode(buffer.getvalue()).decode()

        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{img_base64}"
            }
        }

Common Mistake: Ignoring the Audio Track in Video Analysis

What people do: Process videos purely as sequences of images, discarding the audio entirely, then wonder why the analysis misses obvious context.

Why it fails: In a product review video, the reviewer might show a product while saying “I was disappointed”—the visual shows the product, but the sentiment is in the audio. Meeting recordings have content primarily in speech, not visuals.

Fix: Always extract and transcribe the audio track. Fuse visual and audio analysis for comprehensive understanding. For videos with significant speech, the transcript often provides more valuable search content than the visual frames.

Combining Video and Audio

Most videos have audio that provides crucial context. Speech, music, and sound effects carry information that visuals alone miss. Combining modalities creates richer understanding:

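import asyncio

class MultimodalVideoProcessor:
    """Process video with both visual and audio analysis."""

    def __init__(self, vlm_client, hf_token: str):
        # Dependencies are passed in rather than read from module-level globals.
        # TranscriptionService and DiarizedTranscription are assumed to be the
        # audio classes introduced in Chapter 18.
        self.video_analyzer = VideoAnalyzer(vlm_client)
        self.audio_transcriber = TranscriptionService()
        self.diarizer = DiarizedTranscription(hf_token)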

    async def process(self, video_path: str) -> Dict:
        """
        Process video and audio together for comprehensive understanding.

        Returns:
            Combined analysis with visual description, transcript, and fusion
        """
        # Extract audio track from video
        audio_path = self._extract_audio(video_path)

        # Process visual and audio in parallel
        video_task = asyncio.create_task(
            self.video_analyzer.analyze(video_path, 'summary')
        )
        audio_task = asyncio.create_task(
            self.diarizer.process(audio_path)
        )

        video_result, audio_result = await asyncio.gather(video_task, audio_task)

        # Fuse analyses with LLM
        fused = await self._fuse_analyses(video_result, audio_result)

        return {
            'video_analysis': video_result,
            'transcript': audio_result,
            'fused_summary': fused,
            'video_path': video_path
        }

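    def _extract_audio(self, video_path: str) -> str:
        """Extract audio track from video file."""
        import subprocess
        from pathlib import Path

        # Build the output name from the file stem so that non-.mp4 inputs
        # (.mov, .mkv, ...) never resolve back to the input path itself
        source = Path(video_path)
        audio_path = str(source.with_name(f"{source.stem}_audio.mp3"))

        # Use ffmpeg to extract audio
        subprocess.run([
            'ffmpeg', '-i', video_path,
            '-vn',  # No video
            '-acodec', 'mp3',
            '-y',  # Overwrite
            audio_path
        ], check=True, capture_output=True)

        return audio_path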

    async def _fuse_analyses(
        self,
        video: Dict,
        audio: Dict
    ) -> str:
        """Combine visual and audio understanding into unified summary."""

        # Format transcript with speaker labels and timestamps
        transcript_text = self._format_transcript(audio['turns'])

        prompt = f"""Combine these analyses of a video into a unified summary:

VISUAL ANALYSIS:
{video['analysis']}

AUDIO TRANSCRIPT:
{transcript_text}

Provide a coherent summary that:
1. Integrates what was seen and heard into a single narrative
2. Notes when visual and audio provide complementary information
3. Identifies speakers and associates them with visual subjects when possible
4. Creates a timeline of key events combining both modalities
5. Highlights any discrepancies between visual and audio content

Be specific about timestamps and reference both visual frames and audio timing."""

        # `llm` is assumed to be an async text-generation client defined elsewhere
        response = await llm.generate(prompt, max_tokens=1000)
        return response

    def _format_transcript(self, turns: List[Dict]) -> str:
        """Format speaker turns for readable transcript."""
        lines = []
        for turn in turns:
            timestamp = f"[{turn['start']:.1f}s - {turn['end']:.1f}s]"
            speaker = turn['speaker']
            text = turn['text']
            lines.append(f"{timestamp} {speaker}: {text}")
        return "\n".join(lines)

Use Cases for Multimodal Video Analysis

  1. Educational Content: Match lecture slides (visual) with the professor’s explanation (audio) to create searchable, navigable content
  2. Product Reviews: Compare what the reviewer shows (visual) with what they say (audio) to distinguish enthusiasm from criticism
  3. Surveillance: Detect activities (visual) and speech (audio) for comprehensive security monitoring
  4. Meeting Analysis: Understand presentations (slides), discussion (speech), and reactions (body language)
  5. Content Moderation: Detect prohibited visual and spoken content together
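Several of these use cases, lecture search in particular, depend on lining up what was said with what was on screen. The sketch below shows one simple way to do that by timestamp. It is an assumption-laden illustration: `attach_turns_to_frames` is a hypothetical helper, `frames` is assumed to be sorted by 'timestamp', and each turn is assumed to carry a 'start' key as in the transcript format used earlier.

```python
from bisect import bisect_right
from typing import Dict, List


def attach_turns_to_frames(frames: List[Dict], turns: List[Dict]) -> List[Dict]:
    """Group transcript turns under the most recent sampled frame."""
    if not frames:
        return []

    timestamps = [f['timestamp'] for f in frames]
    timeline = [{'timestamp': t, 'turns': []} for t in timestamps]

    for turn in turns:
        # Index of the last frame whose timestamp is <= the turn's start
        idx = bisect_right(timestamps, turn['start']) - 1
        # Speech that starts before the first frame goes to the first frame
        idx = max(idx, 0)
        timeline[idx]['turns'].append(turn)

    return timeline
```

With scene-based sampling, each entry then approximates "one slide plus everything said while it was visible", which is a convenient unit to index for retrieval. A turn that spans a scene change is assigned only to the frame where it starts; splitting long turns is left out of this sketch.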

Complete code: See Video Processing for production-ready video analysis with frame sampling, quality optimization, and batch processing.


Multimodal Embeddings and Retrieval

Standard RAG retrieves text using text queries. But what if you want to search across modalities?

Beyond Text Search: Cross-Modal Retrieval

Cross-modal retrieval enables queries like:

  • “Find product demo videos where someone mentions battery life” (text → video)
  • “Find podcast episodes discussing charts about revenue growth” (text → audio + visual)
  • “Find documents containing diagrams similar to this sketch” (image → documents)
  • “Locate meetings where team discussed the Q3 roadmap slide” (text → video + slides)

This requires embeddings that align different modalities in a shared space.

CLIP Embeddings: Theory and Practice

CLIP (Contrastive Language-Image Pre-training) creates a shared embedding space where images and text that are semantically related map to nearby points. This enables text-to-image and image-to-text retrieval.

How CLIP Aligns Modalities

CLIP was trained on 400 million image-text pairs from the internet using contrastive learning:

  1. For a batch of N image-text pairs, encode all N images and all N texts
  2. We have N correct pairs (image[i] matches text[i])
  3. We have N*(N-1) incorrect pairs (all other combinations)
  4. Train to maximize similarity for correct pairs, minimize for incorrect pairs
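The four steps above correspond to a symmetric cross-entropy loss over an N x N similarity matrix. The NumPy sketch below illustrates that objective only; it is not CLIP's training code. The function names are hypothetical, and the temperature is fixed at 0.07 for simplicity, whereas CLIP learns it during training.

```python
import numpy as np


def log_softmax(x: np.ndarray, axis: int) -> np.ndarray:
    """Numerically stable log-softmax along one axis."""
    shifted = x - x.max(axis=axis, keepdims=True)
    return shifted - np.log(np.exp(shifted).sum(axis=axis, keepdims=True))


def clip_contrastive_loss(
    image_emb: np.ndarray,
    text_emb: np.ndarray,
    temperature: float = 0.07,
) -> float:
    """Symmetric contrastive loss for a batch of N pairs.

    Row i of `image_emb` and row i of `text_emb` are assumed to match.
    """
    # Normalize so dot products are cosine similarities
    image_emb = image_emb / np.linalg.norm(image_emb, axis=1, keepdims=True)
    text_emb = text_emb / np.linalg.norm(text_emb, axis=1, keepdims=True)

    # logits[i, j] = similarity of image i and text j; the N correct
    # pairs sit on the diagonal, the N*(N-1) incorrect ones off it
    logits = image_emb @ text_emb.T / temperature
    diag = np.arange(logits.shape[0])

    # Image-to-text: each row should pick its own column
    loss_i2t = -log_softmax(logits, axis=1)[diag, diag].mean()
    # Text-to-image: each column should pick its own row
    loss_t2i = -log_softmax(logits, axis=0)[diag, diag].mean()

    return float((loss_i2t + loss_t2i) / 2)
```

The loss is low when each image is most similar to its own caption and high when the pairs are mismatched, which is what pushes related images and texts toward nearby points.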

Through billions of such comparisons, CLIP learns:

  • Semantic understanding: “dog” and “canine” are similar
  • Visual concepts: “sunset” embeds near orange/red images
  • Compositionality: “red car” vs. “blue car” differ in predictable ways
  • Abstraction: “chaotic scene” has meaningful visual correlates

Using CLIP for Cross-Modal Search

import torch
import numpy as np
from transformers import CLIPProcessor, CLIPModel
from typing import List, Tuple

class CLIPSearchEngine:
    """Cross-modal search using CLIP embeddings."""

    def __init__(self, model_name: str = "openai/clip-vit-large-patch14"):
        self.model = CLIPModel.from_pretrained(model_name)
        self.processor = CLIPProcessor.from_pretrained(model_name)

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()  # Inference mode

    def embed_images(self, images: List) -> np.ndarray:
        """
        Embed multiple images into CLIP space.

        Args:
            images: List of PIL Images (open file paths with PIL.Image.open first)

        Returns:
            Array of shape (num_images, embedding_dim)
        """
        inputs = self.processor(images=images, return_tensors="pt", padding=True)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            features = self.model.get_image_features(**inputs)

        # Normalize for cosine similarity
        features = features / features.norm(dim=-1, keepdim=True)
        return features.cpu().numpy()

    def embed_text(self, texts: List[str]) -> np.ndarray:
        """
        Embed text queries into CLIP space.

        Args:
            texts: List of text queries

        Returns:
            Array of shape (num_texts, embedding_dim)
        """
        inputs = self.processor(text=texts, return_tensors="pt", padding=True)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            features = self.model.get_text_features(**inputs)

        features = features / features.norm(dim=-1, keepdim=True)
        return features.cpu().numpy()

    def search_images_with_text(
        self,
        query: str,
        image_embeddings: np.ndarray,
        top_k: int = 10
    ) -> List[Tuple[int, float]]:
        """
        Search image collection using text query.

        Args:
            query: Text description to search for
            image_embeddings: Pre-computed image embeddings
            top_k: Number of results to return

        Returns:
            List of (index, similarity_score) tuples
        """
        # Embed query text
        query_embedding = self.embed_text([query])[0]

        # Compute similarities (cosine similarity, since normalized)
        similarities = image_embeddings @ query_embedding

        # Get top-k matches
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [(int(idx), float(similarities[idx])) for idx in top_indices]

        return results

    def search_text_with_image(
        self,
        query_image,
        text_embeddings: np.ndarray,
        top_k: int = 10
    ) -> List[Tuple[int, float]]:
        """Search text collection using image query."""
        query_embedding = self.embed_images([query_image])[0]
        similarities = text_embeddings @ query_embedding

        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return [(int(idx), float(similarities[idx])) for idx in top_indices]

Practical Considerations for CLIP

1. Prompt Engineering for Better Retrieval

CLIP was trained on web captions, which tend to follow recognizable patterns. Queries phrased like captions match the training distribution more closely:

# Less effective
query = "cat"

# More effective (matches caption style)
query = "a photo of a cat"
query = "a photograph showing a cat"

# Even more specific
query = "a photograph of a gray tabby cat sitting on a windowsill"
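A related technique is prompt ensembling: embed several caption-style phrasings of the same concept and average them. It was described for zero-shot classification with CLIP; whether it helps retrieval on a given corpus is worth measuring rather than assuming. In this sketch, `TEMPLATES` and `ensemble_query_embedding` are hypothetical names, and `embed_text` stands for any function that maps a list of strings to an array of embeddings.

```python
import numpy as np
from typing import Callable, List, Sequence

# Illustrative templates; real deployments would choose these per domain
TEMPLATES = (
    "a photo of a {}",
    "a photograph showing a {}",
    "a close-up photo of a {}",
)


def ensemble_query_embedding(
    concept: str,
    embed_text: Callable[[List[str]], np.ndarray],
    templates: Sequence[str] = TEMPLATES,
) -> np.ndarray:
    """Average the embeddings of several phrasings into one query vector."""
    prompts = [template.format(concept) for template in templates]
    embeddings = embed_text(prompts)  # shape: (num_templates, dim)
    mean = embeddings.mean(axis=0)
    # Re-normalize so dot products remain cosine similarities
    return mean / np.linalg.norm(mean)
```

The result can be used in place of a single-query embedding wherever a normalized text vector is expected.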

2. The Modality Gap

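Despite alignment, CLIP’s image and text embeddings cluster in different regions of the space, so it is usually possible to tell whether an embedding came from an image or from text. This affects retrieval: cross-modal similarity scores are typically lower than same-modality scores, so an index that mixes image and text embeddings tends to favor results from the query’s own modality. Mean-centering each modality reduces the effect: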

def adjust_for_modality_gap(
    text_embedding: np.ndarray,
    image_embeddings: np.ndarray,
    image_centroid: np.ndarray,
    text_centroid: np.ndarray,
) -> np.ndarray:
    """
    Reduce the modality gap before ranking image results for a text query.

    A common mistake is to "calibrate" by dividing similarities by a
    temperature. That is a *monotonic* transform — it rescales the scores
    but leaves their order unchanged, so it does nothing for retrieval rank.
    The gap is essentially a translation between the image and text clusters,
    so the effective fix is to mean-center each modality (subtract that
    modality's centroid, computed over your corpus) and re-normalize before
    comparing.
    """
    img = image_embeddings - image_centroid
    txt = text_embedding - text_centroid
    img /= np.linalg.norm(img, axis=1, keepdims=True)
    txt /= np.linalg.norm(txt)
    return img @ txt
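The function above expects precomputed centroids. A minimal sketch of estimating them from a corpus sample, and of checking how large the gap is before and after centering, might look like the following. The helper names are hypothetical, and the data is synthetic: two Gaussian clusters stand in for real image and text embeddings.

```python
import numpy as np
from typing import Tuple


def modality_centroids(
    image_embeddings: np.ndarray,
    text_embeddings: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Per-modality mean vectors, estimated from a sample of the corpus."""
    return image_embeddings.mean(axis=0), text_embeddings.mean(axis=0)


def modality_gap(image_embeddings: np.ndarray, text_embeddings: np.ndarray) -> float:
    """Euclidean distance between the image and text centroids."""
    image_centroid, text_centroid = modality_centroids(
        image_embeddings, text_embeddings
    )
    return float(np.linalg.norm(image_centroid - text_centroid))


# Synthetic stand-in: two clusters offset along the first axis
rng = np.random.default_rng(0)
offset = np.array([1.0] + [0.0] * 7)
images = rng.normal(size=(100, 8)) * 0.1 + offset
texts = rng.normal(size=(100, 8)) * 0.1 - offset

gap_before = modality_gap(images, texts)

# Subtract each modality's own centroid, then measure again
image_centroid, text_centroid = modality_centroids(images, texts)
gap_after = modality_gap(images - image_centroid, texts - text_centroid)

print(f"gap before centering: {gap_before:.3f}")
print(f"gap after centering:  {gap_after:.3f}")
```

Centroids should be computed once over a representative sample and stored alongside the index, since recomputing them per query would make scores drift as the corpus changes.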

3. Domain Adaptation

CLIP was trained on natural web images. Specialized domains (medical imaging, satellite imagery, technical diagrams) may embed poorly without fine-tuning:

# Fine-tune CLIP on domain-specific data
from transformers import CLIPVisionModel, CLIPTextModel

class DomainCLIP(CLIPSearchEngine):
    """CLIP fine-tuned for specific domain."""

    def fine_tune(self, image_text_pairs: List[Tuple], epochs: int = 5):
        """
        Fine-tune CLIP on domain-specific image-text pairs.

        This adapts the model to domain vocabulary and visual patterns.
        """
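        # Contrastive fine-tuning is omitted for brevity (see CLIP training tutorials).
        # Raise instead of silently returning an untuned model.
        raise NotImplementedError("Contrastive fine-tuning is not implemented")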

Multimodal RAG: Documents with Images

Documents containing both text and images need multimodal indexing to support queries that reference either modality:

from typing import Dict, List, Optional
import hashlib

class MultimodalDocumentRAG:
    """RAG system for documents containing text, images, tables, and charts."""

    def __init__(self):
        self.text_embedder = TextEmbeddingModel()  # e.g., text-embedding-3-large
        self.clip_embedder = CLIPSearchEngine()
        self.vlm = VisionLanguageModel()
        self.vector_store = VectorDatabase()

    def index_document(self, doc_path: str, doc_id: str):
        """
        Index all modalities of a document.

        Strategy:
        - Text chunks → text embeddings (for semantic text search)
        - Images → visual embeddings (for image similarity search)
        - Images → VLM descriptions → text embeddings (for text-based image search)
        """
        pages = self._extract_pages(doc_path)

        for page_num, page in enumerate(pages):
            # Index text chunks
            text_chunks = self._chunk_text(page['text'])
            for chunk_idx, chunk in enumerate(text_chunks):
                embedding = self.text_embedder.encode(chunk['text'])

                self.vector_store.upsert(
                    id=f"{doc_id}_p{page_num}_text_{chunk_idx}",
                    vector=embedding,
                    metadata={
                        'doc_id': doc_id,
                        'page': page_num,
                        'type': 'text',
                        'content': chunk['text'],
                        'chunk_index': chunk_idx
                    }
                )

            # Index images with dual embeddings
            for img_idx, image in enumerate(page['images']):
                # Visual embedding for image-similarity search
                visual_embedding = self.clip_embedder.embed_images([image['data']])[0]

                # Generate description with VLM for text-based search
                description = self.vlm.describe_image(
                    image['data'],
                    context=page['text'][:500]  # Include surrounding text
                )
                semantic_embedding = self.text_embedder.encode(description)

                image_id = f"{doc_id}_p{page_num}_img_{img_idx}"

                # Store visual embedding (for "find similar images" queries)
                self.vector_store.upsert(
                    id=f"{image_id}_visual",
                    vector=visual_embedding,
                    metadata={
                        'doc_id': doc_id,
                        'page': page_num,
                        'type': 'image_visual',
                        'description': description,
                        'image_ref': image['path']
                    }
                )

                # Store semantic embedding (for "find images about X" queries)
                self.vector_store.upsert(
                    id=f"{image_id}_semantic",
                    vector=semantic_embedding,
                    metadata={
                        'doc_id': doc_id,
                        'page': page_num,
                        'type': 'image_semantic',
                        'description': description,
                        'image_ref': image['path']
                    }
                )

    def query(
        self,
        question: str,
        query_image: Optional = None,
        top_k: int = 5
    ) -> Dict:
        """
        Query across all modalities.

        Supports:
        - Text query → finds text and image descriptions
        - Image query → finds similar images
        - Combined text+image query
        """
        results = []

        # Text-based search
        if question:
            text_query_embedding = self.text_embedder.encode(question)
            clip_text_embedding = self.clip_embedder.embed_text([question])[0]

            # Search text chunks
            text_results = self.vector_store.search(
                vector=text_query_embedding,
                filter={'type': 'text'},
                top_k=top_k
            )
            results.extend(text_results)

            # Search image descriptions (semantic)
            semantic_results = self.vector_store.search(
                vector=text_query_embedding,
                filter={'type': 'image_semantic'},
                top_k=top_k
            )
            results.extend(semantic_results)

            # Search images by visual similarity to text query
            visual_results = self.vector_store.search(
                vector=clip_text_embedding,
                filter={'type': 'image_visual'},
                top_k=top_k
            )
            results.extend(visual_results)

        # Image-based search
        if query_image is not None:
            image_embedding = self.clip_embedder.embed_images([query_image])[0]

            image_results = self.vector_store.search(
                vector=image_embedding,
                filter={'type': 'image_visual'},
                top_k=top_k
            )
            results.extend(image_results)

        # Deduplicate and rank
        unique_results = self._deduplicate_results(results, top_k)

        # Build context for generation
        context = self._build_multimodal_context(unique_results)

        # Generate answer with VLM (can reference images)
        answer = self._generate_answer(question, context, unique_results)

        return {
            'answer': answer,
            'sources': unique_results,
            'source_count': len(unique_results)
        }

    def _deduplicate_results(
        self,
        results: List[Dict],
        top_k: int
    ) -> List[Dict]:
        """
        Deduplicate results by document+page+content.

        If both visual and semantic embeddings of same image appear,
        keep the higher-scoring one.
        """
        seen = {}

        for result in results:
            # Key on document, page, and image reference (or text content),
            # so the visual and semantic entries of one image collapse together
            key = (
                result['metadata']['doc_id'],
                result['metadata']['page'],
                result['metadata'].get('image_ref', result['metadata'].get('content', ''))
            )

            if key not in seen or result['score'] > seen[key]['score']:
                seen[key] = result

        # Sort by score and return top_k
        unique = sorted(seen.values(), key=lambda x: x['score'], reverse=True)
        return unique[:top_k]

    def _build_multimodal_context(self, results: List[Dict]) -> str:
        """Format retrieved content as context."""
        context_parts = []

        for i, result in enumerate(results, 1):
            meta = result['metadata']
            if meta['type'] == 'text':
                context_parts.append(
                    f"[{i}] Text from page {meta['page']}: {meta['content']}"
                )
            elif 'image' in meta['type']:
                context_parts.append(
                    f"[{i}] Image from page {meta['page']}: {meta['description']}"
                )

        return "\n\n".join(context_parts)

    def _generate_answer(
        self,
        question: str,
        text_context: str,
        results: List[Dict]
    ) -> str:
        """Generate answer using VLM with both text and image context."""

        # Collect images from results
        images = [
            r['metadata']['image_ref']
            for r in results
            if 'image' in r['metadata']['type']
        ]

        # Build prompt with text context and images
        prompt = f"""Answer this question using the provided context and images:

Question: {question}

Text Context:
{text_context}

The images referenced in the context are provided below.
"""

        # Call VLM with text prompt + images
        answer = self.vlm.generate(
            prompt=prompt,
            images=images[:5]  # Limit images to avoid token limits
        )

        return answer

Common Mistake: Using Only Text Embeddings for Image Search

What people do: Generate text descriptions of images with a VLM, embed those descriptions with a text model, and call it “multimodal search.”

Why it fails: Text descriptions lose visual information. “A bar chart showing quarterly revenue” doesn’t capture the specific colors, layout, or that Q3 has a red declining bar. When a user searches “charts with downward trends,” the text embedding might miss visually obvious patterns.

Fix: Use dual embeddings—CLIP visual embeddings for visual similarity (“find charts that look like this”) AND text embeddings of descriptions for semantic search (“find charts about revenue”). Different query types need different embedding strategies.

Why Dual Embeddings for Images?

Indexing images with both CLIP (visual) and text (semantic) embeddings serves different query types:

  • Visual query: “Find images similar to this chart” → uses CLIP visual embedding
  • Semantic query: “Find images showing revenue growth” → uses text embedding of VLM-generated description

Indexing both lets each query type hit the embedding space that suits it, which improves recall compared with relying on either embedding alone.

Complete code: See Multimodal Document RAG and Multimodal Embeddings and Indexing for production implementations with dual embeddings and chunking strategies.
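
One caveat when merging the two image searches: similarity scores from the text embedder and from CLIP come from different spaces, so comparing them directly can favor one index for reasons unrelated to relevance. A common alternative, swapped in here for illustration and not taken from the pipeline above, is reciprocal rank fusion (RRF), which uses only ranks. The sketch below assumes each search returns an ordered list of ids; the ids and the constant k=60 are illustrative choices.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(
    ranked_lists: List[List[str]],
    k: int = 60,
) -> List[str]:
    """Merge ranked lists of item ids using reciprocal rank fusion.

    Each item scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1. Only ranks are used, so the raw similarity
    scores of the underlying indexes never need to be comparable.
    """
    fused: Dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, item_id in enumerate(ranking, start=1):
            fused[item_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by id for determinism
    return sorted(fused, key=lambda item_id: (-fused[item_id], item_id))


# Toy rankings (hypothetical image ids) from the two image searches
semantic_ranking = ["chart_q3", "chart_q1", "org_chart"]  # text-embedding space
visual_ranking = ["chart_q1", "chart_q3", "photo_team"]   # CLIP space

merged = reciprocal_rank_fusion([semantic_ranking, visual_ranking])
print(merged)
```

Images found by both searches rise to the top, while images found by only one search still appear further down. RRF needs no score normalization, at the price of ignoring how large the score gaps were.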

Late Fusion: When Separate Indexes Are Better

Sometimes searching modalities separately and combining results works better than a single unified index:

class LateFusionMultimodalSearch:
    """Search modalities separately, combine with learned weights."""

    def __init__(self, fusion_weights: Optional[Dict] = None):
        self.text_index = TextVectorIndex()
        self.image_index = CLIPVectorIndex()
        self.audio_index = AudioEmbeddingIndex()

        # Default weights (can be learned from data)
        self.weights = fusion_weights or {
            'text': 0.5,
            'image': 0.3,
            'audio': 0.2
        }

    def search(
        self,
        query: str,
        modalities: Optional[List[str]] = None,
        top_k: int = 10
    ) -> List[Dict]:
        """
        Search each modality and combine results.

        Args:
            query: Text query
            modalities: Which modalities to search (default: all three)
            top_k: Number of final results

        Returns:
            Combined ranked results
        """
        # Avoid a mutable default argument; None means "search everything"
        if modalities is None:
            modalities = ['text', 'image', 'audio']

        all_results = []

        # Search each modality
        if 'text' in modalities:
            text_results = self.text_index.search(query, top_k=top_k * 2)
            text_results = self._normalize_scores(text_results)
            all_results.extend([
                {**r, 'modality': 'text', 'weight': self.weights['text']}
                for r in text_results
            ])

        if 'image' in modalities:
            image_results = self.image_index.search(query, top_k=top_k * 2)
            image_results = self._normalize_scores(image_results)
            all_results.extend([
                {**r, 'modality': 'image', 'weight': self.weights['image']}
                for r in image_results
            ])

        if 'audio' in modalities:
            audio_results = self.audio_index.search(query, top_k=top_k * 2)
            audio_results = self._normalize_scores(audio_results)
            all_results.extend([
                {**r, 'modality': 'audio', 'weight': self.weights['audio']}
                for r in audio_results
            ])

        # Combine scores by document
        doc_scores = {}
        doc_results = {}

        for result in all_results:
            doc_id = result['doc_id']
            weighted_score = result['score'] * result['weight']

            if doc_id not in doc_scores:
                doc_scores[doc_id] = 0
                doc_results[doc_id] = []

            doc_scores[doc_id] += weighted_score
            doc_results[doc_id].append(result)

        # Rank documents by combined score
        ranked_docs = sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

        # Return results with all matching modalities
        final_results = []
        for doc_id, score in ranked_docs:
            final_results.append({
                'doc_id': doc_id,
                'combined_score': score,
                'matches': doc_results[doc_id]
            })

        return final_results

    def _normalize_scores(self, results: List[Dict]) -> List[Dict]:
        """Normalize scores to [0, 1] range."""
        if not results:
            return results

        scores = [r['score'] for r in results]
        min_score = min(scores)
        max_score = max(scores)
        score_range = max_score - min_score

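        if score_range == 0:
            # All scores are equal: map them to 1.0 so the output still
            # lies in [0, 1] instead of leaking raw scores into the fusion
            return [{**r, 'score': 1.0} for r in results]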

        normalized = []
        for r in results:
            normalized.append({
                **r,
                'score': (r['score'] - min_score) / score_range
            })

        return normalized

    def learn_fusion_weights(
        self,
        queries: List[str],
        relevance_labels: List[Dict]
    ) -> Dict:
        """
        Learn optimal fusion weights from labeled data.

        Args:
            queries: List of query strings
            relevance_labels: List of {doc_id: relevance_score} dicts

        Returns:
            Optimal weight dictionary
        """
        best_weights = self.weights
        best_ndcg = 0

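        # Grid search over weight combinations (the three weights sum to 1)
        for text_weight in np.arange(0.2, 0.8, 0.1):
            for image_weight in np.arange(0.1, 0.6, 0.1):
                # Round to absorb floating-point drift from np.arange;
                # otherwise a valid 0.0 can show up as a tiny negative value
                audio_weight = float(round(1.0 - text_weight - image_weight, 2))

                if audio_weight < 0 or audio_weight > 1:
                    continue

                self.weights = {
                    'text': float(text_weight),
                    'image': float(image_weight),
                    'audio': audio_weight
                }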

                # Evaluate on labeled data
                ndcg = self._evaluate_ndcg(queries, relevance_labels)

                if ndcg > best_ndcg:
                    best_ndcg = ndcg
                    best_weights = self.weights.copy()

        self.weights = best_weights
        return best_weights

    def _evaluate_ndcg(
        self,
        queries: List[str],
        relevance_labels: List[Dict]
    ) -> float:
        """Compute average NDCG across queries."""
        ndcgs = []

        for query, labels in zip(queries, relevance_labels):
            results = self.search(query, top_k=10)
            ndcg = self._compute_ndcg(results, labels)
            ndcgs.append(ndcg)

        return np.mean(ndcgs)

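    def _compute_ndcg(self, results: List[Dict], labels: Dict) -> float:
        """Compute NDCG for a single query (linear gain, log2 discount)."""
        # Gain of each returned document in rank order; unlabeled docs count as 0
        gains = [labels.get(r['doc_id'], 0) for r in results]
        dcg = sum(g / np.log2(rank + 2) for rank, g in enumerate(gains))

        # Ideal DCG: the same number of slots filled with the best labels,
        # i.e. NDCG@k with k equal to the number of returned results
        ideal = sorted(labels.values(), reverse=True)[:len(gains)]
        idcg = sum(g / np.log2(rank + 2) for rank, g in enumerate(ideal))

        return float(dcg / idcg) if idcg > 0 else 0.0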

When to Use Late Fusion

Late fusion is beneficial when:

  1. Different retrieval characteristics: Text search uses semantic similarity, image search uses visual similarity—they measure different things
  2. Interpretability needed: You want to see which modality contributed to each result
  3. Variable query patterns: Some queries are more visual (“find the org chart”), others more textual (“find discussion of hiring plans”)
  4. You have labeled data: Fusion weights can be optimized for your specific use case
  5. Modality-specific tuning: You want to use different models/parameters for each modality

Early vs. Late Fusion Comparison

Aspect                | Early Fusion                   | Late Fusion
----------------------|--------------------------------|-------------------------------------
Architecture          | Single unified embedding space | Separate indexes per modality
Cross-modal reasoning | Built-in (shared space)        | Requires explicit design
Interpretability      | Black box                      | Clear modality contributions
Optimization          | End-to-end                     | Per-modality + fusion weights
Flexibility           | Limited                        | High (different models per modality)
Complexity            | Lower                          | Higher (multiple systems)

Complete code: See Late Fusion Search for a complete implementation with learnable fusion weights and confidence scoring.


Production Architecture Patterns

Cost Management for Audio/Video Processing

Audio and video processing is expensive due to:

  • Large file sizes requiring significant bandwidth
  • Compute-intensive processing (transcription, frame analysis)
  • High token costs for video frame analysis (each frame sent to a VLM consumes many input tokens)

Cost-Aware Processing Strategy

class CostOptimizedVideoProcessor:
    """Process video with cost controls and optimization."""

    def __init__(self, monthly_budget: float):
        self.budget = monthly_budget
        self.spent_this_month = 0

        # Cost estimates (adjust based on your providers)
        self.costs = {
            'whisper_per_minute': 0.006,
            'video_frame_analysis': 0.01,  # Per frame with VLM
            'clip_embedding': 0.0001,       # Per image
            'storage_per_gb_month': 0.02
        }

    def estimate_video_cost(
        self,
        video_path: str,
        frames_to_sample: int = 20
    ) -> Dict:
        """Estimate processing cost before committing."""

        # Get video metadata
        video_info = self._get_video_info(video_path)
        duration_minutes = video_info['duration'] / 60

        costs = {}

        # Audio transcription cost
        costs['transcription'] = duration_minutes * self.costs['whisper_per_minute']

        # Video frame analysis cost
        costs['frame_analysis'] = frames_to_sample * self.costs['video_frame_analysis']

        # CLIP embeddings (if using)
        costs['embeddings'] = frames_to_sample * self.costs['clip_embedding']

        # Storage cost (monthly)
        file_size_gb = video_info['size_bytes'] / (1024**3)
        costs['storage_monthly'] = file_size_gb * self.costs['storage_per_gb_month']

        costs['total_processing'] = sum(
            v for k, v in costs.items() if k != 'storage_monthly'
        )

        return costs

    async def process_within_budget(
        self,
        video_path: str,
        priority: str = 'balanced'
    ) -> Dict:
        """
        Process video with automatic cost optimization.

        Args:
            video_path: Path to video file
            priority: 'speed', 'cost', or 'balanced'

        Returns:
            Processing results with actual cost
        """
        estimated_cost = self.estimate_video_cost(video_path)

        if self.spent_this_month + estimated_cost['total_processing'] > self.budget:
            # Try to reduce cost
            optimized_plan = self._optimize_processing_plan(
                video_path,
                available_budget=self.budget - self.spent_this_month,
                priority=priority
            )
        else:
            optimized_plan = self._standard_processing_plan(video_path)

        # Execute plan
        result = await self._execute_plan(video_path, optimized_plan)

        # Track spending
        self.spent_this_month += result['actual_cost']

        return result

    def _optimize_processing_plan(
        self,
        video_path: str,
        available_budget: float,
        priority: str
    ) -> Dict:
        """Create cost-optimized processing plan."""

        plan = {}

        if priority == 'cost':
            # Minimize cost
            plan['frame_count'] = 10  # Fewer frames
            plan['frame_resolution'] = 512  # Lower resolution
            plan['audio_only'] = False
            plan['use_scene_detection'] = True  # Smart sampling

        elif priority == 'speed':
            # Maximize speed (which often means lower cost too)
            plan['frame_count'] = 15
            plan['frame_resolution'] = 768
            plan['parallel_processing'] = True
            plan['use_scene_detection'] = True

        else:  # balanced
            plan['frame_count'] = 20
            plan['frame_resolution'] = 1024
            plan['audio_only'] = False
            plan['use_scene_detection'] = True

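        # If still over budget, escalate reductions one step at a time
        if self._estimate_plan_cost(plan) > available_budget:
            # Step 1: cut the frame count, keeping at least 5 frames
            plan['frame_count'] = max(5, plan['frame_count'] - 10)

        if self._estimate_plan_cost(plan) > available_budget:
            # Step 2: skip frame analysis entirely and transcribe audio only
            plan['frame_count'] = 0
            plan['audio_only'] = True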

        return plan
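
To make the effect of frame sampling concrete, here is a small worked estimate. It is a sketch that reuses the per-unit prices from the cost table in the class above (themselves placeholders to adjust for your providers) and compares 20 selected frames against uniform sampling at 1 frame per second for a one-hour video. The function name `processing_cost` is hypothetical.

```python
from typing import Dict

# Per-unit prices copied from the cost table above; treat them as placeholders
COSTS = {
    "whisper_per_minute": 0.006,
    "video_frame_analysis": 0.01,
    "clip_embedding": 0.0001,
}


def processing_cost(duration_minutes: float, frames: int) -> Dict[str, float]:
    """Return a per-component processing cost breakdown for one video."""
    breakdown = {
        "transcription": duration_minutes * COSTS["whisper_per_minute"],
        "frame_analysis": frames * COSTS["video_frame_analysis"],
        "embeddings": frames * COSTS["clip_embedding"],
    }
    breakdown["total"] = sum(breakdown.values())
    return breakdown


duration_minutes = 60
sampled = processing_cost(duration_minutes, frames=20)       # 20 selected frames
uniform = processing_cost(duration_minutes, frames=60 * 60)  # 1 frame per second

print(f"20 frames: ${sampled['total']:.2f}")
print(f"1 fps:     ${uniform['total']:.2f}")
```

With these prices the sampled plan costs about $0.56 per video and the 1 fps plan about $36.72. Transcription is the same in both cases, so nearly all of the difference comes from frame analysis.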

Error Handling for Audio Systems

Audio processing has unique failure modes that require specific error handling:

class RobustAudioProcessor:
    """Audio processing with comprehensive error handling."""

    def __init__(self):
        self.transcriber = TranscriptionService()
        self.max_retries = 3
        self.timeout_seconds = 300

    async def process_with_fallbacks(
        self,
        audio_path: str
    ) -> Dict:
        """Process audio with multiple fallback strategies."""

        errors = []

        # Try standard processing
        try:
            result = await self._process_standard(audio_path)
            return {'status': 'success', 'result': result, 'method': 'standard'}
        except FileTooLargeError as e:
            errors.append(f"File too large: {e}")
            # Fallback: compress audio
            try:
                compressed_path = self._compress_audio(audio_path)
                result = await self._process_standard(compressed_path)
                return {
                    'status': 'success',
                    'result': result,
                    'method': 'compressed',
                    'warning': 'Audio was compressed, may affect quality'
                }
            except Exception as e2:
                errors.append(f"Compression failed: {e2}")

        except AudioQualityError as e:
            errors.append(f"Low quality audio: {e}")
            # Fallback: enhance audio
            try:
                enhanced_path = self._enhance_audio(audio_path)
                result = await self._process_standard(enhanced_path)
                return {
                    'status': 'success',
                    'result': result,
                    'method': 'enhanced'
                }
            except Exception as e2:
                errors.append(f"Enhancement failed: {e2}")

        except TimeoutError as e:
            errors.append(f"Processing timeout: {e}")
            # Fallback: chunk and process
            try:
                result = await self._process_chunked(audio_path)
                return {
                    'status': 'success',
                    'result': result,
                    'method': 'chunked'
                }
            except Exception as e2:
                errors.append(f"Chunked processing failed: {e2}")

        # All methods failed
        return {
            'status': 'failed',
            'errors': errors,
            'suggestion': 'Manual review required'
        }

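    def _compress_audio(self, audio_path: str) -> str:
        """Compress audio to reduce file size."""
        import subprocess
        from pathlib import Path

        # Build the output name from the stem so that non-.mp3 inputs
        # never resolve to the input path itself
        source = Path(audio_path)
        output_path = str(source.with_name(f"{source.stem}_compressed.mp3"))

        subprocess.run([
            'ffmpeg', '-y', '-i', audio_path,  # -y: overwrite a stale output
            '-b:a', '64k',  # Lower bitrate
            '-ar', '16000',  # Lower sample rate
            output_path
        ], check=True)

        return output_path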

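    def _enhance_audio(self, audio_path: str) -> str:
        """Enhance audio quality (noise reduction, normalization)."""
        # Use audio enhancement tools (e.g., noisereduce, pydub).
        # Raise until implemented so the caller records a failed fallback
        # instead of passing None on to the transcriber.
        raise NotImplementedError("Audio enhancement is not implemented")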

Streaming Architecture for Real-Time Processing

Real-time audio/video processing requires careful architectural design:

┌─────────────────────────────────────────────────────────────────────────────┐
│                    STREAMING AUDIO/VIDEO ARCHITECTURE                       │
└─────────────────────────────────────────────────────────────────────────────┘

     Client (Browser/App)
              │
              │ WebSocket/WebRTC
              ▼
     ┌──────────────────┐
     │  Ingestion       │  Accept audio/video stream
     │  Service         │  Buffer management
     └──────────────────┘
              │
              ▼
     ┌──────────────────┐
     │  Message Queue   │  Kafka/RabbitMQ
     │  (Audio chunks)  │  Decouples ingestion from processing
     └──────────────────┘
              │
              ├────────────────────┬────────────────────┐
              ▼                    ▼                    ▼
     ┌──────────────┐      ┌──────────────┐    ┌──────────────┐
     │  Streaming   │      │  Diarization │    │  Embedding   │
     │  Whisper     │      │  Service     │    │  Service     │
     │  Worker      │      │  Worker      │    │  Worker      │
     └──────────────┘      └──────────────┘    └──────────────┘
              │                    │                    │
              └────────────────────┴────────────────────┘
                                   ▼
                          ┌──────────────────┐
                          │  Results         │
                          │  Aggregation     │
                          └──────────────────┘
                                   │
                                   ▼
                          ┌──────────────────┐
                          │  WebSocket       │  Push results to client
                          │  Server          │  Real-time updates
                          └──────────────────┘
                                   │
                                   ▼
                            Client Display

Key Design Principles:

  1. Buffering: Collect audio in small chunks (2-5 seconds) with overlap
  2. Async processing: Workers process chunks in parallel
  3. Message queue: Decouples ingestion from processing, handles backpressure
  4. Progressive results: Send partial results immediately, corrections later
  5. State management: Track session state across chunks
  6. Error recovery: Handle worker failures without losing chunks
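
Principles 1 to 3 can be sketched in a few lines. The example below is a minimal illustration rather than a production design: an in-process `asyncio.Queue` stands in for Kafka/RabbitMQ, `asyncio.sleep(0)` stands in for transcription, and the function names, chunk sizes, and worker counts are assumptions made for the sketch.

```python
import asyncio
from typing import Iterator, List, Sequence, Tuple


def chunk_with_overlap(
    samples: Sequence[float],
    sample_rate: int,
    chunk_seconds: float = 3.0,
    overlap_seconds: float = 0.5,
) -> Iterator[Tuple[int, Sequence[float]]]:
    """Yield (start_index, chunk) pairs; neighbouring chunks share the overlap."""
    chunk_len = int(chunk_seconds * sample_rate)
    step = chunk_len - int(overlap_seconds * sample_rate)
    if step <= 0:
        raise ValueError("chunk_seconds must be larger than overlap_seconds")
    for start in range(0, len(samples), step):
        yield start, samples[start:start + chunk_len]
        if start + chunk_len >= len(samples):
            break  # this chunk already reached the end of the buffer


async def run_pipeline(
    samples: Sequence[float],
    sample_rate: int,
    num_workers: int = 2,
    max_queue: int = 4,
) -> List[Tuple[int, int]]:
    """Push chunks through a bounded queue to parallel workers."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue)
    results: List[Tuple[int, int]] = []

    async def worker() -> None:
        while True:
            item = await queue.get()
            if item is None:           # sentinel: the stream has ended
                return
            start, chunk = item
            await asyncio.sleep(0)     # placeholder for real transcription work
            results.append((start, len(chunk)))

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    for item in chunk_with_overlap(samples, sample_rate):
        await queue.put(item)          # waits while the queue is full (backpressure)
    for _ in workers:
        await queue.put(None)          # one sentinel per worker
    await asyncio.gather(*workers)
    return sorted(results)             # workers may finish out of order


if __name__ == "__main__":
    fake_audio = [0.0] * 100           # 10 seconds at a toy 10 Hz sample rate
    print(asyncio.run(run_pipeline(fake_audio, sample_rate=10)))
```

The bounded queue is what provides backpressure: when workers fall behind, `queue.put` waits instead of letting the buffer grow without limit. Keeping the start index with each chunk lets the aggregator restore stream order and reconcile the overlapping regions.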

Evaluation and Quality Assurance

Transcription Accuracy: Word Error Rate

The standard metric for speech recognition is Word Error Rate (WER):

WER = (Substitutions + Deletions + Insertions) / Total Words in Reference

from typing import Dict, List, Tuple
import difflib

import numpy as np

def calculate_wer(reference: str, hypothesis: str) -> Dict:
    """
    Calculate Word Error Rate and related metrics.

    Args:
        reference: Ground truth transcription
        hypothesis: Model's transcription

    Returns:
        Dictionary with WER, insertions, deletions, substitutions
    """
    ref_words = reference.lower().split()
    hyp_words = hypothesis.lower().split()

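    # Use difflib to align the word sequences. SequenceMatcher does not
    # guarantee a minimum-edit alignment, so the result can overestimate
    # the true WER but never underestimates it.
    matcher = difflib.SequenceMatcher(None, ref_words, hyp_words)

    substitutions = 0
    deletions = 0
    insertions = 0

    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        ref_len, hyp_len = i2 - i1, j2 - j1
        if tag == 'replace':
            # Pair words one-to-one as substitutions; the remainder of an
            # unequal-length block counts as deleted or inserted words
            substitutions += min(ref_len, hyp_len)
            deletions += max(0, ref_len - hyp_len)
            insertions += max(0, hyp_len - ref_len)
        elif tag == 'delete':
            deletions += ref_len
        elif tag == 'insert':
            insertions += hyp_len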

    total_words = len(ref_words)
    wer = (substitutions + deletions + insertions) / total_words if total_words > 0 else 0

    return {
        'wer': wer,
        'substitutions': substitutions,
        'deletions': deletions,
        'insertions': insertions,
        'total_words': total_words,
        'accuracy': max(0.0, 1 - wer)  # WER can exceed 1 when insertions dominate
    }

def evaluate_transcription_system(
    test_cases: List[Tuple[str, str]]
) -> Dict:
    """
    Evaluate transcription system on test set.

    Args:
        test_cases: List of (audio_path, reference_transcript) tuples

    Returns:
        Aggregate metrics across test set
    """
    transcriber = TranscriptionService()
    results = []

    for audio_path, reference in test_cases:
        hypothesis = transcriber.transcribe(audio_path)['text']
        wer_metrics = calculate_wer(reference, hypothesis)

        results.append({
            'audio': audio_path,
            **wer_metrics
        })

    # Aggregate metrics
    avg_wer = np.mean([r['wer'] for r in results])
    avg_accuracy = 1 - avg_wer

    return {
        'average_wer': avg_wer,
        'average_accuracy': avg_accuracy,
        'median_wer': np.median([r['wer'] for r in results]),
        'per_file_results': results
    }

Interpreting WER:

  • < 5%: Excellent, near-human performance
  • 5-10%: Good, usable for most applications
  • 10-20%: Acceptable for some uses, may need review
  • > 20%: Poor, likely unusable without significant post-processing
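
The `calculate_wer` function above aligns words with `difflib`, whose documentation notes that it does not yield minimal edit sequences. WER is conventionally defined on the minimum edit distance, so the following sketch computes it with a word-level Levenshtein dynamic program. The function name `wer_levenshtein` is hypothetical, and text normalization is limited to lowercasing and whitespace splitting.

```python
from typing import Dict


def wer_levenshtein(reference: str, hypothesis: str) -> Dict[str, float]:
    """Word error rate from a minimum-edit-distance alignment."""
    ref = reference.lower().split()
    hyp = hypothesis.lower().split()
    if not ref:
        raise ValueError("reference must contain at least one word")

    # dist[i][j] = (total edits, subs, dels, ins) to turn ref[:i] into hyp[:j]
    dist = [[(0, 0, 0, 0)] * (len(hyp) + 1) for _ in range(len(ref) + 1)]
    for i in range(1, len(ref) + 1):
        dist[i][0] = (i, 0, i, 0)      # delete every reference word
    for j in range(1, len(hyp) + 1):
        dist[0][j] = (j, 0, 0, j)      # insert every hypothesis word

    for i in range(1, len(ref) + 1):
        for j in range(1, len(hyp) + 1):
            if ref[i - 1] == hyp[j - 1]:
                dist[i][j] = dist[i - 1][j - 1]  # match, no extra cost
                continue
            t, s, d, n = dist[i - 1][j - 1]
            substitute = (t + 1, s + 1, d, n)
            t, s, d, n = dist[i - 1][j]
            delete = (t + 1, s, d + 1, n)
            t, s, d, n = dist[i][j - 1]
            insert = (t + 1, s, d, n + 1)
            # Tuples compare on total edits first, so min() picks a cheapest path
            dist[i][j] = min(substitute, delete, insert)

    total, subs, dels, ins = dist[len(ref)][len(hyp)]
    return {
        "wer": total / len(ref),
        "substitutions": subs,
        "deletions": dels,
        "insertions": ins,
    }


print(wer_levenshtein("the cat sat on the mat", "the cat sit on mat"))
```

In the example, one substitution ("sat" to "sit") and one deletion ("the") give 2 errors over 6 reference words. For reporting across a test set, total errors divided by total reference words is often preferred over averaging per-file WER, because short files otherwise carry the same weight as long ones.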

Speaker Diarization Evaluation

Speaker diarization is evaluated using Diarization Error Rate (DER):

def calculate_der(
    reference_segments: List[Tuple[float, float, str]],
    hypothesis_segments: List[Tuple[float, float, str]],
    collar: float = 0.25
) -> Dict:
    """
    Calculate Diarization Error Rate.

    DER = (False Alarm + Missed Speech + Speaker Error) / Total Speech Time

    Args:
        reference_segments: List of (start, end, speaker_id) tuples
        hypothesis_segments: List of (start, end, speaker_id) tuples
        collar: Forgiveness collar in seconds around boundaries

    Returns:
        DER metrics
    """
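    # Delegate the computation to pyannote.metrics, which expects
    # pyannote.core.Annotation objects rather than raw tuples
    from pyannote.core import Annotation, Segment
    from pyannote.metrics.diarization import DiarizationErrorRate

    def to_annotation(segments):
        annotation = Annotation()
        for start, end, speaker in segments:
            annotation[Segment(start, end)] = speaker
        return annotation

    metric = DiarizationErrorRate(collar=collar)

    # detailed=True returns the error components (durations in seconds)
    # alongside the overall rate
    components = metric(
        to_annotation(reference_segments),
        to_annotation(hypothesis_segments),
        detailed=True
    )

    return {
        'der': components['diarization error rate'],
        'false_alarm': components['false alarm'],
        'missed_speech': components['missed detection'],
        'speaker_confusion': components['confusion']
    }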
    }
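
To show what the DER formula measures, here is a dependency-free sketch that computes it frame by frame. It is deliberately simplified and is not how pyannote.metrics works internally: it ignores the collar and overlapped speech, and it finds the speaker mapping by brute force, which is only practical for a handful of speakers. The name `simple_der` and the 10 ms frame step are assumptions.

```python
from itertools import permutations
from typing import Dict, List, Optional, Tuple

SpeakerSegment = Tuple[float, float, str]  # (start_seconds, end_seconds, speaker)


def _frame_labels(
    segments: List[SpeakerSegment], n_frames: int, step: float
) -> List[Optional[str]]:
    """Label every frame with its speaker, or None for non-speech."""
    labels: List[Optional[str]] = [None] * n_frames
    for start, end, speaker in segments:
        for frame in range(round(start / step), min(n_frames, round(end / step))):
            labels[frame] = speaker
    return labels


def simple_der(
    reference: List[SpeakerSegment],
    hypothesis: List[SpeakerSegment],
    step: float = 0.01,
) -> Dict[str, float]:
    """Frame-level DER with no collar and no overlapped-speech handling."""
    end_time = max(end for _, end, _ in reference + hypothesis)
    n_frames = round(end_time / step)
    ref = _frame_labels(reference, n_frames, step)
    hyp = _frame_labels(hypothesis, n_frames, step)

    total = sum(1 for r in ref if r is not None)
    if total == 0:
        raise ValueError("reference contains no speech")
    missed = sum(1 for r, h in zip(ref, hyp) if r is not None and h is None)
    false_alarm = sum(1 for r, h in zip(ref, hyp) if r is None and h is not None)
    both = [(r, h) for r, h in zip(ref, hyp) if r is not None and h is not None]

    # Hypothesis labels are arbitrary, so try every one-to-one mapping onto
    # reference speakers and keep the one with the fewest confused frames.
    ref_speakers = sorted({r for r in ref if r is not None})
    hyp_speakers = sorted({h for h in hyp if h is not None})
    # Pad with None so surplus hypothesis speakers can stay unmatched
    candidates: List[Optional[str]] = list(ref_speakers)
    candidates += [None] * max(0, len(hyp_speakers) - len(ref_speakers))
    confusion = min(
        sum(1 for r, h in both if mapping[h] != r)
        for mapping in (
            dict(zip(hyp_speakers, perm))
            for perm in permutations(candidates, len(hyp_speakers))
        )
    )

    return {
        "der": (missed + false_alarm + confusion) / total,
        "missed_speech": missed * step,
        "false_alarm": false_alarm * step,
        "speaker_confusion": confusion * step,
    }


reference = [(0.0, 5.0, "alice"), (5.0, 10.0, "bob")]
hypothesis = [(0.0, 6.0, "SPEAKER_00"), (6.0, 9.0, "SPEAKER_01")]
print(simple_der(reference, hypothesis))
```

In this toy case the hypothesis keeps the first speaker one second too long (speaker confusion) and drops the last second entirely (missed speech), giving 2 seconds of error over 10 seconds of reference speech, a DER of 0.2.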

Video Content Quality Metrics

Video understanding has no single metric like WER. Use task-specific evaluation:

class VideoQualityEvaluator:
    """Evaluate video understanding quality across multiple dimensions."""

    def evaluate_video_summary(
        self,
        video_path: str,
        generated_summary: str,
        reference_summary: str
    ) -> Dict:
        """Evaluate summary quality."""

        # ROUGE scores for summary overlap
        rouge_scores = self._compute_rouge(generated_summary, reference_summary)

        # Semantic similarity
        semantic_sim = self._semantic_similarity(generated_summary, reference_summary)

        # Factual consistency (check for hallucinations)
        frames = extract_keyframes(video_path)
        consistency_score = self._check_factual_consistency(
            generated_summary,
            frames
        )

        return {
            'rouge_1': rouge_scores['rouge1'],
            'rouge_l': rouge_scores['rougeL'],
            'semantic_similarity': semantic_sim,
            'factual_consistency': consistency_score,
            'overall_quality': np.mean([
                rouge_scores['rougeL'],
                semantic_sim,
                consistency_score
            ])
        }

    def evaluate_temporal_understanding(
        self,
        video_path: str,
        questions: List[str],
        answers: List[str],
        ground_truth: List[str]
    ) -> Dict:
        """Evaluate temporal reasoning (before/after, cause/effect)."""

        correct = 0
        for answer, truth in zip(answers, ground_truth):
            if self._answers_match(answer, truth):
                correct += 1

        return {
            'temporal_accuracy': correct / len(questions) if questions else 0.0,
            'total_questions': len(questions),
            'correct': correct
        }
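
The evaluator above relies on a `_compute_rouge` helper that is not shown. As a rough sketch of what such a helper might compute, the following dependency-free function returns ROUGE-1 and ROUGE-L F1 scores under the same `rouge1` and `rougeL` keys. It assumes lowercased whitespace tokens and applies no stemming, so its numbers will not match established ROUGE packages exactly.

```python
from collections import Counter
from typing import Dict, List


def _f1(overlap: int, n_candidate: int, n_reference: int) -> float:
    """F1 from an overlap count and the two sequence lengths."""
    if overlap == 0:
        return 0.0
    precision = overlap / n_candidate
    recall = overlap / n_reference
    return 2 * precision * recall / (precision + recall)


def _lcs_length(a: List[str], b: List[str]) -> int:
    """Length of the longest common subsequence, using a rolling DP row."""
    prev = [0] * (len(b) + 1)
    for token_a in a:
        curr = [0]
        for j, token_b in enumerate(b, start=1):
            if token_a == token_b:
                curr.append(prev[j - 1] + 1)
            else:
                curr.append(max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]


def rouge_scores(generated: str, reference: str) -> Dict[str, float]:
    """ROUGE-1 and ROUGE-L F1 on lowercased whitespace tokens."""
    gen = generated.lower().split()
    ref = reference.lower().split()
    # ROUGE-1: unigram overlap, clipped by the count in each text
    unigram_overlap = sum((Counter(gen) & Counter(ref)).values())
    return {
        "rouge1": _f1(unigram_overlap, len(gen), len(ref)),
        "rougeL": _f1(_lcs_length(gen, ref), len(gen), len(ref)),
    }


print(rouge_scores(
    "the operator installs the bracket",
    "the operator installs a bracket and tightens it",
))
```

ROUGE-1 rewards shared vocabulary regardless of order, while ROUGE-L also rewards preserving the order of events, which matters for summaries of procedures. Neither detects hallucinated content, which is why the evaluator pairs them with a factual consistency check.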

Key Takeaways

  1. Audio processing is more than transcription: Modern systems combine transcription, speaker diarization, and temporal understanding. Vocabulary prompting can substantially improve accuracy on domain-specific terms.

  2. Streaming vs. batch is a fundamental tradeoff: Real-time applications sacrifice accuracy for latency. Hybrid approaches (streaming + correction) provide the best user experience.

  3. Video understanding requires intelligent sampling: Not every frame needs analysis. Scene detection, motion-based sampling, and adaptive strategies balance cost against coverage.

  4. Multimodal retrieval needs aligned embeddings: CLIP enables cross-modal search. For production systems, dual embeddings (visual + semantic) maximize recall across query types.

  5. Late fusion provides flexibility: Separate indexes per modality enable modality-specific optimization and interpretable results. Fusion weights can be learned from data.

  6. Cost management is critical: Audio and especially video processing are expensive. Budget-aware processing, intelligent sampling, and fallback strategies prevent cost overruns.

Complete Reference: For full implementations of all components discussed in this chapter, see Multimodal Systems - Code Reference. This includes vision-language integration, audio transcription, speaker diarization, video processing, CLIP embeddings, and multimodal RAG patterns.


Summary

Multimodal AI is where engineering judgment matters most, because the design space explodes. Text RAG forces a few hard choices (chunk size, retriever, reranker). Video and audio retrieval force dozens — sampling rate, modality fusion, segmentation strategy, embedding pair, cost ceiling — and most of those choices have order-of-magnitude consequences for both quality and bill.

Three threads run through the whole chapter:

Time is a first-class input, not a free dimension. Audio and video carry meaning across seconds. Whisper-style spectrogram processing, scene-based frame sampling, and timestamp-aware indexing exist because pretending time doesn’t matter is the fastest way to ship a system that returns plausible but wrong answers. When in doubt, sample more sparsely but keep timestamps — a sparse-but-aligned index beats a dense-but-untimed one for every retrieval task we care about.

Cross-modal alignment is learned, not assumed. CLIP works because it was contrastively trained to put matching image-text pairs near each other. The corollary: image embeddings from a vision-only model and text embeddings from a language-only model live in different spaces and cannot be compared with cosine similarity, no matter how convenient that would be. Either use a CLIP-family model end-to-end, or fuse late (search each modality separately, then combine).

Cost is the design constraint that bites in production. A video question-answering system that uniformly samples 1 frame per second at 4K resolution will bankrupt you. Adaptive sampling, resolution tiering, audio-first transcription with selective frame inspection, and explicit per-request budgets are the difference between a demo and a service. Build the cost guardrails before you tune for quality, not after.

Audio, video, and cross-modal retrieval aren’t just “more modalities” — they’re a forcing function for everything Part III taught you: deployment thinking (Ch09), cost engineering (Ch32), evaluation discipline (Ch15), and reliability under spiky workloads (Ch31). If you can ship multimodal RAG to production, you can ship anything.


Practical Exercises

  1. Audio Transcription Pipeline: Build a production audio processing system that handles long files (>1 hour), includes speaker diarization, and supports domain-specific vocabulary. Measure WER on a test set and optimize for cost.

  2. Video Analysis System: Implement a video understanding system with multiple sampling strategies. Compare uniform vs. scene-based vs. adaptive sampling on 10 diverse videos. Measure quality (summary accuracy) vs. cost tradeoffs.

  3. Multimodal RAG: Build a RAG system for a corpus containing text documents, images, and videos. Index with both CLIP and text embeddings. Implement queries that search across all modalities and evaluate retrieval quality.

  4. Meeting Transcription Service: Create an end-to-end meeting transcription service with real-time display, speaker identification, and post-meeting summary/action item extraction. Deploy with WebSocket streaming.

  5. Podcast Search Engine: Build a searchable podcast database. Transcribe episodes, create embeddings of segments, enable queries like “episodes discussing remote work challenges” that return timestamped clips. Measure search quality with user studies.

  6. Cost Optimization Challenge: Given a fixed monthly budget, process the maximum number of videos possible while maintaining minimum quality thresholds. Implement adaptive sampling, resolution adjustment, and priority queuing.


Self-Assessment Checkpoint

Conceptual Questions

Q1. [IC2] Explain why mel spectrograms are used for speech recognition instead of raw waveforms. What properties of human hearing do they capture?

Answer Raw waveforms have extremely high dimensionality (16,000 samples/second) and don’t align with how humans perceive sound. Mel spectrograms: (1) Apply Fourier transform to convert time-domain signal to frequency domain, revealing what frequencies are present at each moment. (2) Use mel-scale filterbanks that mimic human hearing—we’re more sensitive to differences in low frequencies (100Hz vs 200Hz is perceptually large) than high frequencies (10kHz vs 10.1kHz is barely noticeable). (3) Compress frequency information logarithmically, allocating more resolution to perceptually-important ranges. (4) Create a 2D representation (time × frequency) that transformers can process as sequences. Result: 80-dimensional vectors per 10ms frame vs. 160 raw samples per 10ms—a 2x compression with perceptually-relevant features.
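The arithmetic in this answer can be checked in a few lines. The sketch assumes the common HTK-style mel formula, mel = 2595 · log10(1 + f/700); other mel variants exist, and the chapter does not say which one it has in mind.

```python
import math


def hz_to_mel(freq_hz: float) -> float:
    """HTK-style mel conversion (an assumed convention, one of several in use)."""
    return 2595.0 * math.log10(1.0 + freq_hz / 700.0)


SAMPLE_RATE = 16_000  # samples per second, as in the answer above
HOP_MS = 10           # one feature frame every 10 ms
N_MELS = 80           # mel bins per frame

samples_per_hop = SAMPLE_RATE * HOP_MS // 1000  # raw samples covered by one hop
compression = samples_per_hop / N_MELS          # raw samples per mel value

# The same perceptual point, measured in mels: a 100 Hz step at the low end
# spans far more of the mel axis than a 100 Hz step near 10 kHz.
low_gap = hz_to_mel(200) - hz_to_mel(100)
high_gap = hz_to_mel(10_100) - hz_to_mel(10_000)
```

Here `samples_per_hop` comes out to 160 and `compression` to 2.0, matching the figures in the answer, and `low_gap` is more than ten times `high_gap`.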

Q2. [IC2] What is speaker diarization and why is it challenging? How does it differ from speaker identification?

Answer Speaker diarization answers “when did different speakers speak?” without knowing who they are. It segments audio by speaker and labels segments (SPEAKER_00, SPEAKER_01, etc.). Challenges: (1) Unknown number of speakers in advance. (2) Similar voices (same gender, accent) are hard to distinguish. (3) Speaker overlap (simultaneous speech) creates ambiguity. (4) Short utterances lack sufficient voice characteristics. Speaker identification (a different task) answers “who is speaking?” by matching voices to known identities—requires a database of voice samples. Diarization is unsupervised clustering; identification is supervised classification. Production systems often combine both: diarize first, then match clusters to known users.

Q3. [Senior] Compare different video frame sampling strategies (uniform, scene-based, motion-based). When would you choose each, and what are the tradeoffs?

Answer Uniform sampling: Fixed intervals (every Nth frame or fixed FPS). Pros: simple, predictable cost, works for continuous content. Cons: wastes frames on static scenes, may miss brief important events. Use for sports, tutorials, or as a baseline when nothing is known about the content. Scene-based: One frame per scene change (detected via visual similarity). Pros: captures each distinct scene once, efficient for structured content. Cons: requires scene detection (adds complexity), oversamples rapidly cut footage, and undersamples long takes where a lot happens within one scene. Use for presentations, movies, edited content. Motion-based: More frames during high-motion periods. Pros: focuses on activity, efficient for surveillance. Cons: may miss important static content (e.g., a sign), requires motion computation. Use for security footage, action videos. Adaptive (hybrid): Combines strategies (e.g., 60% uniform + 40% scene-based). Pros: robust to diverse content types. Cons: more complex. Use when video type is unknown or varies. Key tradeoff: coverage (sample enough to capture content) vs. cost (each frame analyzed costs money/time).
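The hybrid split mentioned above can be sketched as a timestamp planner. This is a minimal illustration under stated assumptions: scene boundaries are taken as given (the detector is out of scope), and `hybrid_sample`, `uniform_share`, and `min_gap_s` are hypothetical names introduced here.

```python
from typing import List, Sequence


def uniform_timestamps(duration_s: float, n: int) -> List[float]:
    """Centers of n equal-length segments covering the video."""
    if n <= 0:
        return []
    step = duration_s / n
    return [step * (i + 0.5) for i in range(n)]


def evenly_spaced_subset(items: Sequence[float], n: int) -> List[float]:
    """At most n items, spread across the list rather than taken from the front."""
    if n <= 0:
        return []
    if len(items) <= n:
        return list(items)
    return [items[i * len(items) // n] for i in range(n)]


def hybrid_sample(duration_s: float, scene_starts: Sequence[float], budget: int,
                  uniform_share: float = 0.6, min_gap_s: float = 1.0) -> List[float]:
    """Split a frame budget between uniform and scene-based timestamps."""
    n_uniform = round(budget * uniform_share)
    n_scene = budget - n_uniform
    candidates = uniform_timestamps(duration_s, n_uniform)
    candidates += evenly_spaced_subset(sorted(scene_starts), n_scene)
    picked: List[float] = []
    for t in sorted(candidates):
        # Drop near-duplicates so the budget is not spent on the same moment twice.
        if not picked or t - picked[-1] >= min_gap_s:
            picked.append(t)
    return picked


plan = hybrid_sample(100.0, [0.0, 10.0, 50.0, 90.0], budget=10, min_gap_s=2.0)
```

The result never exceeds the budget, and it can come in under it when a scene boundary and a uniform sample land close together.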

Q4. [Senior] How do you build a multimodal RAG system that searches across text documents, images, and audio? What indexing strategy would you use?

Answer Strategy: Index each modality with appropriate embeddings. (1) Text: Standard text embeddings (e.g., text-embedding-3-large) for semantic search. (2) Images: Dual embeddings—CLIP visual embeddings for image similarity + text embeddings of VLM-generated descriptions for text search. (3) Audio: Transcribe to text, embed transcriptions, also store timestamps for retrieval. (4) Cross-modal links: Maintain relationships (e.g., image on page 5 of document X, audio clip at 3:45 in video Y). Query processing: (1) Text query → embed with text model, search text chunks. (2) Text query → embed with CLIP text encoder, search image visual embeddings. (3) Text query → search audio transcripts. (4) Image query → embed with CLIP image encoder, search image database. Combine results with late fusion (weighted combination per modality) or early fusion (single unified index). Late fusion preferred for interpretability and modality-specific optimization. Return context with references to original modalities (text snippets, image paths with descriptions, audio clips with timestamps). Generate answer with VLM that can reference retrieved images.
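For the late-fusion step, one option that avoids comparing raw scores across modalities is reciprocal rank fusion (RRF), which only looks at each item's rank in its own list. The answer above does not prescribe RRF; it is swapped in here as an illustration, with optional per-modality weights. The item IDs and the `k=60` constant are assumptions for the example.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def reciprocal_rank_fusion(ranked_lists: Dict[str, List[str]],
                           weights: Optional[Dict[str, float]] = None,
                           k: int = 60,
                           top_k: int = 10) -> List[Tuple[str, float]]:
    """Fuse per-modality rankings; each hit contributes weight / (k + rank)."""
    weights = weights or {}
    scores: Dict[str, float] = defaultdict(float)
    for modality, item_ids in ranked_lists.items():
        weight = weights.get(modality, 1.0)
        for rank, item_id in enumerate(item_ids, start=1):
            scores[item_id] += weight / (k + rank)
    # Sort by fused score, breaking ties by ID so the output is deterministic.
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))[:top_k]


fused = reciprocal_rank_fusion({
    "text": ["doc1", "doc2"],    # hits from the text index
    "image": ["img7", "doc1"],   # hits from the CLIP image index
    "audio": ["clip3"],          # hits from the transcript index
})
```

An item that shows up in more than one modality, such as `doc1` here, accumulates score from each list and rises to the top. That is usually the behavior you want when a document matches both on its text and on one of its images.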

Q5. [Staff] Design a real-time video streaming transcription system for live events. What are the key architectural components and how do you handle latency vs. accuracy tradeoffs?

Answer Architecture: (1) Ingestion layer: WebSocket/WebRTC to receive live video stream. (2) Audio extraction: Separate audio track in real-time. (3) Buffering: Collect 2-5 second audio chunks with 0.5-1s overlap. (4) Message queue (Kafka/Redis): Decouple ingestion from processing, handle backpressure. (5) Streaming transcription workers: Process chunks with streaming Whisper (optimized for low latency). (6) Batch correction workers: Every 30s, re-process buffered audio with full Whisper for accuracy. (7) Aggregation service: Merge streaming results, apply corrections, manage speaker turns. (8) WebSocket push: Send results to clients with type tags (streaming vs corrected). Latency vs accuracy: Streaming adds 200-500ms of processing latency per chunk, on top of the time spent buffering that chunk, and reaches 92-95% accuracy (limited context, no look-ahead). Batch correction reaches 95-98% accuracy but takes 2-5s to process and only runs once each 30s window closes. Solution: Display streaming results immediately for engagement, update with corrections when ready. UI shows provisional text (gray) that becomes finalized (black) after correction. State management: Track session context across chunks (vocabulary, speaker embeddings). Error handling: Worker failures shouldn’t lose chunks (queue persistence), degraded mode if correction fails (streaming only). Scaling: Horizontal scaling of workers, load balancing across processing nodes. Cost: Streaming is more expensive than batch (maintained connections, more processing) but required for real-time UX.
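The provisional-then-finalized behavior of the aggregation service can be sketched as a small state holder. This is a simplified illustration: `TranscriptAggregator` and its method names are hypothetical, segments are keyed by start time, and speaker turns are ignored.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Segment:
    start_s: float
    end_s: float
    text: str
    final: bool = False  # False = provisional (gray), True = corrected (black)


class TranscriptAggregator:
    def __init__(self) -> None:
        self._segments: Dict[float, Segment] = {}

    def on_streaming(self, start_s: float, end_s: float, text: str) -> None:
        """Add a provisional segment unless a correction already covers it."""
        covered = any(s.final and s.start_s <= start_s and end_s <= s.end_s
                      for s in self._segments.values())
        if not covered:
            self._segments[start_s] = Segment(start_s, end_s, text)

    def on_correction(self, start_s: float, end_s: float, text: str) -> None:
        """Replace every segment inside the corrected window with the final text."""
        stale = [key for key, s in self._segments.items()
                 if s.start_s >= start_s and s.end_s <= end_s]
        for key in stale:
            del self._segments[key]
        self._segments[start_s] = Segment(start_s, end_s, text, final=True)

    def render(self) -> List[Segment]:
        """Segments in time order, ready to push to clients with their type tag."""
        return [self._segments[key] for key in sorted(self._segments)]
```

The guard in `on_streaming` matters because a slow streaming worker can deliver its result after the batch correction for the same window has already arrived. Without it, stale provisional text would reappear on screen.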

Spot the Problem

Problem 1. [IC2] Audio chunking code:

def transcribe_long_audio(audio_path: str) -> str:
    chunks = split_audio(audio_path, chunk_size=30000)  # 30 seconds
    transcripts = [transcribe(chunk) for chunk in chunks]
    return ' '.join(transcripts)
Answer Problems: (1) No overlap between chunks, so words cut in half at boundaries will be lost or garbled. (2) Joining with spaces ignores what happened at each boundary: a word split across two chunks comes back as two fragments, and punctuation or capitalization can restart mid-sentence. (3) No timestamp alignment, so you can’t know when each word was spoken. (4) No deduplication logic, so simply adding overlap would make words in the overlap region appear twice. (5) Speaker turns may be split: the middle of a sentence becomes two separate segments. Better: Use overlapping chunks (e.g., 30s chunks, 5s overlap), merge at word level based on timestamps, detect and remove duplicates in overlap regions, preserve speaker context across chunks.
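A minimal sketch of the overlap-and-merge fix follows. It assumes the transcriber returns word-level timestamps in absolute seconds for each chunk. The transcription call itself is left out, and `chunk_windows` and `merge_chunks` are hypothetical helper names. Each chunk "owns" the region up to the midpoint of its overlap with its neighbor, so a word in the overlap is kept from exactly one chunk.

```python
from typing import List, Tuple

Word = Tuple[float, float, str]  # (start_s, end_s, text) in absolute time


def chunk_windows(duration_s: float, chunk_s: float = 30.0,
                  overlap_s: float = 5.0) -> List[Tuple[float, float]]:
    """Overlapping (start, end) windows that cover the whole recording."""
    if not 0 <= overlap_s < chunk_s:
        raise ValueError("overlap must be non-negative and shorter than the chunk")
    windows: List[Tuple[float, float]] = []
    start = 0.0
    while start < duration_s:
        windows.append((start, min(start + chunk_s, duration_s)))
        if start + chunk_s >= duration_s:
            break
        start += chunk_s - overlap_s
    return windows


def merge_chunks(windows: List[Tuple[float, float]],
                 words_per_chunk: List[List[Word]]) -> List[Word]:
    """Keep each word from the chunk that owns the word's midpoint."""
    merged: List[Word] = []
    last = len(windows) - 1
    for i, ((start, end), words) in enumerate(zip(windows, words_per_chunk)):
        # Ownership runs between the midpoints of the overlaps on either side.
        lo = float("-inf") if i == 0 else (start + windows[i - 1][1]) / 2
        hi = float("inf") if i == last else (windows[i + 1][0] + end) / 2
        for word in words:
            midpoint = (word[0] + word[1]) / 2
            if lo <= midpoint < hi:
                merged.append(word)
    return merged
```

Timestamps for the same word can differ slightly between two chunks, so a word whose midpoint sits right on an ownership boundary may still be dropped or doubled. A production merge would add a tolerance check on text and time near each boundary.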

Problem 2. [Senior] Video sampling:

def sample_video(video_path: str) -> list:
    frames = extract_all_frames(video_path)
    return frames[::300]  # Every 300th frame (~10 seconds at 30fps)
Answer Issues: (1) Fixed interval ignores content—10 seconds of static scene gets same sampling as 10 seconds of action. (2) No consideration of video characteristics—a 2-minute video and 2-hour video both sampled at same rate. (3) May miss scene transitions that fall between samples. (4) Extracts all frames first (memory intensive, slow)—should sample during extraction. (5) No frame quality check—might sample blurry frames during camera motion. Better: Use scene detection to sample key frames, implement adaptive sampling based on motion, cap total frames based on budget/requirements (e.g., max 20 frames regardless of duration), sample directly without extracting all frames, consider video duration (longer videos need sparser sampling).
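The budget cap and the "sample during extraction" points can be sketched together. The code assumes frames arrive as any iterable and that the total frame count is known up front, for example from container metadata; `stride_for_budget` and `sample_lazily` are hypothetical names.

```python
import math
from typing import Iterable, Iterator, Tuple, TypeVar

Frame = TypeVar("Frame")


def stride_for_budget(total_frames: int, max_frames: int) -> int:
    """Smallest stride that keeps the sample count at or below max_frames."""
    if max_frames <= 0:
        raise ValueError("max_frames must be positive")
    return max(1, math.ceil(total_frames / max_frames))


def sample_lazily(frames: Iterable[Frame], total_frames: int,
                  max_frames: int = 20) -> Iterator[Tuple[int, Frame]]:
    """Yield (index, frame) pairs without holding the whole video in memory."""
    stride = stride_for_budget(total_frames, max_frames)
    for index, frame in enumerate(frames):
        if index % stride == 0:
            yield index, frame


# Stand-in for a decoder: a 2-minute video at 30 fps is 3,600 frames.
short_video = list(sample_lazily(range(3600), total_frames=3600))
# A 2-hour video gets a proportionally larger stride, not more frames.
long_stride = stride_for_budget(2 * 60 * 60 * 30, max_frames=20)
```

This still walks every frame the iterable produces. With a real decoder you would typically seek to the target positions instead, which this sketch leaves out. It also does nothing about content: the stride only bounds cost, and scene or motion cues would decide where inside that budget the samples should go.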

Problem 3. [Staff] Multimodal search implementation:

def search(query: str) -> list:
    text_results = text_index.search(query)
    image_results = image_index.search(query)
    return text_results + image_results  # Concatenate
Answer Critical issues: (1) No score normalization—text and image similarity scores on different scales (e.g., 0-1 vs 0-100). Combining raw scores is meaningless. (2) No weighting—treats text and image results equally even though they have different retrieval characteristics and relevance distributions. (3) No deduplication—same document might appear in both results (text match + image match). (4) Simple concatenation doesn’t rank across modalities—all text results before any image results. (5) No consideration of query type—visual queries (“find the org chart”) should weight image results higher. (6) No top-k limit—might return hundreds of results. Better: Normalize scores to common scale [0, 1], weight by modality (learned or heuristic), combine scores by document ID, re-rank combined results, apply top-k, consider query classification to adjust weights dynamically.
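A sketch of the improved version, under stated assumptions: each index returns `(doc_id, raw_score)` pairs, min-max normalization is used as one simple way to bring scores onto [0, 1], and the 0.5/0.5 weights are placeholders that a query classifier could adjust. The function names are hypothetical.

```python
from typing import Dict, List, Tuple

Hit = Tuple[str, float]  # (doc_id, raw_score)


def min_max_normalize(hits: List[Hit]) -> Dict[str, float]:
    """Rescale one result list to [0, 1]; a later duplicate ID overwrites an earlier one."""
    if not hits:
        return {}
    scores = [score for _, score in hits]
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return {doc_id: 1.0 for doc_id, _ in hits}
    return {doc_id: (score - lo) / (hi - lo) for doc_id, score in hits}


def fuse(text_hits: List[Hit], image_hits: List[Hit],
         text_weight: float = 0.5, image_weight: float = 0.5,
         top_k: int = 10) -> List[Hit]:
    """Normalize per modality, sum weighted scores by document ID, rank, truncate."""
    combined: Dict[str, float] = {}
    for weight, hits in ((text_weight, text_hits), (image_weight, image_hits)):
        for doc_id, score in min_max_normalize(hits).items():
            combined[doc_id] = combined.get(doc_id, 0.0) + weight * score
    return sorted(combined.items(), key=lambda kv: (-kv[1], kv[0]))[:top_k]


text_hits = [("a", 0.75), ("b", 0.5), ("c", 0.25)]     # similarity on a 0-1 scale
image_hits = [("b", 80.0), ("d", 60.0), ("a", 40.0)]   # scores on a 0-100 scale
ranked = fuse(text_hits, image_hits, top_k=3)
```

Document `b` is second in the text list and first in the image list, so it ranks first overall even though the raw scales differ by two orders of magnitude. Min-max is sensitive to outliers within a list, so rank-based fusion is a reasonable alternative when score distributions are skewed.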

Design Exercises

Exercise 1. [Senior] Design a searchable video lecture platform for a university. Students should be able to search across thousands of recorded lectures to find specific topics, see relevant slides, and jump to exact timestamps. Consider indexing strategy, query processing, and UI for results.

Guidance Indexing pipeline: (1) Extract audio, transcribe with Whisper, get word-level timestamps. (2) Use scene detection to identify slide transitions (lectures typically show slides). (3) OCR each unique slide for text content. (4) Sample 1-2 frames per slide, generate VLM descriptions. (5) Create embeddings: (a) Transcript segments (30-60s chunks) with text embeddings, (b) Slide OCR text with text embeddings, (c) Slide images with CLIP embeddings. (6) Store in vector DB with metadata (lecture ID, timestamp, slide number, course). Query processing: (1) Text query → embed and search transcript segments + slide text. (2) Rank by relevance + recency + popularity. (3) Group results by lecture, deduplicate. (4) For each result, retrieve: transcript context, slide image, timestamp. UI: (1) Search bar with autocomplete from indexed topics. (2) Results show: lecture title, relevant slide thumbnail, transcript snippet, timestamp. (3) Click result → video player jumps to timestamp, shows slide, highlights transcript. (4) Sidebar shows all slides from lecture for navigation. (5) “Similar lectures” based on embedding similarity. Advanced features: (1) Temporal search: “second half of the semester only”. (2) Visual search: upload diagram, find similar slides. (3) Concept clustering: “show all lectures covering dynamic programming”. (4) Personalization: track what student has watched, recommend gaps.

Exercise 2. [Staff] Design a content moderation system for a video streaming platform that detects prohibited content (violence, hate speech, explicit material) in real-time. Must process live streams with <2 second latency. Consider accuracy requirements, cost constraints, and different types of violations.

Guidance Multi-tier detection: Tier 1 (fast, high recall): (1) Audio: VAD (voice activity detection) + streaming transcription + keyword matching for hate speech. (2) Video: Sample 2-3 fps, run lightweight classifier (e.g., fine-tuned ResNet) for NSFW/violence. (3) Latency: 200-500ms. (4) Action: Flag for Tier 2 if suspicious. Tier 2 (slower, high precision): (1) VLM analysis of flagged segments with detailed prompts. (2) Audio segment sent to hate speech classifier. (3) Latency: 1-2s. (4) Action: Auto-block if high confidence, human review if medium confidence. Tier 3 (human review): (1) Queue of medium-confidence flags. (2) Moderators view context (before/after clip). (3) Decision: allow, block, warn creator. Architecture: (1) Stream ingestion → buffer. (2) Parallel processing lanes for audio and video. (3) Message queue to handle spike loads. (4) Priority processing for high-view-count streams. (5) Appeal system for false positives. Challenges: (1) Context matters—violence in news vs gratuitous violence. (2) Cultural differences—what’s acceptable varies by region. (3) Evolving violations—new hate symbols, coded language. (4) Adversarial evasion—bad actors deliberately evade detection. Cost: Tier 1 cheap (simple models), Tier 2 expensive (VLM per flagged segment), Tier 3 most expensive (human labor). Optimize by minimizing false positives from Tier 1. Metrics: Precision (% of blocks that are true violations), Recall (% of violations caught), False positive rate (wrongly blocked content—hurts creators), Latency (time to block).
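The tier hand-off described in this guidance can be sketched as a routing function. All thresholds below are hypothetical placeholders that would need tuning against labeled data, and `tier2_scorer` stands in for the expensive VLM or classifier call. Passing it as a callable means it only runs when Tier 1 flags the segment.

```python
from enum import Enum
from typing import Callable


class Action(Enum):
    ALLOW = "allow"
    HUMAN_REVIEW = "human_review"
    AUTO_BLOCK = "auto_block"


# Hypothetical thresholds: Tier 1 flags aggressively (high recall),
# Tier 2 only auto-blocks when it is very sure (high precision).
TIER1_FLAG_AT = 0.2
TIER2_REVIEW_AT = 0.5
TIER2_BLOCK_AT = 0.9


def moderate(tier1_score: float, tier2_scorer: Callable[[], float]) -> Action:
    """Route one segment through the tiers, calling Tier 2 only when needed."""
    if tier1_score < TIER1_FLAG_AT:
        return Action.ALLOW          # cheap path: most segments stop here
    tier2_score = tier2_scorer()     # expensive path: flagged segments only
    if tier2_score >= TIER2_BLOCK_AT:
        return Action.AUTO_BLOCK
    if tier2_score >= TIER2_REVIEW_AT:
        return Action.HUMAN_REVIEW   # medium confidence goes to the Tier 3 queue
    return Action.ALLOW
```

Raising `TIER1_FLAG_AT` cuts Tier 2 spend but lowers recall, which is the cost lever the guidance points at when it talks about Tier 1 false positives.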

Further Reading

Essential

  • Radford et al. (2021), “CLIP” - Contrastive multimodal learning foundation.
  • Radford et al. (2022), “Whisper” - The paper that transformed speech-to-text.
  • Girdhar et al. (2023), “ImageBind” - Unified embeddings across six modalities.

Deep Dives

  • Arnab et al. (2021), “ViViT” - Applying transformers to video understanding.
  • Bredin et al. (2020), “pyannote.audio” - Modern neural speaker diarization.
  • Kim et al. (2024), “Multimodal RAG for Long-Form Video QA” - RAG for video content.

Connections to Other Chapters

  • Chapter 7 (RAG Systems): Vector search and retrieval strategies extend to multimodal content
  • Chapter 7 (Advanced RAG): Chunking strategies apply to temporal segmentation of audio/video
  • Chapter 17 (Vision & Document AI): Vision foundations and VLM architectures underlie video understanding
  • Chapter 18 (Audio & Speech Systems): Audio processing and transcription for video soundtracks
  • Chapter 15 (MLOps & Evaluation): Metrics and evaluation frameworks for production ML systems