Chapter 12: Cloud AI Providers

Keywords

AWS Bedrock, Azure OpenAI, Google Vertex AI, cloud deployment, managed services, model hosting, API providers

Introduction

Deploying AI applications to production requires more than getting a model to work locally. Cloud providers offer managed AI services that handle infrastructure, scaling, security, and compliance, but they come with tradeoffs in cost, flexibility, and vendor lock-in.

This chapter covers the three major cloud AI platforms: AWS Bedrock, Azure OpenAI Service, and Google Cloud Vertex AI. For multi-cloud strategies, cloud-native deployment patterns, and security best practices, see Chapter 13.

Why Cloud Deployment Matters:

  1. Managed Infrastructure: No GPU procurement, driver updates, or hardware maintenance
  2. Enterprise Features: Built-in security, compliance certifications, and audit trails
  3. Global Scale: Deploy models closer to users worldwide
  4. Pay-per-use: Avoid upfront hardware costs, scale to zero when idle
  5. Integration: Native connections to data warehouses, authentication systems, and monitoring

Managed vs Self-Hosted Tradeoffs:

Aspect              | Managed (Bedrock, Azure OpenAI, Vertex) | Self-Hosted (EC2 + vLLM, etc.)
Setup Time          | Minutes                                 | Days to weeks
Cost at Low Volume  | Lower (pay-per-token)                   | Higher (idle GPU costs)
Cost at High Volume | Higher (markup on tokens)               | Lower (own hardware)
Control             | Limited model access                    | Full control
Compliance          | Built-in certifications                 | You manage everything
Latency             | Network overhead                        | Can optimize aggressively
Customization       | Limited                                 | Unlimited

Interview Insight: Understand the breakeven point. For many startups, managed services stay cheaper until sustained volume reaches roughly 10M tokens/day. Enterprise customers often choose managed services for compliance even when they cost more.
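
To make the breakeven idea concrete, here is a rough sketch of the arithmetic. The per-token prices are the Claude Sonnet figures listed later in this chapter; the $1,800/month self-hosted cost and the 75/25 input-to-output token mix are illustrative assumptions, not measurements, and the comparison ignores engineering time and model quality differences.

```python
def blended_price_per_1k(input_price: float, output_price: float,
                         input_share: float = 0.75) -> float:
    """Average price per 1K tokens for a given input/output token mix."""
    return input_share * input_price + (1 - input_share) * output_price


def breakeven_tokens_per_day(self_hosted_monthly_usd: float,
                             price_per_1k: float,
                             days_per_month: int = 30) -> float:
    """Daily token volume at which managed spend equals a fixed self-hosted cost."""
    monthly_tokens = self_hosted_monthly_usd / price_per_1k * 1000
    return monthly_tokens / days_per_month


# Assumed: $0.003 / $0.015 per 1K tokens (input / output), 75% input tokens
price = blended_price_per_1k(0.003, 0.015)
# Assumed: $1,800/month for a self-hosted GPU node, idle time included
tokens = breakeven_tokens_per_day(1800, price)

print(f"Blended price: ${price:.4f} per 1K tokens")
print(f"Breakeven: {tokens / 1e6:.1f}M tokens/day")
```

Below the breakeven volume the fixed self-hosted cost dominates; above it, the per-token markup does. Plugging in your own hardware quote and traffic mix matters more than the specific numbers used here.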

Common Mistake: Choosing a Cloud Provider Based Only on Model Access

What people do: Pick AWS Bedrock because they want Claude, or Azure because they want GPT-4, without considering their existing infrastructure, team expertise, or integration requirements.

Why it fails: The cost of context-switching between cloud ecosystems dwarfs the difference between models. If your data is in BigQuery, your team knows GCP, and your auth is in Firebase, adding AWS Bedrock for Claude creates operational overhead: new IAM policies, new billing, new monitoring, new incident response. You’ll spend more time managing infrastructure than benefiting from the model.

Fix: Start with your existing cloud provider unless you have a compelling reason. Most AI workloads can use any frontier model effectively. The best model is the one you can actually operate reliably. If you must use a different provider for a specific model, isolate it behind an abstraction layer.
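
One way to picture the abstraction layer mentioned in the fix is a small interface that application code depends on, with each provider hidden behind it. The sketch below is an assumption about how such a layer might look: `ChatProvider`, `ChatResult`, `ChatRouter`, and `EchoProvider` are hypothetical names, and `EchoProvider` only stands in for a real SDK wrapper so the example runs offline.

```python
from dataclasses import dataclass
from typing import Dict, List, Protocol


@dataclass
class ChatResult:
    content: str
    provider: str


class ChatProvider(Protocol):
    """Anything that can answer a chat request (hypothetical interface)."""
    name: str

    def chat(self, messages: List[Dict[str, str]]) -> ChatResult: ...


class EchoProvider:
    """Stand-in for a real SDK wrapper so the sketch runs offline."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail

    def chat(self, messages: List[Dict[str, str]]) -> ChatResult:
        if self.fail:
            raise RuntimeError(f"{self.name} unavailable")
        return ChatResult(content=messages[-1]["content"].upper(), provider=self.name)


class ChatRouter:
    """Try providers in order; application code only ever sees ChatResult."""

    def __init__(self, providers: List[ChatProvider]):
        self.providers = providers

    def chat(self, messages: List[Dict[str, str]]) -> ChatResult:
        errors = []
        for provider in self.providers:
            try:
                return provider.chat(messages)
            except Exception as exc:  # real code would catch SDK-specific errors
                errors.append(f"{provider.name}: {exc}")
        raise RuntimeError("All providers failed: " + "; ".join(errors))


router = ChatRouter([EchoProvider("primary", fail=True), EchoProvider("secondary")])
result = router.chat([{"role": "user", "content": "hello"}])
print(result.provider, result.content)
```

Because callers depend only on `ChatResult`, swapping or adding a provider touches one wrapper class rather than every call site.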

Staff Engineer Perspective

“We were spending around $180K/month on OpenAI direct and the contract was up for renewal. I built a working PoC on Bedrock Claude in three days and shared the latency and quality numbers with our account team. Two weeks later we had a 22 percent discount and committed-use credits we didn’t even ask for. The PoC was the leverage—not a threat, just a credible alternative. Always keep a second provider warm enough that you could switch in a quarter. You’re not going to switch. But the option is what gets your bill negotiated.”

Engineering Manager at a mid-stage AI startup


AWS Bedrock

AWS Bedrock is Amazon’s managed service for foundation models. It provides access to models from Anthropic, Meta, Cohere, AI21 Labs, and Amazon’s own Titan models through a unified API.

Model Access and Configuration

Available Models:

  • Anthropic Claude Sonnet 4.6/Opus: Best for complex reasoning, coding, analysis
  • Meta Llama 4: Open-source, good for fine-tuning
  • Cohere Command: Strong for RAG and search
  • Amazon Titan: Cost-effective for embeddings and basic tasks
  • Stability AI: Image generation

Model Access Process:

  1. Request model access in AWS Console (some models require approval)
  2. Choose region (us-east-1, us-west-2 have most models)
  3. Use boto3 to invoke models

Basic Setup:

# requirements.txt
boto3>=1.34.0
anthropic>=0.18.0

# bedrock_client.py
import boto3
import json
from typing import Dict, List, Optional

from botocore.config import Config

class BedrockClient:
    """Production-ready Bedrock client with error handling and retries."""

    def __init__(self, region: str = "us-east-1"):
        self.region = region
        self.bedrock_runtime = boto3.client(
            service_name='bedrock-runtime',
            region_name=region,
            # Let botocore retry throttling and transient errors with backoff
            config=Config(retries={"max_attempts": 3, "mode": "adaptive"})
        )

    def invoke_claude(
        self,
        messages: List[Dict[str, str]],
        model_id: str = "anthropic.claude-sonnet-4-6-v1:0",
        max_tokens: int = 4096,
        temperature: float = 1.0,
        system: Optional[str] = None
    ) -> Dict:
        """
        Invoke Claude via Bedrock.

        Args:
            messages: List of {"role": "user/assistant", "content": "..."}
            model_id: Bedrock model identifier
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            system: Optional system prompt

        Returns:
            Response dict with 'content' and 'usage'
        """
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "messages": messages,
            "temperature": temperature
        }

        if system:
            body["system"] = system

        try:
            response = self.bedrock_runtime.invoke_model(
                modelId=model_id,
                body=json.dumps(body)
            )

            response_body = json.loads(response['body'].read())

            return {
                "content": response_body['content'][0]['text'],
                "usage": response_body['usage'],
                "model": model_id
            }

        except Exception as e:
            print(f"Bedrock invocation error: {e}")
            raise

    def invoke_streaming(
        self,
        messages: List[Dict[str, str]],
        model_id: str = "anthropic.claude-sonnet-4-6-v1:0",
        max_tokens: int = 4096
    ):
        """Stream response from Claude."""
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "messages": messages
        }

        response = self.bedrock_runtime.invoke_model_with_response_stream(
            modelId=model_id,
            body=json.dumps(body)
        )

        stream = response['body']

        for event in stream:
            chunk = json.loads(event['chunk']['bytes'])

            if chunk['type'] == 'content_block_delta':
                yield chunk['delta']['text']
            elif chunk['type'] == 'message_stop':
                break

# Usage
client = BedrockClient()

response = client.invoke_claude(
    messages=[
        {"role": "user", "content": "Explain async/await in Python"}
    ],
    system="You are a Python expert. Be concise."
)

print(response['content'])
print(f"Tokens: {response['usage']}")
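
The client above re-raises errors to the caller. A common way to handle throttling at the application level is exponential backoff with jitter. The following is a minimal sketch of that pattern, not the SDK's own retry logic: `ThrottledError` is a hypothetical stand-in for a provider's throttling exception, and the `sleep` parameter exists only so the demo runs without waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ThrottledError(Exception):
    """Hypothetical stand-in for a provider throttling error."""


def with_backoff(call: Callable[[], T], max_attempts: int = 4,
                 base_delay: float = 0.5,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry `call` on ThrottledError, doubling the delay cap each attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except ThrottledError:
            if attempt == max_attempts:
                raise
            # Full jitter: sleep a random amount up to the exponential cap
            cap = base_delay * (2 ** (attempt - 1))
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")  # the loop always returns or raises


# Demo: a call that is throttled twice, then succeeds
attempts = {"count": 0}


def flaky_invoke() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ThrottledError("rate exceeded")
    return "ok"


delays = []
result = with_backoff(flaky_invoke, sleep=delays.append)
print(result, attempts["count"], len(delays))
```

Jitter spreads retries from many clients over time, which helps avoid a synchronized burst hitting the endpoint again the moment a throttle lifts.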

Bedrock Agents and Knowledge Bases

Bedrock Agents are autonomous agents that can use tools and access knowledge bases. Think of them as managed LangChain agents.

Creating a Knowledge Base:

# bedrock_kb.py
import boto3
import time
from typing import List, Dict

class BedrockKnowledgeBase:
    """Manage Bedrock Knowledge Bases for RAG."""

    def __init__(self, region: str = "us-east-1"):
        self.bedrock_agent = boto3.client('bedrock-agent', region_name=region)
        self.bedrock_agent_runtime = boto3.client(
            'bedrock-agent-runtime',
            region_name=region
        )
        self.s3 = boto3.client('s3', region_name=region)

    def create_knowledge_base(
        self,
        name: str,
        s3_bucket: str,
        s3_prefix: str,
        embedding_model: str = "amazon.titan-embed-text-v1",
        description: str = ""
    ) -> str:
        """
        Create a knowledge base backed by S3.

        Args:
            name: KB name
            s3_bucket: S3 bucket with documents
            s3_prefix: Prefix within bucket
            embedding_model: Bedrock embedding model ID

        Returns:
            Knowledge base ID
        """

        # 1. Create OpenSearch Serverless collection (required for KB)
        # Note: In production, create this separately via CloudFormation

        # 2. Create the knowledge base
        response = self.bedrock_agent.create_knowledge_base(
            name=name,
            description=description,
            roleArn=self._get_kb_role_arn(),  # IAM role for KB
            knowledgeBaseConfiguration={
                'type': 'VECTOR',
                'vectorKnowledgeBaseConfiguration': {
                    'embeddingModelArn': f'arn:aws:bedrock:{self.bedrock_agent.meta.region_name}::foundation-model/{embedding_model}'
                }
            },
            storageConfiguration={
                'type': 'OPENSEARCH_SERVERLESS',
                'opensearchServerlessConfiguration': {
                    'collectionArn': self._get_collection_arn(),
                    'vectorIndexName': 'bedrock-knowledge-base-index',
                    'fieldMapping': {
                        'vectorField': 'bedrock-knowledge-base-vector',
                        'textField': 'AMAZON_BEDROCK_TEXT_CHUNK',
                        'metadataField': 'AMAZON_BEDROCK_METADATA'
                    }
                }
            }
        )

        kb_id = response['knowledgeBase']['knowledgeBaseId']

        # 3. Create data source and keep its ID (start_ingestion_job needs it)
        ds_response = self.bedrock_agent.create_data_source(
            knowledgeBaseId=kb_id,
            name=f"{name}-s3-source",
            dataSourceConfiguration={
                'type': 'S3',
                's3Configuration': {
                    'bucketArn': f'arn:aws:s3:::{s3_bucket}',
                    'inclusionPrefixes': [s3_prefix]
                }
            }
        )
        self.data_source_id = ds_response['dataSource']['dataSourceId']

        return kb_id

    def sync_knowledge_base(self, kb_id: str, data_source_id: str):
        """Trigger ingestion job to sync S3 data."""
        response = self.bedrock_agent.start_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id
        )

        job_id = response['ingestionJob']['ingestionJobId']

        # Wait for completion
        while True:
            status = self.bedrock_agent.get_ingestion_job(
                knowledgeBaseId=kb_id,
                dataSourceId=data_source_id,
                ingestionJobId=job_id
            )

            state = status['ingestionJob']['status']

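            if state == 'COMPLETE':
                print("Ingestion complete")
                break
            elif state in ('FAILED', 'STOPPED'):
                # Surface any terminal state instead of polling forever
                raise RuntimeError(f"Ingestion job {job_id} ended in state {state}")

            time.sleep(10)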

    def query_knowledge_base(
        self,
        kb_id: str,
        query: str,
        num_results: int = 5
    ) -> List[Dict]:
        """
        Query knowledge base and get relevant chunks.

        Returns:
            List of {"content": str, "score": float, "metadata": dict}
        """
        response = self.bedrock_agent_runtime.retrieve(
            knowledgeBaseId=kb_id,
            retrievalQuery={'text': query},
            retrievalConfiguration={
                'vectorSearchConfiguration': {
                    'numberOfResults': num_results
                }
            }
        )

        results = []
        for item in response['retrievalResults']:
            results.append({
                'content': item['content']['text'],
                'score': item['score'],
                'metadata': item.get('metadata', {})
            })

        return results

    def retrieve_and_generate(
        self,
        kb_id: str,
        query: str,
        model_id: str = "anthropic.claude-sonnet-4-6-v1:0"
    ) -> str:
        """
        RAG: Retrieve from KB and generate answer.

        This is the high-level API that does retrieval + generation.
        """
        response = self.bedrock_agent_runtime.retrieve_and_generate(
            input={'text': query},
            retrieveAndGenerateConfiguration={
                'type': 'KNOWLEDGE_BASE',
                'knowledgeBaseConfiguration': {
                    'knowledgeBaseId': kb_id,
                    'modelArn': f'arn:aws:bedrock:{self.bedrock_agent.meta.region_name}::foundation-model/{model_id}'
                }
            }
        )

        return response['output']['text']

    def _get_kb_role_arn(self) -> str:
        """Get IAM role ARN for KB. Create via CloudFormation in production."""
        # Placeholder - in production, create proper IAM role
        return "arn:aws:iam::123456789012:role/BedrockKBRole"

    def _get_collection_arn(self) -> str:
        """Get OpenSearch Serverless collection ARN."""
        # Placeholder - create collection first
        return "arn:aws:aoss:us-east-1:123456789012:collection/bedrock-kb"

# Usage
kb = BedrockKnowledgeBase()

# Upload docs to S3 first
kb.s3.upload_file('docs/manual.pdf', 'my-bucket', 'docs/manual.pdf')

# Create KB (one-time setup)
kb_id = kb.create_knowledge_base(
    name="product-docs",
    s3_bucket="my-bucket",
    s3_prefix="docs/"
)

# Query with automatic RAG
answer = kb.retrieve_and_generate(
    kb_id=kb_id,
    query="How do I reset my password?"
)

print(answer)
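
When you want control over the prompt instead of the one-call retrieve_and_generate() path, the chunks returned by query_knowledge_base() can be assembled into a grounded prompt by hand. The sketch below assumes the {"content", "score", "metadata"} shape shown above; the `min_score` threshold, the chunk limit, and the prompt wording are illustrative choices, not Bedrock defaults.

```python
from typing import Dict, List


def build_grounded_prompt(question: str, chunks: List[Dict],
                          min_score: float = 0.4, max_chunks: int = 3) -> str:
    """Keep the best-scoring chunks and number them so the model can cite them."""
    relevant = [c for c in chunks if c["score"] >= min_score]
    relevant.sort(key=lambda c: c["score"], reverse=True)
    relevant = relevant[:max_chunks]

    if not relevant:
        context = "(no relevant documents found)"
    else:
        context = "\n\n".join(
            f"[{i}] {c['content']}" for i, c in enumerate(relevant, start=1)
        )

    return (
        "Answer using only the context below. "
        "Cite sources as [n]. If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )


# Same shape that query_knowledge_base() returns
chunks = [
    {"content": "Reset passwords under Settings > Security.", "score": 0.82, "metadata": {}},
    {"content": "Our office is closed on public holidays.", "score": 0.12, "metadata": {}},
    {"content": "Password reset links expire after 24 hours.", "score": 0.67, "metadata": {}},
]
prompt = build_grounded_prompt("How do I reset my password?", chunks)
print(prompt)
```

The resulting string can be sent as the user message through an ordinary model invocation. Filtering by score before prompting keeps low-relevance chunks from diluting the context and inflating token costs.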

Integration with AWS Services

Lambda Integration:

# lambda_function.py
import json
import boto3
import os

bedrock_runtime = boto3.client('bedrock-runtime')

def lambda_handler(event, context):
    """
    AWS Lambda function for serverless AI inference.

    Triggered by API Gateway, EventBridge, or direct invoke.
    """

    # Parse input (API Gateway passes body=None when the request has no body)
    body = json.loads(event.get('body') or '{}')
    user_message = body.get('message', '')

    # Invoke Bedrock
    response = bedrock_runtime.invoke_model(
        modelId="anthropic.claude-sonnet-4-6-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": user_message}
            ]
        })
    )

    result = json.loads(response['body'].read())

    return {
        'statusCode': 200,
        'body': json.dumps({
            'response': result['content'][0]['text'],
            'usage': result['usage']
        })
    }
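
The handler above trusts its input. Before spending tokens, it is usually worth validating the request and returning a 4xx response for malformed bodies. This is a sketch of one way to do that; `parse_chat_event` and the `MAX_MESSAGE_CHARS` limit are hypothetical, and base64-encoded bodies are not handled.

```python
import json
from typing import Any, Dict, Optional, Tuple

MAX_MESSAGE_CHARS = 8000  # assumed limit; tune for your model and budget


def parse_chat_event(event: Dict[str, Any]) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Return (message, None) on success or (None, error_response) on bad input."""

    def error(status: int, detail: str) -> Dict[str, Any]:
        return {"statusCode": status, "body": json.dumps({"error": detail})}

    raw_body = event.get("body") or "{}"  # body may be None for an empty request
    try:
        body = json.loads(raw_body)
    except json.JSONDecodeError:
        return None, error(400, "Body must be valid JSON")

    message = body.get("message") if isinstance(body, dict) else None
    if not isinstance(message, str) or not message.strip():
        return None, error(400, "Field 'message' is required")
    if len(message) > MAX_MESSAGE_CHARS:
        return None, error(413, "Message too long")

    return message.strip(), None


print(parse_chat_event({"body": '{"message": " hi "}'}))
print(parse_chat_event({"body": None}))
```

Inside the Lambda handler, the error response can be returned directly when it is not None, so invalid requests never reach the model.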

SageMaker Integration for Fine-tuning:

# bedrock_sagemaker.py
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFace

class BedrockSageMakerPipeline:
    """Combine Bedrock for inference, SageMaker for fine-tuning."""

    def __init__(self):
        self.sagemaker_session = sagemaker.Session()
        self.role = sagemaker.get_execution_role()
        self.bedrock = boto3.client('bedrock-runtime')

    def fine_tune_llama(
        self,
        training_data_s3: str,
        output_s3: str,
        base_model: str = "meta-llama/Llama-4-Scout-17B"
    ):
        """
        Fine-tune Llama on SageMaker and deploy it to a SageMaker endpoint.

        Serving the tuned weights through Bedrock is a separate step
        (Custom Model Import) that is not shown here.
        """

        huggingface_estimator = HuggingFace(
            entry_point='train.py',
            source_dir='./scripts',
            instance_type='ml.g5.2xlarge',
            instance_count=1,
            role=self.role,
            output_path=output_s3,  # where SageMaker writes model artifacts
            transformers_version='4.36',
            pytorch_version='2.1',
            py_version='py310',
            hyperparameters={
                'model_name': base_model,
                'epochs': 3,
                'per_device_train_batch_size': 4,
                'learning_rate': 2e-5
            }
        )

        huggingface_estimator.fit({'training': training_data_s3})

        # Deploy to SageMaker endpoint
        predictor = huggingface_estimator.deploy(
            initial_instance_count=1,
            instance_type='ml.g5.xlarge'
        )

        return predictor

    def batch_inference_bedrock(
        self,
        input_s3: str,
        output_s3: str,
        model_id: str
    ):
        """
        Bedrock batch inference for large-scale jobs.
        More cost-effective than real-time for bulk processing.
        """
        bedrock = boto3.client('bedrock')

        job = bedrock.create_model_invocation_job(
            jobName='batch-inference-job',
            modelId=model_id,
            inputDataConfig={
                's3InputDataConfig': {
                    's3Uri': input_s3
                }
            },
            outputDataConfig={
                's3OutputDataConfig': {
                    's3Uri': output_s3
                }
            },
            roleArn=self.role
        )

        return job['jobArn']
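
The batch job reads its prompts from JSON Lines files at the S3 input location. As I understand the documented format, each line wraps a normal request body under `modelInput` together with a `recordId` used to match outputs to inputs; treat the field names and the ID format below as assumptions to verify against the current Bedrock documentation. `to_batch_records` is a hypothetical helper.

```python
import json
from typing import Iterable, List


def to_batch_records(prompts: Iterable[str], max_tokens: int = 1024) -> List[str]:
    """Build one JSON line per prompt, each wrapping a Claude messages body."""
    lines = []
    for index, prompt in enumerate(prompts, start=1):
        record = {
            "recordId": f"REC{index:08d}",  # used to match outputs back to inputs
            "modelInput": {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "messages": [{"role": "user", "content": prompt}],
            },
        }
        lines.append(json.dumps(record))
    return lines


records = to_batch_records(["Summarize ticket 1", "Summarize ticket 2"])
jsonl = "\n".join(records)  # upload this text to the S3 input location
print(jsonl)
```

Keeping the record IDs deterministic makes it straightforward to join results back to the source rows after the job finishes.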

S3 Event-Driven Processing:

# s3_document_processor.py
import boto3
import json
from typing import Dict

class S3DocumentProcessor:
    """Process documents uploaded to S3 using Bedrock."""

    def __init__(self):
        self.s3 = boto3.client('s3')
        self.bedrock = boto3.client('bedrock-runtime')

    def process_document(self, bucket: str, key: str) -> Dict:
        """
        Extract insights from document using Claude.

        Triggered by S3 event -> Lambda -> this function.
        """

        # Download document
        obj = self.s3.get_object(Bucket=bucket, Key=key)
        content = obj['Body'].read().decode('utf-8')

        # Analyze with Claude
        response = self.bedrock.invoke_model(
            modelId="anthropic.claude-sonnet-4-6-v1:0",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 4096,
                "messages": [{
                    "role": "user",
                    "content": f"""Analyze this document and extract:
1. Key topics
2. Action items
3. Sentiment (positive/negative/neutral)
4. Summary (2-3 sentences)

Document:
{content}

Return as JSON."""
                }]
            })
        )

        result = json.loads(response['body'].read())
        analysis = result['content'][0]['text']

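        # Save analysis back to S3. Only strip a real .txt suffix so a key
        # without one can never overwrite the source document. Scope the S3
        # trigger to source files (e.g. a .txt suffix filter); otherwise
        # writing this object would re-trigger the function.
        base = key[:-4] if key.endswith('.txt') else key
        analysis_key = f"{base}_analysis.json"
        self.s3.put_object(
            Bucket=bucket,
            Key=analysis_key,
            Body=analysis,
            ContentType='application/json'
        )

        # Raises json.JSONDecodeError if the model wrapped the JSON in prose
        return json.loads(analysis)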
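
Asking a model to "Return as JSON" does not guarantee a bare JSON object; replies are sometimes wrapped in a Markdown code fence or preceded by a sentence of prose. The sketch below shows one tolerant way to parse such output. `extract_json` is a hypothetical helper, and the heuristics (strip a fence, then take the outermost braces) are an assumption about typical replies, not a guarantee.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # Markdown code-fence marker, built indirectly for readability
FENCED_BLOCK = re.compile(
    re.escape(FENCE) + r"(?:json)?\s*(.*?)" + re.escape(FENCE), re.DOTALL
)


def extract_json(model_text: str) -> Dict[str, Any]:
    """Parse a JSON object from model output, tolerating fences and extra prose."""
    # Prefer the contents of a fenced block if the model added one
    fenced = FENCED_BLOCK.search(model_text)
    candidate = fenced.group(1) if fenced else model_text

    # Fall back to the outermost {...} span
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in model output")

    return json.loads(candidate[start:end + 1])


reply = (
    "Here is the analysis:\n"
    + FENCE + "json\n"
    + '{"sentiment": "positive", "topics": ["billing"]}\n'
    + FENCE
)
analysis = extract_json(reply)
print(analysis["sentiment"], analysis["topics"])
```

If parsing still fails, logging the raw reply and retrying with a stricter prompt is usually safer than storing unparsed text under a .json key.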

Cost Optimization on AWS

Bedrock Pricing (as of 2026):

Model              | Input (per 1K tokens) | Output (per 1K tokens)
Claude Sonnet 4.6  | $0.003                | $0.015
Claude Opus 4.8    | $0.005                | $0.025
Claude Haiku 4.5   | $0.001                | $0.005
Llama 4 Scout      | $0.00099              | $0.00099
Titan Text Express | $0.0002               | $0.0006

Cost Optimization Strategies:

# cost_optimizer.py
from typing import List, Dict
import tiktoken

class BedrockCostOptimizer:
    """Optimize Bedrock costs through smart model selection and caching."""

    PRICING = {
        'claude-sonnet-4-6': {'input': 0.003, 'output': 0.015},
        'claude-haiku-4-5': {'input': 0.001, 'output': 0.005},
        'claude-opus-4-8': {'input': 0.005, 'output': 0.025},
    }

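    def __init__(self):
        # tiktoken implements OpenAI tokenizers, not Claude's, so the counts
        # below are approximations suitable for rough budgeting only.
        self.encoder = tiktoken.get_encoding("cl100k_base")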

    def estimate_cost(
        self,
        messages: List[Dict],
        model: str,
        expected_output_tokens: int
    ) -> float:
        """Estimate cost before making request."""

        # Count input tokens
        input_text = ' '.join([m['content'] for m in messages])
        input_tokens = len(self.encoder.encode(input_text))

        pricing = self.PRICING[model]

        input_cost = (input_tokens / 1000) * pricing['input']
        output_cost = (expected_output_tokens / 1000) * pricing['output']

        return input_cost + output_cost

    def choose_model(self, task_complexity: str) -> str:
        """
        Choose cheapest appropriate model.

        Strategy:
        - Use Haiku for simple tasks (classification, extraction)
        - Use Sonnet for medium tasks (analysis, generation)
        - Use Opus for complex tasks (reasoning, creative writing)
        """

        if task_complexity == 'simple':
            return 'claude-haiku-4-5'
        elif task_complexity == 'medium':
            return 'claude-sonnet-4-6'
        else:
            return 'claude-opus-4-8'

    def batch_small_requests(
        self,
        requests: List[str],
        batch_size: int = 10
    ) -> List[str]:
        """
        Batch multiple small requests into one.

        Why: Reduce per-request overhead and connection costs.
        """

        batched = []
        for i in range(0, len(requests), batch_size):
            batch = requests[i:i+batch_size]

            combined_prompt = "Process each item:\n\n"
            for idx, req in enumerate(batch):
                combined_prompt += f"{idx+1}. {req}\n"

            batched.append(combined_prompt)

        return batched

    def use_prompt_caching(
        self,
        system_prompt: str,
        messages: List[Dict]
    ) -> Dict:
        """
        Use Claude's prompt caching to reduce costs for repeated prefixes.

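        Cache reads are billed at a steep discount (about 90% off the normal
        input rate); writing a prefix to the cache costs somewhat more.
        Use for: RAG context, long system prompts, documentation.
        """

        # Prefixes below the model's minimum cacheable length (1,024 tokens
        # on many Claude models) are sent normally and not cached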
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "system": [
                {
                    "type": "text",
                    "text": system_prompt,
                    "cache_control": {"type": "ephemeral"}  # Enable caching
                }
            ],
            "messages": messages
        }

        return body

# Usage
optimizer = BedrockCostOptimizer()

# Estimate before running
cost = optimizer.estimate_cost(
    messages=[{"role": "user", "content": "Summarize this article..."}],
    model="claude-sonnet-4-6",
    expected_output_tokens=500
)

if cost > 0.10:
    print(f"Warning: Estimated cost ${cost:.4f}")

# Use cheaper model for simple tasks
model = optimizer.choose_model('simple')  # Returns 'claude-haiku-4-5'

# Batch small requests
items = ["great product", "arrived broken", "does the job"]  # example inputs
tasks = ["Classify: " + item for item in items]
batched = optimizer.batch_small_requests(tasks)
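
batch_small_requests() combines items into one numbered prompt, so the caller has to split the reply back into per-item results. A minimal sketch follows, assuming the model answers with one numbered line per item. That is not guaranteed, so real code should check for missing entries and retry or fall back to single requests. `split_batched_reply` is a hypothetical helper.

```python
import re
from typing import Dict, List, Optional


def split_batched_reply(reply: str, expected: int) -> List[Optional[str]]:
    """Map lines like '2. negative' back to positions; missing items stay None."""
    answers: Dict[int, str] = {}
    for line in reply.splitlines():
        match = re.match(r"\s*(\d+)[.)]\s*(.+)", line)
        if match:
            index = int(match.group(1))
            if 1 <= index <= expected:
                answers[index] = match.group(2).strip()
    return [answers.get(i) for i in range(1, expected + 1)]


reply = "1. positive\n2. negative\n4. neutral"
print(split_batched_reply(reply, expected=4))
# prints ['positive', 'negative', None, 'neutral']
```

A None entry marks an item the model skipped, which gives the caller a precise list of what to re-send.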

Additional Cost Tips:

  1. Use Provisioned Throughput for high-volume predictable workloads (50%+ savings)
  2. Enable Batch Inference for non-real-time jobs (50% cheaper)
  3. Implement Rate Limiting to avoid unexpected spend
  4. Use CloudWatch Metrics to track token usage by application
  5. Set Budget Alerts in AWS Budgets

Common Mistake: No Cost Controls on Cloud AI Services

What people do: Deploy to production without rate limits, budget alerts, or usage monitoring. A bug, traffic spike, or prompt injection causes runaway API calls. First indication is a $10,000+ bill at month-end.

Why it fails: Cloud AI pricing is per-token, and tokens can accumulate fast. A retry loop hitting an LLM endpoint, a crawler indexing your public API, or a single bad actor can generate millions of tokens in hours. Unlike traditional cloud services with clear capacity limits, LLM APIs happily accept unlimited requests.

Fix: Implement defense in depth: (1) Set budgets per provider (AWS Budgets, Azure Cost Management); by default these alert rather than block spend, so pair them with automated actions or service quotas. (2) Rate limit at the API gateway level. (3) Add per-user and per-request token limits in application code. (4) Monitor cost metrics with alerts at 50% and 80% of budget. (5) Log token usage per endpoint to identify cost drivers. Never deploy an LLM feature without at least one cost safeguard.
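
Point (3) above, per-user token limits in application code, can be sketched as a fixed-window budget. This is an in-memory illustration only: `TokenBudget` is a hypothetical class, the limits are arbitrary, and a multi-instance deployment would need a shared store instead of a local dict.

```python
import time
from collections import defaultdict
from typing import Callable, Dict, Tuple


class TokenBudget:
    """Per-user token allowance that resets every fixed window (in-memory sketch)."""

    def __init__(self, tokens_per_window: int, window_seconds: int = 3600,
                 clock: Callable[[], float] = time.time):
        self.tokens_per_window = tokens_per_window
        self.window_seconds = window_seconds
        self.clock = clock
        # user_id -> (window index, tokens used in that window)
        self.usage: Dict[str, Tuple[int, int]] = defaultdict(lambda: (0, 0))

    def try_spend(self, user_id: str, tokens: int) -> bool:
        """Record the spend and return True, or return False if it would exceed the cap."""
        window = int(self.clock() // self.window_seconds)
        last_window, used = self.usage[user_id]
        if last_window != window:
            used = 0  # new window: reset the counter
        if used + tokens > self.tokens_per_window:
            return False
        self.usage[user_id] = (window, used + tokens)
        return True


now = [0.0]  # fake clock so the demo is deterministic
budget = TokenBudget(tokens_per_window=1000, window_seconds=60, clock=lambda: now[0])
print(budget.try_spend("alice", 800))   # within budget
print(budget.try_spend("alice", 300))   # would exceed 1000 in this window
now[0] = 61.0
print(budget.try_spend("alice", 300))   # new window, counter reset
```

Calling try_spend() with an estimated token count before each model request turns a runaway loop into rejected requests instead of a surprise bill.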


Azure OpenAI Service

Azure OpenAI provides enterprise-grade access to OpenAI models (GPT-5, GPT-5-mini, DALL-E, Whisper, embeddings) with Microsoft’s security and compliance features.

Deployment and Model Management

Key Concepts:

  • Resource: Azure OpenAI account in a specific region
  • Deployment: A named instance of a model inside a resource; API calls reference the deployment name, and throughput limits apply per deployment
  • PTU (Provisioned Throughput Units): Reserved capacity

Setup:

# requirements.txt
openai>=1.12.0
azure-identity>=1.15.0

# azure_openai_client.py
import os
from typing import Optional

from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

class AzureOpenAIClient:
    """Production Azure OpenAI client with managed identity support."""

    def __init__(
        self,
        endpoint: Optional[str] = None,
        api_key: Optional[str] = None,
        api_version: str = "2024-02-15-preview",
        use_managed_identity: bool = False
    ):
        """
        Initialize Azure OpenAI client.

        Args:
            endpoint: Azure OpenAI endpoint URL (falls back to AZURE_OPENAI_ENDPOINT)
            api_key: API key (if not using managed identity)
            api_version: API version
            use_managed_identity: Authenticate with Microsoft Entra ID
                (formerly Azure AD) instead of an API key
        """

        self.endpoint = endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
        self.api_version = api_version

        if use_managed_identity:
            # Use managed identity (best practice for production)
            credential = DefaultAzureCredential()
            token_provider = get_bearer_token_provider(
                credential,
                "https://cognitiveservices.azure.com/.default"
            )

            self.client = AzureOpenAI(
                azure_endpoint=self.endpoint,
                azure_ad_token_provider=token_provider,
                api_version=self.api_version
            )
        else:
            # Use API key (simpler for dev)
            self.client = AzureOpenAI(
                azure_endpoint=self.endpoint,
                api_key=api_key or os.getenv("AZURE_OPENAI_API_KEY"),
                api_version=self.api_version
            )

    def chat(
        self,
        messages: list,
        deployment_name: str,
        temperature: float = 0.7,
        max_tokens: int = 800,
        stream: bool = False
    ):
        """
        Chat completion request.

        Args:
            messages: List of message dicts
            deployment_name: Name of your deployment (not model name)
            temperature: Sampling temperature
            max_tokens: Max tokens to generate
            stream: Whether to stream response
        """

        response = self.client.chat.completions.create(
            model=deployment_name,  # Use deployment name, not model name
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream
        )

        if stream:
            return response
        else:
            return {
                'content': response.choices[0].message.content,
                'usage': response.usage.model_dump(),
                'finish_reason': response.choices[0].finish_reason
            }

    def embed(
        self,
        texts: list[str],
        deployment_name: str = "text-embedding-3-small"
    ) -> list[list[float]]:
        """Get embeddings for texts."""

        response = self.client.embeddings.create(
            model=deployment_name,
            input=texts
        )

        return [item.embedding for item in response.data]

# Usage
client = AzureOpenAIClient(
    endpoint="https://my-resource.openai.azure.com/",
    use_managed_identity=True  # Best practice
)

response = client.chat(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain Azure AD vs Azure AD B2C"}
    ],
    deployment_name="gpt-5-deployment"  # Your deployment name
)

print(response['content'])
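
With stream=True, chat() returns the raw stream and leaves iteration to the caller. The sketch below shows one way to accumulate the text, guarding against chunks that carry no choices or no content, which can occur in practice. `collect_stream_text` and `fake_chunk` are hypothetical helpers; the fake objects only mimic the attribute shape of streamed chunks so the example runs without a deployment.

```python
from types import SimpleNamespace
from typing import Iterable


def collect_stream_text(chunks: Iterable) -> str:
    """Concatenate the text deltas from a chat-completions stream."""
    parts = []
    for chunk in chunks:
        if not chunk.choices:      # some chunks carry no choices (metadata only)
            continue
        delta = chunk.choices[0].delta
        if delta.content:          # content is None on role-only or final chunks
            parts.append(delta.content)
    return "".join(parts)


def fake_chunk(text=None, empty=False):
    """Build an object shaped like a streamed chunk (stand-in for the SDK type)."""
    if empty:
        return SimpleNamespace(choices=[])
    return SimpleNamespace(
        choices=[SimpleNamespace(delta=SimpleNamespace(content=text))]
    )


stream = [fake_chunk(empty=True), fake_chunk("Hel"), fake_chunk("lo"), fake_chunk(None)]
print(collect_stream_text(stream))
```

In a web handler you would typically yield each delta as it arrives instead of joining at the end; the guards stay the same.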

Azure AI Studio

Azure AI Studio provides a web UI and SDK for managing AI projects, including prompt flow, evaluation, and deployment.

Prompt Flow Integration:

# azure_prompt_flow.py
from promptflow.azure import PFClient  # workspace-backed client; accepts Azure credentials
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

class AzurePromptFlow:
    """Use Prompt Flow for complex AI workflows."""

    def __init__(self, subscription_id: str, resource_group: str, workspace: str):
        credential = DefaultAzureCredential()

        self.pf_client = PFClient(
            credential=credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group,
            workspace_name=workspace
        )

        self.ml_client = MLClient(
            credential=credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group,
            workspace_name=workspace
        )

    def create_rag_flow(self, flow_path: str):
        """
        Create a RAG flow with Azure AI Search + GPT-5.

        Flow structure:
        1. Embed user query
        2. Search Azure AI Search
        3. Rerank results
        4. Generate answer with GPT-5
        """

        # flow.dag.yaml defines the flow
        flow = self.pf_client.flows.create_or_update(
            flow=flow_path,
            name="rag-flow"
        )

        return flow

    def run_evaluation(
        self,
        flow_name: str,
        test_data: str,
        metrics: list[str] = ["groundedness", "relevance", "coherence"]
    ):
        """
        Run evaluation on flow outputs.

        Azure AI Studio provides built-in metrics for RAG quality.
        """

        # Batch-run the flow over the test data
        run = self.pf_client.run(
            flow=flow_name,
            data=test_data,
            column_mapping={
                "question": "${data.question}",
                "context": "${data.context}",
                "answer": "${data.answer}"
            }
        )

        # Score the outputs with an evaluation flow (path to a flow directory)
        eval_run = self.pf_client.run(
            flow="evaluation",
            data=test_data,
            run=run,
            column_mapping={
                "answer": "${run.outputs.answer}",
                "question": "${data.question}",
                "context": "${run.outputs.context}"
            }
        )

        results = self.pf_client.get_metrics(eval_run)
        return results

Integration with Azure Services

Azure Functions Integration:

# function_app.py
import azure.functions as func
import logging
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
import os
import json

app = func.FunctionApp()

# Initialize client once (reused across invocations)
credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(
    credential,
    "https://cognitiveservices.azure.com/.default"
)

client = AzureOpenAI(
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    azure_ad_token_provider=token_provider,
    api_version="2024-02-15-preview"
)

@app.route(route="chat", methods=["POST"])
def chat_endpoint(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP-triggered Azure Function for chat completions."""

    try:
        req_body = req.get_json()
        message = req_body.get('message')

        response = client.chat.completions.create(
            model=os.getenv("DEPLOYMENT_NAME"),
            messages=[
                {"role": "user", "content": message}
            ],
            max_tokens=1000
        )

        return func.HttpResponse(
            json.dumps({
                "response": response.choices[0].message.content,
                "usage": response.usage.model_dump()
            }),
            mimetype="application/json"
        )

    except Exception as e:
        logging.error(f"Error: {e}")
        return func.HttpResponse(
            json.dumps({"error": str(e)}),
            status_code=500
        )

@app.cosmos_db_trigger(
    arg_name="documents",
    database_name="chatdb",
    container_name="messages",
    connection="CosmosDBConnection",
    lease_container_name="leases",
    create_lease_container_if_not_exists=True
)
def process_message(documents: func.DocumentList):
    """
    Cosmos DB trigger to process new chat messages.

    Use case: Moderate messages, extract entities, generate summaries.
    """

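    for doc in documents:
        message = doc['message']

        # Azure OpenAI has no standalone moderation endpoint (content filters
        # run on each completion), so this sketch asks the chat deployment to
        # classify. Azure AI Content Safety is the dedicated service for this.
        verdict = client.chat.completions.create(
            model=os.getenv("DEPLOYMENT_NAME"),
            messages=[
                {"role": "system", "content": "Reply with exactly FLAGGED or OK."},
                {"role": "user", "content": f"Is this message abusive or unsafe?\n\n{message}"}
            ],
            max_tokens=5
        )

        if "FLAGGED" in (verdict.choices[0].message.content or "").upper():
            logging.warning(f"Flagged message: {message}")
            # Take action (delete, notify, etc.)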

Azure AI Search (formerly Azure Cognitive Search) Integration (RAG):

# azure_search_rag.py
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SimpleField,
    SearchableField,
    SearchFieldDataType,
    VectorSearch,
    HnswAlgorithmConfiguration,
    VectorSearchProfile,
    SemanticConfiguration,
    SemanticField,
    SemanticPrioritizedFields,
    SemanticSearch
)
from azure.core.credentials import AzureKeyCredential
from openai import AzureOpenAI
import os

class AzureSearchRAG:
    """RAG implementation using Azure Cognitive Search + Azure OpenAI."""

    def __init__(
        self,
        search_endpoint: str,
        search_key: str,
        openai_client: AzureOpenAI,
        index_name: str = "documents"
    ):
        self.search_client = SearchClient(
            endpoint=search_endpoint,
            index_name=index_name,
            credential=AzureKeyCredential(search_key)
        )

        self.index_client = SearchIndexClient(
            endpoint=search_endpoint,
            credential=AzureKeyCredential(search_key)
        )

        self.openai = openai_client
        self.index_name = index_name

    def create_index(self):
        """Create search index with vector search support."""

        # The vector field must be a SearchField: the SimpleField helper
        # cannot define a searchable vector field.
        from azure.search.documents.indexes.models import SearchField

        fields = [
            SimpleField(
                name="id",
                type=SearchFieldDataType.String,
                key=True
            ),
            SearchableField(
                name="content",
                type=SearchFieldDataType.String,
                searchable=True
            ),
            SearchableField(
                name="title",
                type=SearchFieldDataType.String,
                searchable=True
            ),
            SearchField(
                name="embedding",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=1536,  # text-embedding-3-small default (same as Ada-002)
                vector_search_profile_name="vector-profile"
            )
        ]

        vector_search = VectorSearch(
            algorithms=[
                HnswAlgorithmConfiguration(name="hnsw-config")
            ],
            profiles=[
                VectorSearchProfile(
                    name="vector-profile",
                    algorithm_configuration_name="hnsw-config"
                )
            ]
        )

        semantic_config = SemanticConfiguration(
            name="semantic-config",
            prioritized_fields=SemanticPrioritizedFields(
                title_field=SemanticField(field_name="title"),
                content_fields=[SemanticField(field_name="content")]
            )
        )

        semantic_search = SemanticSearch(
            configurations=[semantic_config]
        )

        index = SearchIndex(
            name=self.index_name,
            fields=fields,
            vector_search=vector_search,
            semantic_search=semantic_search
        )

        self.index_client.create_or_update_index(index)

    def index_documents(self, documents: list[dict]):
        """
        Index documents with embeddings.

        Each document: {"id": str, "title": str, "content": str}
        """

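        # Generate embeddings (on Azure, `model` below is your deployment name,
        # which may differ from the underlying model name)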
        contents = [doc['content'] for doc in documents]

        embeddings_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=contents
        )

        # Add embeddings to documents
        for doc, emb_data in zip(documents, embeddings_response.data):
            doc['embedding'] = emb_data.embedding

        # Upload to search
        self.search_client.upload_documents(documents)

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Hybrid search: vector + keyword + semantic ranking."""

        # Get query embedding
        query_embedding = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=[query]
        ).data[0].embedding

        # Hybrid search (VectorizedQuery is the SDK's typed vector query)
        from azure.search.documents.models import VectorizedQuery

        results = self.search_client.search(
            search_text=query,
            vector_queries=[
                VectorizedQuery(
                    vector=query_embedding,
                    k_nearest_neighbors=top_k,
                    fields="embedding"
                )
            ],
            query_type="semantic",
            semantic_configuration_name="semantic-config",
            top=top_k
        )

        return [
            {
                "content": r["content"],
                "title": r["title"],
                "score": r["@search.score"]
            }
            for r in results
        ]

    def retrieve_and_generate(
        self,
        query: str,
        deployment_name: str = "gpt-5"
    ) -> dict:
        """RAG: Retrieve context and generate answer."""

        # Retrieve
        search_results = self.search(query, top_k=3)

        # Build context
        context = "\n\n".join([
            f"Document: {r['title']}\n{r['content']}"
            for r in search_results
        ])

        # Generate
        response = self.openai.chat.completions.create(
            model=deployment_name,
            messages=[
                {
                    "role": "system",
                    "content": "Answer questions based on the provided context. "
                               "If the answer isn't in the context, say so."
                },
                {
                    "role": "user",
                    "content": f"Context:\n{context}\n\nQuestion: {query}"
                }
            ],
            temperature=0.3
        )

        return {
            "answer": response.choices[0].message.content,
            "sources": [r['title'] for r in search_results],
            "usage": response.usage.model_dump()
        }

# Usage
openai_client = AzureOpenAI(
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version="2024-02-15-preview"
)

rag = AzureSearchRAG(
    search_endpoint=os.getenv("AZURE_SEARCH_ENDPOINT"),
    search_key=os.getenv("AZURE_SEARCH_KEY"),
    openai_client=openai_client
)

# Index documents
rag.index_documents([
    {
        "id": "1",
        "title": "Azure OpenAI Service",
        "content": "Azure OpenAI Service provides..."
    }
])

# RAG query
result = rag.retrieve_and_generate("What is Azure OpenAI?")
print(result['answer'])
print(f"Sources: {result['sources']}")
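The `index_documents` method above sends every document's content in a single embeddings request. For larger corpora it is usually safer to embed in batches. A minimal sketch, assuming a batch size of 16 (an illustrative value, not a documented limit) and a hypothetical `embed_fn` callable that stands in for the call to `self.openai.embeddings.create`:

```python
from typing import Callable, Iterator


def batched(items: list[str], batch_size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def embed_in_batches(
    texts: list[str],
    embed_fn: Callable[[list[str]], list[list[float]]],
    batch_size: int = 16,
) -> list[list[float]]:
    """Embed `texts` one batch at a time, preserving input order."""
    vectors: list[list[float]] = []
    for batch in batched(texts, batch_size):
        vectors.extend(embed_fn(batch))
    return vectors


def fake_embed(batch: list[str]) -> list[list[float]]:
    """Stand-in embedder for local testing: one fake 2-d vector per text."""
    return [[float(len(text)), 0.0] for text in batch]


vectors = embed_in_batches(["a", "bb", "ccc"], fake_embed, batch_size=2)
print(vectors)
```

In the class above, `embed_fn` could wrap the embeddings call and return `[d.embedding for d in response.data]`; the rest of `index_documents` would stay unchanged.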

Enterprise Features

Private Endpoints:

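# Keep traffic inside your Azure VNet by disabling public network access.
# This snippet restricts the account to one subnet; the private endpoint
# itself is a separate Microsoft.Network/privateEndpoints resource.
# Configure in Azure Portal or via ARM template: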

# arm_template.json (snippet)
{
  "type": "Microsoft.CognitiveServices/accounts",
  "apiVersion": "2023-05-01",
  "name": "[parameters('accountName')]",
  "properties": {
    "publicNetworkAccess": "Disabled",
    "networkAcls": {
      "defaultAction": "Deny",
      "virtualNetworkRules": [
        {
          "id": "[parameters('subnetId')]"
        }
      ]
    }
  }
}

Managed Identity Configuration:

# managed_identity_setup.py
from azure.identity import ManagedIdentityCredential
from azure.keyvault.secrets import SecretClient
from openai import AzureOpenAI

class SecureAzureOpenAI:
    """
    Production setup with managed identity and Key Vault.

    Security best practices:
    1. No API keys in code/env vars
    2. Managed identity for authentication
    3. Secrets in Key Vault
    4. Network isolation via private endpoints
    """

    def __init__(self, key_vault_url: str):
        # Managed identity credential (assigned to Azure resource)
        credential = ManagedIdentityCredential()

        # Get secrets from Key Vault
        secret_client = SecretClient(
            vault_url=key_vault_url,
            credential=credential
        )

        endpoint = secret_client.get_secret("openai-endpoint").value

        # Use managed identity for OpenAI
        from azure.identity import get_bearer_token_provider
        token_provider = get_bearer_token_provider(
            credential,
            "https://cognitiveservices.azure.com/.default"
        )

        self.client = AzureOpenAI(
            azure_endpoint=endpoint,
            azure_ad_token_provider=token_provider,
            api_version="2024-02-15-preview"
        )

    def chat(self, messages: list, deployment: str):
        return self.client.chat.completions.create(
            model=deployment,
            messages=messages
        )

# Usage in Azure App Service or Azure Container Instances
secure_client = SecureAzureOpenAI(
    key_vault_url="https://my-keyvault.vault.azure.net/"
)

Data Residency and Compliance:

# Azure OpenAI supports regional deployment for compliance

# GDPR (Europe): Deploy to West Europe or North Europe
client_eu = AzureOpenAI(
    azure_endpoint="https://my-resource-westeurope.openai.azure.com/",
    api_key=os.getenv("AZURE_OPENAI_API_KEY_EU"),
    api_version="2024-02-15-preview"
)

# HIPAA: Enable in Azure Portal, use BAA-signed regions
# Supported regions: East US, South Central US, West US 2

# Customer Lockbox: Enable for human approval before Microsoft access
# Configure in Azure Portal -> OpenAI Resource -> Customer Lockbox
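Regional deployment only helps if requests are actually routed to the right resource. The sketch below shows one way to pick an endpoint from a user's data region and to fail closed when no region is configured. The region keys and the `eastus` resource name are hypothetical placeholders; only the West Europe endpoint mirrors the example above.

```python
# Hypothetical region -> endpoint map; resource names are placeholders.
REGION_ENDPOINTS = {
    "eu": "https://my-resource-westeurope.openai.azure.com/",
    "us": "https://my-resource-eastus.openai.azure.com/",
}


def endpoint_for_region(data_region: str) -> str:
    """Return the endpoint for a data region, refusing unknown regions.

    Failing closed avoids silently sending regulated data to a default
    region.
    """
    try:
        return REGION_ENDPOINTS[data_region.lower()]
    except KeyError:
        raise ValueError(
            f"No endpoint configured for region {data_region!r}"
        ) from None


print(endpoint_for_region("EU"))
```

The returned URL would be passed as `azure_endpoint` when constructing the client for that user.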

Azure OpenAI Pricing and Cost Optimization

Pricing (as of 2026):

Model | Input (per 1K tokens) | Output (per 1K tokens) | PTU (per hour)
GPT-5 | $0.00125 | $0.01 | $9.03
GPT-5.5 | $0.005 | $0.03 | -
GPT-5.4 | $0.0025 | $0.015 | -
GPT-5-mini | $0.0005 | $0.0015 | $0.53
Ada-002 (embeddings) | $0.0001 | - | -
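To make the table concrete, the sketch below prices a single request (2,000 input tokens, 500 output tokens, an arbitrary example size) on each chat model. The prices are copied from the table above and are illustrative only.

```python
# Per-1K-token prices copied from the table above (illustrative; prices change).
PRICES_PER_1K = {
    "gpt-5": {"input": 0.00125, "output": 0.01},
    "gpt-5.5": {"input": 0.005, "output": 0.03},
    "gpt-5.4": {"input": 0.0025, "output": 0.015},
    "gpt-5-mini": {"input": 0.0005, "output": 0.0015},
}


def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost in USD of one request at pay-as-you-go prices."""
    price = PRICES_PER_1K[model]
    return (
        (input_tokens / 1000) * price["input"]
        + (output_tokens / 1000) * price["output"]
    )


for name in PRICES_PER_1K:
    print(f"{name}: ${request_cost(name, 2000, 500):.5f}")
```

Output tokens dominate the bill for most models here, which is why capping `max_tokens` matters more than trimming the prompt.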

PTU (Provisioned Throughput) vs Pay-as-you-go:

# cost_comparison.py

import math

def calculate_breakeven(tokens_per_day: int, model: str = "gpt-5"):
    """
    Compare Azure PTU (reserved capacity) vs PAYG (per-token).

    Key subtlety: PTUs are provisioned in *discrete, always-on* blocks with a
    minimum commitment; you cannot buy a fraction of a PTU. At low volume you
    therefore pay for idle reserved capacity, which is why PAYG wins until your
    PAYG bill would exceed the fixed cost of the PTUs you must provision.
    """
    # PAYG (GPT-5: $1.25/M input, $10/M output), assume 50/50 input/output
    payg_input_cost = 0.00125   # per 1K tokens
    payg_output_cost = 0.01     # per 1K tokens
    payg_daily_cost = (
        (tokens_per_day / 1000 / 2) * payg_input_cost +
        (tokens_per_day / 1000 / 2) * payg_output_cost
    )
    payg_monthly_cost = payg_daily_cost * 30

    # PTU: ~$9.03/hr per unit, always-on; each PTU ≈ 5,000 tok/s effective.
    ptu_monthly_per_unit = 9.03 * 24 * 30                  # one PTU, full month
    ptu_capacity_per_month = 5000 * 60 * 60 * 24 * 30      # tokens one PTU serves

    # You must provision *whole* PTUs, always on (min 1 here for illustration;
    # real minimum commitments are larger).
    ptus_needed = max(1, math.ceil(tokens_per_day * 30 / ptu_capacity_per_month))
    ptu_monthly_cost = ptus_needed * ptu_monthly_per_unit

    print(f"PAYG monthly cost: ${payg_monthly_cost:,.2f}")
    print(f"PTU monthly cost:  ${ptu_monthly_cost:,.2f} ({ptus_needed} PTU, always-on)")
    print(f"PTU cheaper? {ptu_monthly_cost < payg_monthly_cost}")

    return ptu_monthly_cost < payg_monthly_cost

# Check if PTU is worth it
calculate_breakeven(tokens_per_day=1_000_000)  # 1M tokens/day

# Output (illustrative; actual prices change):
# PAYG monthly cost: $168.75
# PTU monthly cost:  $6,501.60 (1 PTU, always-on)
# PTU cheaper? False
#
# At 1M tokens/day the always-on cost of even one PTU dwarfs PAYG. With these
# numbers PAYG averages $5.625 per 1M tokens, so one PTU ($6,501.60/month)
# breaks even at about 1.16B tokens/month (~38.5M tokens/day), roughly 9% of
# its assumed ~13B tokens/month capacity. Validate against Azure's PTU
# calculator and current minimum commitments.
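The function above answers "is PTU cheaper at this volume?" but does not report the crossover point itself. The sketch below solves for it directly, under the same illustrative assumptions (one PTU at $9.03/hour, a 30-day month, a 50/50 input/output split). The function name and parameters are introduced here for illustration.

```python
HOURS_PER_MONTH = 24 * 30
DAYS_PER_MONTH = 30


def breakeven_tokens_per_day(
    ptu_hourly: float = 9.03,
    input_per_1k: float = 0.00125,
    output_per_1k: float = 0.01,
    input_share: float = 0.5,
) -> float:
    """Daily token volume at which one always-on PTU costs the same as PAYG."""
    # Blended PAYG price for a single token at the given input/output mix.
    blended_per_token = (
        input_share * input_per_1k + (1 - input_share) * output_per_1k
    ) / 1000
    ptu_monthly = ptu_hourly * HOURS_PER_MONTH
    return ptu_monthly / blended_per_token / DAYS_PER_MONTH


print(f"Break-even: {breakeven_tokens_per_day():,.0f} tokens/day")
```

With the defaults this lands at roughly 38.5M tokens/day. The result is only meaningful while that volume fits inside one PTU's throughput; above it, the number of PTUs steps up and the comparison has to be redone.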

Cost Optimization Tips:

  1. Use GPT-5-mini for simple tasks (dramatically cheaper than GPT-5.5)
  2. Implement caching for repeated queries
  3. Set max_tokens appropriately (don’t use default)
  4. Use batch processing instead of real-time where possible
  5. Monitor with Azure Monitor and set budget alerts
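Tip 2 can be sketched as a small in-memory cache keyed on a hash of the deployment name and the message list. This is a minimal sketch, not a production design: `ResponseCache` and the `call_model` callable are hypothetical names, there is no eviction or expiry, and caching is only appropriate when identical prompts should return identical answers.

```python
import hashlib
import json
from typing import Callable


class ResponseCache:
    """In-memory cache keyed on a hash of (deployment, messages)."""

    def __init__(self, call_model: Callable[[str, list[dict]], str]):
        self._call_model = call_model
        self._store: dict[str, str] = {}
        self.hits = 0

    @staticmethod
    def _key(deployment: str, messages: list[dict]) -> str:
        # sort_keys makes the hash independent of dict key order
        payload = json.dumps([deployment, messages], sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def complete(self, deployment: str, messages: list[dict]) -> str:
        key = self._key(deployment, messages)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        answer = self._call_model(deployment, messages)
        self._store[key] = answer
        return answer


calls = []


def fake_model(deployment: str, messages: list[dict]) -> str:
    """Stand-in for a real chat completion call."""
    calls.append(deployment)
    return f"echo: {messages[-1]['content']}"


cache = ResponseCache(fake_model)
question = [{"role": "user", "content": "What is a PTU?"}]
first = cache.complete("gpt-5-mini", question)
second = cache.complete("gpt-5-mini", question)
print(first, "| model calls:", len(calls), "| cache hits:", cache.hits)
```

In a real service, `call_model` would wrap `client.chat.completions.create` and the store would typically live in a shared cache rather than process memory.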

Google Cloud Vertex AI

Vertex AI is Google’s unified ML platform, providing access to Google’s models (PaLM, Gemini, Codey) and open-source models.

Model Garden and Deployment

Available Models:

  • Gemini Pro: Google’s flagship multimodal model
  • Gemini 3.1 Pro: Image understanding
  • PaLM 2: Text generation (legacy)
  • Codey: Code generation (legacy)
  • Chirp: Speech-to-text
  • Imagen: Image generation
  • Open models: Llama 4, Falcon, Mistral via Model Garden

Setup:

# requirements.txt
google-cloud-aiplatform>=1.42.0
vertexai>=0.0.1

# vertex_ai_client.py
import vertexai
from vertexai.language_models import TextGenerationModel, ChatModel
from vertexai.generative_models import GenerativeModel
from google.cloud import aiplatform
import os

class VertexAIClient:
    """Production Vertex AI client."""

    def __init__(
        self,
        project_id: str,
        location: str = "us-central1"
    ):
        """
        Initialize Vertex AI.

        Args:
            project_id: GCP project ID
            location: Region (us-central1, europe-west4, asia-southeast1)
        """

        self.project_id = project_id
        self.location = location

        # Initialize Vertex AI SDK
        vertexai.init(project=project_id, location=location)

    def generate_text_palm(
        self,
        prompt: str,
        model_name: str = "text-bison@002",
        temperature: float = 0.7,
        max_output_tokens: int = 1024
    ) -> dict:
        """
        Generate text with PaLM 2.

        Args:
            prompt: Input prompt
            model_name: Model version
            temperature: Sampling temperature
            max_output_tokens: Max tokens to generate

        Returns:
            {"text": str, "safety": dict}
        """

        model = TextGenerationModel.from_pretrained(model_name)

        response = model.predict(
            prompt,
            temperature=temperature,
            max_output_tokens=max_output_tokens,
            top_k=40,
            top_p=0.95
        )

        return {
            "text": response.text,
            "safety": response.safety_attributes
        }

    def chat_palm(
        self,
        messages: list[str],
        context: str = "",
        model_name: str = "chat-bison@002"
    ) -> dict:
        """
        Chat with PaLM 2.

        Args:
            messages: List of user messages (alternating)
            context: Optional context for the conversation

        Returns:
            Response dict
        """

        chat_model = ChatModel.from_pretrained(model_name)

        chat = chat_model.start_chat(context=context)

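        if not messages:
            raise ValueError("messages must contain at least one user message")

        # Send each user message in order so earlier turns are part of the
        # chat history (one API call per message)
        for message in messages:
            response = chat.send_message(message)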

        return {
            "text": response.text,
            "history": [
                {"author": msg.author, "content": msg.content}
                for msg in chat.message_history
            ]
        }

    def generate_gemini(
        self,
        prompt: str,
        model_name: str = "gemini-3.1-pro",
        temperature: float = 0.9
    ) -> dict:
        """
        Generate with Gemini Pro.

        Gemini supports multimodal inputs (text, images, video).
        """

        from vertexai.generative_models import HarmBlockThreshold, HarmCategory

        model = GenerativeModel(model_name)

        # The SDK expects enum members here, not plain strings
        threshold = HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE

        response = model.generate_content(
            prompt,
            generation_config={
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 8192,
            },
            safety_settings={
                HarmCategory.HARM_CATEGORY_HARASSMENT: threshold,
                HarmCategory.HARM_CATEGORY_HATE_SPEECH: threshold,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: threshold,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: threshold,
            }
        )

        return {
            "text": response.text,
            "safety_ratings": response.candidates[0].safety_ratings
        }

    def generate_gemini_multimodal(
        self,
        text_prompt: str,
        image_path: str = None,
        video_path: str = None
    ) -> str:
        """
        Multimodal generation with Gemini 3.1 Pro.

        Can analyze images and videos.
        """

        model = GenerativeModel("gemini-3.1-pro-vision")

        contents = [text_prompt]

        if image_path:
            from vertexai.generative_models import Image
            image = Image.load_from_file(image_path)
            contents.append(image)

        if video_path:
            from vertexai.generative_models import Part
            video = Part.from_uri(
                video_path,  # GCS URI: gs://bucket/video.mp4
                mime_type="video/mp4"
            )
            contents.append(video)

        response = model.generate_content(contents)

        return response.text

# Usage
client = VertexAIClient(
    project_id="my-gcp-project",
    location="us-central1"
)

# Text generation with PaLM
response = client.generate_text_palm(
    prompt="Explain microservices architecture in 3 sentences",
    temperature=0.3
)
print(response['text'])

# Gemini with vision
analysis = client.generate_gemini_multimodal(
    text_prompt="Describe what's in this image in detail",
    image_path="/path/to/image.jpg"
)
print(analysis)
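The multimodal method above hard-codes `mime_type="video/mp4"`, which is wrong for any other container format. A small helper can derive the MIME type from the file extension instead. This is a sketch with a deliberately short, illustrative extension map; `mime_type_for_uri` is a hypothetical helper, not part of the Vertex AI SDK.

```python
import posixpath

# Small, illustrative extension map; extend as needed.
MIME_TYPES = {
    ".mp4": "video/mp4",
    ".mov": "video/quicktime",
    ".webm": "video/webm",
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
}


def mime_type_for_uri(uri: str) -> str:
    """Map a file path or gs:// URI to a MIME type by its extension."""
    extension = posixpath.splitext(uri)[1].lower()
    try:
        return MIME_TYPES[extension]
    except KeyError:
        raise ValueError(f"Unsupported media type for {uri!r}") from None


print(mime_type_for_uri("gs://bucket/video.mp4"))
```

The result can be passed as the `mime_type` argument of `Part.from_uri` in place of the literal.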

Vertex AI Agents

Creating an Agent with Tools:

# vertex_agent.py
from vertexai.preview import reasoning_engines
from google.cloud import aiplatform
import requests

class WeatherTool:
    """Tool for getting weather information."""

    def get_weather(self, location: str) -> dict:
        """Get current weather for a location."""
        # Read the key from the environment instead of hard-coding it
        # (WEATHER_API_KEY is an assumed variable name)
        import os
        api_key = os.environ["WEATHER_API_KEY"]

        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": location, "appid": api_key},  # requests URL-encodes these
            timeout=10
        )
        response.raise_for_status()
        data = response.json()

        return {
            "temperature": data["main"]["temp"],
            "description": data["weather"][0]["description"],
            "humidity": data["main"]["humidity"]
        }

class DatabaseTool:
    """Tool for querying BigQuery."""

    def __init__(self, project_id: str):
        from google.cloud import bigquery
        self.client = bigquery.Client(project=project_id)

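    def query_sales(self, product: str) -> dict:
        """Query sales data from BigQuery."""
        from google.cloud import bigquery

        # Bind the product as a query parameter; never interpolate
        # model-supplied input into SQL.
        query = """
        SELECT SUM(revenue) as total_revenue, COUNT(*) as num_sales
        FROM `sales.transactions`
        WHERE product = @product
        AND date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("product", "STRING", product)
            ]
        )

        results = self.client.query(query, job_config=job_config).result()
        row = next(iter(results))

        return {
            "total_revenue": row.total_revenue,
            "num_sales": row.num_sales
        }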

class VertexAgent:
    """Vertex AI agent with tools."""

    def __init__(self, project_id: str, location: str = "us-central1"):
        self.project_id = project_id
        self.location = location

        aiplatform.init(project=project_id, location=location)

        # Define tools
        self.weather_tool = WeatherTool()
        self.db_tool = DatabaseTool(project_id)

    def create_agent(self):
        """Create reasoning engine with tools."""

        # Define the agent. LangchainAgent takes plain Python callables as
        # tools and derives each tool's schema from the function's name,
        # type hints, and docstring, so no hand-written JSON schema is needed.
        agent = reasoning_engines.LangchainAgent(
            model="gemini-3.1-pro",
            tools=[
                self.weather_tool.get_weather,
                self.db_tool.query_sales
            ],
            agent_executor_kwargs={
                "verbose": True,
                "max_iterations": 5
            }
        )

        return agent

    def deploy_agent(self, agent):
        """Deploy agent as a managed service."""

        # Create reasoning engine
        remote_agent = reasoning_engines.ReasoningEngine.create(
            agent,
            display_name="sales-weather-agent",
            description="Agent that can check weather and query sales data"
        )

        return remote_agent

    def query_agent(self, agent, query: str):
        """Query the deployed agent."""

        response = agent.query(input=query)
        return response

# Usage
agent_manager = VertexAgent(project_id="my-project")

# Create and deploy
agent = agent_manager.create_agent()
deployed = agent_manager.deploy_agent(agent)

# Query
response = agent_manager.query_agent(
    deployed,
    "What's the weather in Seattle and how much revenue did we make on laptops?"
)

print(response)
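Before deploying an agent, it helps to exercise the tool layer locally without calling a model or any external API. The sketch below shows the dispatch step in isolation: a registry maps tool names to callables, and a tool call (name plus arguments) is routed to the matching function. It illustrates the general pattern only and is not how Vertex AI implements dispatch internally; the stub tools and the `{"name": ..., "args": ...}` shape are assumptions for this example.

```python
from typing import Any, Callable


def fake_get_weather(location: str) -> dict:
    """Stub weather tool: returns canned data instead of calling an API."""
    return {"location": location, "temperature": 12.5, "description": "light rain"}


def fake_query_sales(product: str) -> dict:
    """Stub sales tool: returns canned data instead of querying BigQuery."""
    return {"product": product, "total_revenue": 1000.0, "num_sales": 4}


TOOLS: dict[str, Callable[..., dict]] = {
    "get_weather": fake_get_weather,
    "query_sales": fake_query_sales,
}


def dispatch(tool_call: dict[str, Any]) -> dict:
    """Run one tool call of the form {"name": ..., "args": {...}}."""
    name = tool_call["name"]
    if name not in TOOLS:
        raise KeyError(f"Unknown tool: {name}")
    return TOOLS[name](**tool_call.get("args", {}))


result = dispatch({"name": "get_weather", "args": {"location": "Seattle"}})
print(result)
```

Swapping the stubs for the real `WeatherTool` and `DatabaseTool` methods keeps the same registry shape while letting unit tests run offline.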

Integration with GCP Services

Cloud Functions Integration:

# main.py (Cloud Function)
import functions_framework
import vertexai
from vertexai.generative_models import GenerativeModel
import os

# Initialize once (cold start)
vertexai.init(project=os.getenv("GCP_PROJECT"), location="us-central1")
model = GenerativeModel("gemini-3.1-pro")

@functions_framework.http
def generate_text(request):
    """
    HTTP Cloud Function for text generation.

    Triggered by HTTP request.
    """

    # silent=True returns None instead of raising on a missing or invalid body
    request_json = request.get_json(silent=True) or {}
    prompt = request_json.get('prompt', '')

    if not prompt:
        return {'error': 'No prompt provided'}, 400

    response = model.generate_content(prompt)

    return {
        'generated_text': response.text,
        'safety_ratings': [
            {
                'category': rating.category.name,
                'probability': rating.probability.name
            }
            for rating in response.candidates[0].safety_ratings
        ]
    }

@functions_framework.cloud_event
def process_gcs_file(cloud_event):
    """
    Process file uploaded to Cloud Storage.

    Triggered by Cloud Storage event.
    """

    from google.cloud import storage

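    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]

    # Summaries are written back to the same bucket, which fires this trigger
    # again; skip them to avoid an infinite loop.
    if file_name.endswith(".summary.txt"):
        return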

    # Download file
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    content = blob.download_as_text()

    # Analyze with Gemini
    model = GenerativeModel("gemini-3.1-pro")

    analysis = model.generate_content(
        f"Summarize this document in 3 bullet points:\n\n{content}"
    )

    # Save summary back to GCS
    summary_blob = bucket.blob(f"{file_name}.summary.txt")
    summary_blob.upload_from_string(analysis.text)

    print(f"Processed {file_name}")
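The storage-triggered function above sends the whole file to the model in one prompt, which fails for documents larger than the model's context window. One common workaround is to split the text into chunks, summarize each chunk, then summarize the summaries. The sketch below covers only the splitting step; the `max_chars` budget is an assumed stand-in for a real token limit.

```python
def split_into_chunks(text: str, max_chars: int) -> list[str]:
    """Split text on blank lines into chunks of at most max_chars characters.

    Paragraphs are packed together while they fit; a single paragraph
    longer than max_chars is hard-split.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be >= 1")

    chunks: list[str] = []
    current = ""
    for paragraph in text.split("\n\n"):
        # Hard-split paragraphs that cannot fit in any chunk
        while len(paragraph) > max_chars:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(paragraph[:max_chars])
            paragraph = paragraph[max_chars:]

        candidate = f"{current}\n\n{paragraph}" if current else paragraph
        if len(candidate) <= max_chars:
            current = candidate
        else:
            chunks.append(current)
            current = paragraph

    if current:
        chunks.append(current)
    return chunks


chunks = split_into_chunks("aaaa\n\nbbbb\n\ncccccccccccc", max_chars=10)
print(chunks)
```

Each chunk would then go through `model.generate_content` separately, with a final call that condenses the per-chunk summaries.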

BigQuery Integration:

# vertex_bigquery.py
from google.cloud import bigquery
from google.cloud import aiplatform
import vertexai
from vertexai.generative_models import GenerativeModel

class VertexBigQueryAnalysis:
    """Analyze BigQuery data with Vertex AI."""

    def __init__(self, project_id: str):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)

        vertexai.init(project=project_id, location="us-central1")
        self.model = GenerativeModel("gemini-3.1-pro")

    def query_and_analyze(self, sql_query: str) -> dict:
        """
        Run BigQuery query and analyze results with Gemini.

        Use case: Generate insights from data.
        """

        # Run query and materialize the rows once; the result iterator
        # cannot be consumed a second time
        query_job = self.bq_client.query(sql_query)
        rows = [dict(row) for row in query_job.result()]

        # Convert to text
        data_text = "Query results:\n"
        for row in rows:
            data_text += str(row) + "\n"

        # Analyze with Gemini
        prompt = f"""Analyze this data and provide:
1. Key insights
2. Trends or patterns
3. Recommendations

Data:
{data_text}
"""

        analysis = self.model.generate_content(prompt)

        return {
            "raw_data": rows,
            "analysis": analysis.text
        }

    def natural_language_to_sql(self, question: str, schema: str) -> str:
        """
        Convert natural language question to SQL.

        Use case: Let users query data without knowing SQL.
        """

        prompt = f"""Given this BigQuery schema:
{schema}

Convert this question to a SQL query:
"{question}"

Return ONLY the SQL query, no explanation.
"""

        response = self.model.generate_content(prompt)
        sql = response.text.strip()

        # Clean up (remove markdown code blocks if present)
        sql = sql.replace("```sql", "").replace("```", "").strip()

        return sql

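    def create_ml_model(
        self,
        training_table: str,
        model_name: str,
        target_column: str
    ):
        """
        Create BigQuery ML model.

        model_name must be dataset-qualified (e.g. "my_dataset.churn_model")
        because it is appended to the project ID below.

        Vertex AI can also train custom models, but BQML is simpler
        for tabular data.
        """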

        query = f"""
        CREATE OR REPLACE MODEL `{self.project_id}.{model_name}`
        OPTIONS(
            model_type='LOGISTIC_REG',
            input_label_cols=['{target_column}']
        ) AS
        SELECT * FROM `{training_table}`
        """

        self.bq_client.query(query).result()

        print(f"Model {model_name} created")

# Usage
analyzer = VertexBigQueryAnalysis(project_id="my-project")

# Natural language to SQL
sql = analyzer.natural_language_to_sql(
    question="What are the top 5 products by revenue this month?",
    schema="""
    Table: sales.transactions
    Columns:
    - product (STRING)
    - revenue (FLOAT64)
    - date (DATE)
    """
)

print(f"Generated SQL: {sql}")

# Run and analyze
results = analyzer.query_and_analyze(sql)
print(results['analysis'])
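The usage above runs model-generated SQL directly, so a bad generation could modify or delete data. A conservative guard is to accept only a single read-only statement before executing it. The sketch below is a keyword-based check, not a SQL parser: it will reject some legitimate queries (for example, a column named `update`), and it should be paired with a read-only service account. `is_read_only_select` is a hypothetical helper name.

```python
import re

# Statement keywords that should never appear in a read-only query.
FORBIDDEN = re.compile(
    r"\b(INSERT|UPDATE|DELETE|MERGE|DROP|CREATE|ALTER|TRUNCATE|"
    r"GRANT|REVOKE|CALL|EXECUTE)\b",
    re.IGNORECASE,
)


def is_read_only_select(sql: str) -> bool:
    """Return True only for a single SELECT/WITH statement with no DML or DDL."""
    statement = sql.strip().rstrip(";").strip()
    if not statement or ";" in statement:
        return False  # empty, or more than one statement
    if not re.match(r"(?is)^(SELECT|WITH)\b", statement):
        return False
    return FORBIDDEN.search(statement) is None


print(is_read_only_select("SELECT product FROM `sales.transactions` LIMIT 5"))
print(is_read_only_select("SELECT 1; DELETE FROM t"))
```

BigQuery also supports dry runs through `QueryJobConfig(dry_run=True)`, which validates a query and reports bytes scanned without executing it; that is a useful second check after this guard.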

Firestore Integration:

# vertex_firestore.py
from google.cloud import firestore
import vertexai
from vertexai.generative_models import GenerativeModel

class VertexFirestoreChat:
    """Chat application with Firestore for history."""

    def __init__(self, project_id: str):
        self.db = firestore.Client(project=project_id)

        vertexai.init(project=project_id, location="us-central1")
        self.model = GenerativeModel("gemini-3.1-pro")

    def send_message(
        self,
        user_id: str,
        conversation_id: str,
        message: str
    ) -> str:
        """
        Send message in a conversation.

        History stored in Firestore for context.
        """

        # Get conversation history
        conv_ref = self.db.collection('conversations').document(conversation_id)
        conv = conv_ref.get()

        history = []
        if conv.exists:
            history = conv.to_dict().get('messages', [])

        # Build prompt with history
        prompt_parts = []
        for msg in history[-10:]:  # Last 10 messages for context
            role = "User" if msg['role'] == 'user' else "Assistant"
            prompt_parts.append(f"{role}: {msg['content']}")

        prompt_parts.append(f"User: {message}")
        prompt = "\n".join(prompt_parts)

        # Generate response
        response = self.model.generate_content(prompt)
        assistant_message = response.text

        # Save to Firestore. SERVER_TIMESTAMP is not supported inside array
        # elements, so per-message timestamps come from the client clock.
        from datetime import datetime, timezone
        now = datetime.now(timezone.utc)

        history.append({
            'role': 'user',
            'content': message,
            'timestamp': now
        })
        history.append({
            'role': 'assistant',
            'content': assistant_message,
            'timestamp': now
        })

        conv_ref.set({
            'user_id': user_id,
            'messages': history,
            'updated_at': firestore.SERVER_TIMESTAMP
        })

        return assistant_message

# Usage
chat = VertexFirestoreChat(project_id="my-project")

response = chat.send_message(
    user_id="user123",
    conversation_id="conv456",
    message="What did we discuss earlier?"
)

print(response)
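The class above stores the full message list in one Firestore document, which grows without bound while Firestore caps a document at 1 MiB. A simple mitigation is to trim the history before saving. The sketch below keeps the newest messages within a message count and a character budget; both limits are illustrative assumptions, and `trim_history` is a hypothetical helper.

```python
def trim_history(
    history: list[dict],
    max_messages: int = 200,
    max_chars: int = 500_000,
) -> list[dict]:
    """Keep the newest messages within a count and a character budget.

    The most recent message is always kept, even if it alone exceeds
    max_chars.
    """
    if max_messages < 1:
        raise ValueError("max_messages must be >= 1")

    kept: list[dict] = []
    total = 0
    # Walk backwards from the newest message
    for msg in reversed(history[-max_messages:]):
        size = len(msg["content"])
        if kept and total + size > max_chars:
            break
        kept.append(msg)
        total += size

    kept.reverse()  # restore chronological order
    return kept


history = [{"role": "user", "content": "x" * 10, "n": i} for i in range(5)]
trimmed = trim_history(history, max_messages=3, max_chars=25)
print([m["n"] for m in trimmed])
```

Calling this on `history` just before `conv_ref.set(...)` bounds the document size; older turns could be archived to a subcollection if they need to be retained.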

Vertex AI Pricing and Cost Optimization

Pricing (as of 2026):

Model | Input (per 1M tokens) | Output (per 1M tokens)
Gemini 3.1 Pro | $2.00 (≤200K ctx) / $4.00 (>200K) | $12.00 (≤200K) / $18.00 (>200K)
Gemini 3.5 Flash | $1.50 | $9.00
PaLM 2 (legacy, per 1M characters) | $0.25 | $0.50
Codey (legacy, per 1M characters) | $0.40 | $0.80

Note: As of 2026, Vertex AI Gemini 3.x models are billed per token (consistent with the Gemini Developer API). Older PaLM and Codey models still bill per 1K characters (~4 chars per token).
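Because the legacy models bill per character and Gemini bills per token, their prices are not directly comparable. The sketch below converts a per-1K-character price into an approximate per-1M-token price using the rough 4-characters-per-token ratio from the note above. The ratio varies by language and content, so treat the result as an estimate.

```python
CHARS_PER_TOKEN = 4  # rough average from the note above


def per_1k_chars_to_per_1m_tokens(price_per_1k_chars: float) -> float:
    """Convert a per-1K-character price to an approximate per-1M-token price."""
    price_per_char = price_per_1k_chars / 1000
    price_per_token = price_per_char * CHARS_PER_TOKEN
    return price_per_token * 1_000_000


# PaLM 2 (text-bison) input at $0.00025 per 1K characters
print(f"${per_1k_chars_to_per_1m_tokens(0.00025):.2f} per 1M tokens")
```

Under this approximation, PaLM 2 input works out to about $1.00 per 1M tokens, which is the figure to compare against the Gemini rows.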

Cost Optimization:

# vertex_cost_optimizer.py

class VertexCostOptimizer:
    """Optimize Vertex AI costs."""

    # Per-1K-token pricing (Gemini 3.x bills per token as of 2026)
    PRICING = {
        'gemini-3.1-pro': {'input': 0.002, 'output': 0.012},
        'gemini-3.5-flash': {'input': 0.0015, 'output': 0.009},
        'text-bison': {'input': 0.00025, 'output': 0.0005},  # PaLM 2 legacy, per 1K chars
    }

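    def estimate_cost(
        self,
        prompt: str,
        expected_output_tokens: int,
        model: str = 'gemini-3.1-pro'
    ) -> float:
        """Estimate cost before request (tokens for Gemini 3.x, characters for text-bison)."""

        pricing = self.PRICING[model]

        if model == 'text-bison':
            # Legacy PaLM pricing is per 1K characters (~4 chars per token)
            input_units = len(prompt)
            output_units = expected_output_tokens * 4
        else:
            # Rough char-to-token conversion for the prompt
            input_units = len(prompt) / 4
            output_units = expected_output_tokens

        input_cost = (input_units / 1000) * pricing['input']
        output_cost = (output_units / 1000) * pricing['output']

        return input_cost + output_cost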

    def batch_predictions(
        self,
        input_gcs_uri: str,
        output_gcs_path: str
    ):
        """
        Use batch prediction API for cost savings.

        Batch is 50% cheaper but takes longer (async processing).

        input_gcs_uri is a JSONL file of prompts already uploaded to
        Cloud Storage, e.g. gs://bucket/prompts.jsonl.
        """

        from google.cloud import aiplatform

        # Create batch prediction job
        job = aiplatform.BatchPredictionJob.create(
            job_display_name="batch-generation",
            model_name="gemini-pro",
            instances_format="jsonl",
            predictions_format="jsonl",
            gcs_source=input_gcs_uri,
            gcs_destination_prefix=output_gcs_path
        )

        job.wait()

        return job

# Use model tuning for cost reduction
# Fine-tuned smaller model can be cheaper than large base model
def tune_model_for_task(training_data_path: str):
    """
    Tune PaLM for specific task.

    Benefits:
    - Better quality at lower cost
    - Smaller prompts needed
    - Faster inference
    """

    from vertexai.language_models import TextGenerationModel

    model = TextGenerationModel.from_pretrained("text-bison@002")

    tuning_job = model.tune_model(
        training_data=training_data_path,
        train_steps=100,
        tuning_job_location="us-central1",
        tuned_model_location="us-central1"
    )

    return tuning_job
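The batch job above reads its prompts from a JSONL file in Cloud Storage, so the prompts have to be serialized first. The sketch below builds that file's contents in memory. The record shape (a `request` object holding `contents` with `role` and `parts`) is an assumption based on the Gemini request format; check it against the current batch prediction documentation before relying on it. `prompts_to_jsonl` is a hypothetical helper.

```python
import json


def prompts_to_jsonl(prompts: list[str]) -> str:
    """Serialize prompts as one JSON record per line.

    Assumed record shape:
    {"request": {"contents": [{"role": "user", "parts": [{"text": ...}]}]}}
    """
    lines = []
    for prompt in prompts:
        record = {
            "request": {
                "contents": [
                    {"role": "user", "parts": [{"text": prompt}]}
                ]
            }
        }
        # json.dumps escapes embedded newlines, so each record stays on one line
        lines.append(json.dumps(record, ensure_ascii=False))
    return "\n".join(lines) + "\n" if lines else ""


payload = prompts_to_jsonl(["Summarize Q1 sales", "Line one\nline two"])
print(payload)
```

The resulting string can be uploaded with `blob.upload_from_string(payload)` before the batch job is created.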

Provider Comparison

Feature | AWS Bedrock | Azure OpenAI | Vertex AI
Best Models | Claude Sonnet 4.6/Opus | GPT-5, GPT-5-mini | Gemini 3.1 Pro
Unique Strength | Knowledge Bases, Agents | Enterprise integration | Multimodal, Model Garden
Pricing Model | Pay-per-token | Pay-per-token + PTU | Pay-per-token (per-character for legacy PaLM/Codey)
Fine-tuning | Llama 4 (SageMaker) | GPT-5 | PaLM, open models
RAG Support | Bedrock KB + OpenSearch | AI Search | Vertex AI Search
Agents | Bedrock Agents | Azure AI Studio | Vertex AI Agents
Compliance | SOC2, HIPAA, GDPR | SOC2, HIPAA, FedRAMP | SOC2, HIPAA, GDPR
Best For | Claude users, AWS ecosystem | Enterprise, Microsoft stack | Google ecosystem, multimodal

Summary

Each cloud provider has distinct strengths:

  1. AWS Bedrock: Best access to Claude models, strong integration with AWS services (Lambda, S3, SageMaker). Choose when you’re in the AWS ecosystem or need Claude’s capabilities.

  2. Azure OpenAI: Enterprise-ready GPT access with Microsoft security features. Choose when you need Microsoft compliance (FedRAMP, Government) or integration with Microsoft 365/Azure services.

  3. Google Vertex AI: Leading multimodal capabilities with Gemini, best for applications involving images, video, and audio. Choose when you need multimodal support or are in the GCP ecosystem.

Key Decision Points:

  • Model quality: Match the model to your use case (Claude for analysis, GPT-5 for general tasks, Gemini for multimodal)

  • Ecosystem: Choose based on your existing cloud investment

  • Compliance: Check regional availability and certification requirements

  • Cost: Compare pricing at your expected volume; PTU vs pay-as-you-go breakeven matters


Connections to Other Chapters

This chapter’s cloud provider details connect to other topics:

  • Chapter 9 (LLM Deployment & Infrastructure): Self-hosted alternatives to managed cloud services.

  • Chapter 13 (Multi-Cloud & Cloud-Native Patterns): Advanced deployment patterns including multi-cloud, serverless, and security.

  • Chapter 32 (Cost Engineering): Detailed cost optimization strategies building on provider pricing here.

  • Chapter 31 (Reliability Engineering): SLOs and graceful degradation for cloud AI systems.

  • LLM Provider Comparison (Appendix K): Model capability comparisons across providers.


Practical Exercises

  1. Provider Cost Comparison: Calculate the cost of running 1M tokens/day on each provider: AWS Bedrock with Claude Sonnet 4.6, Azure OpenAI with GPT-5, Vertex AI with Gemini Pro. Create a spreadsheet comparing monthly costs at various volume tiers (100K, 1M, 10M, 100M tokens/day) and identify the break-even points between pay-as-you-go and provisioned throughput.

  2. Provider RAG Implementation: Implement RAG using your primary cloud provider: AWS (Bedrock Knowledge Bases + Claude), Azure (Azure AI Search, formerly Cognitive Search, + Azure OpenAI), or GCP (Vertex AI Search + Gemini). Compare setup complexity, query latency, and answer quality. Deliverable: Working implementation with performance benchmarks.

  3. Multi-Model Routing: Build a request router that sends different query types to different models: simple queries to a cheaper model (Claude Haiku, GPT-4o-mini), complex reasoning to a capable model (Claude Sonnet, GPT-4o). Implement classification logic, measure cost savings, and ensure latency overhead is minimal.

  4. Disaster Recovery Testing: Set up a secondary provider as a fallback for your primary. Implement health checks, automatic failover, and manual failback. Test the failover process: How long does it take? What data is lost? Document the operational runbook.

  5. Compliance Audit Preparation: For a hypothetical SOC 2 audit, document your cloud AI setup: data flow diagrams, access controls, encryption at rest/transit, logging configuration, incident response procedures. Identify any gaps between your current setup and compliance requirements.


Self-Assessment Checkpoint

Conceptual Questions

Q1. [IC2] What’s the difference between pay-as-you-go and provisioned throughput pricing? When would you choose each?

Answer

Pay-as-you-go: Billed per token (input + output). No commitment, scales to zero. Good for: variable workloads, low volume (<1M tokens/day), development/testing.

Provisioned throughput: Reserved capacity at a fixed hourly/monthly rate. Guarantees availability and often lower per-token cost at high volume. Good for: steady high-volume production (>10M tokens/day), latency-sensitive applications needing guaranteed capacity.

The break-even depends on provider and model, but provisioned typically becomes cheaper at around 5-10M tokens/day. Key consideration: Provisioned capacity is “use it or lose it”: if your traffic varies significantly, you may over-provision and waste money.
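The break-even reasoning above can be sketched as a small calculation. This is an illustration only: the blended per-token price and the fixed provisioned fee are hypothetical placeholders, not figures from any provider's rate card, and the function names are invented for this example.

```python
def payg_monthly_cost(tokens_per_day: float, price_per_1k_tokens: float, days: int = 30) -> float:
    """Pay-as-you-go: spend scales linearly with tokens used."""
    return tokens_per_day / 1000 * price_per_1k_tokens * days


def breakeven_tokens_per_day(provisioned_monthly_cost: float, price_per_1k_tokens: float, days: int = 30) -> float:
    """Daily volume at which pay-as-you-go spend equals the fixed provisioned fee."""
    return provisioned_monthly_cost / (price_per_1k_tokens * days) * 1000


# Hypothetical prices, chosen only to make the arithmetic easy to follow.
PRICE_PER_1K = 0.01           # blended input + output, USD per 1K tokens
PROVISIONED_MONTHLY = 2400.0  # fixed fee, USD per month

breakeven = breakeven_tokens_per_day(PROVISIONED_MONTHLY, PRICE_PER_1K)
print(f"Break-even: {breakeven:,.0f} tokens/day")

for volume in (1_000_000, 10_000_000):
    payg = payg_monthly_cost(volume, PRICE_PER_1K)
    cheaper = "pay-as-you-go" if payg < PROVISIONED_MONTHLY else "provisioned"
    print(f"{volume:>11,} tokens/day: pay-as-you-go ${payg:,.0f}/month, cheaper option: {cheaper}")
```

Because provisioned capacity is billed whether or not it is used, a fair comparison uses the volume you actually expect to sustain, not the peak you provision for.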

Q2. [IC2] You’re building on AWS and want to use GPT-4. What are your options, and what are the tradeoffs of each?

Answer

Options: (1) Call the OpenAI API directly from AWS: Simple, full model access, but data leaves AWS (compliance concern), adds network latency, requires managing OpenAI credentials in AWS. (2) Set up Azure OpenAI and call from AWS: Microsoft-managed, enterprise features, but complex architecture (AWS → Azure → back), similar compliance concerns. (3) Use a proxy service: Some services wrap OpenAI with compliance features, but this adds cost and latency. (4) Consider AWS-native alternatives: Claude on Bedrock is comparable to GPT-4 for most tasks and stays within the AWS ecosystem.

Tradeoff decision: Most teams should use Bedrock with Claude unless they have a specific GPT-4 requirement. The operational simplicity of staying in one cloud usually outweighs model differences.

Q3. [Senior] Explain the compliance implications of using different cloud AI providers. What certifications matter and why?

Answer

Key certifications: (1) SOC 2 Type II: Audited security controls. Required by most enterprise customers. All major providers have this. (2) HIPAA BAA: Required for healthcare data. AWS, Azure, and GCP all offer BAAs, but coverage is scoped per service and feature, so confirm that the specific AI service and models you plan to use are in scope. (3) FedRAMP: US government cloud authorization. Azure has the broadest FedRAMP coverage for AI services. (4) GDPR: EU data protection. Requires data residency options (available in all providers) and data processing agreements. (5) PCI DSS: Payment card data. All providers support it, but your application architecture matters more.

Implications: (1) Data residency: Where is inference data processed? Does the provider guarantee regional processing? (2) Data retention: How long does the provider store prompts/responses? Can you opt out? (3) Subprocessors: Who else processes your data? (4) Audit logs: Can you prove to auditors who accessed what? Choose provider based on your compliance requirements first, then evaluate model quality.

Q4. [Senior] A company has 50 applications using direct OpenAI API calls. They want to migrate to Azure OpenAI for compliance. Design the migration strategy.

Answer

Migration strategy: (1) Audit: Inventory all OpenAI usage—which apps, which models, what volumes, what features (function calling, embeddings, fine-tuned models). (2) Compatibility check: Azure OpenAI has API differences (deployments vs models, content filtering). Test each app against Azure OpenAI in staging. (3) Abstraction layer: Create a common client wrapper that both OpenAI and Azure OpenAI can implement. Migrate apps to use the abstraction first (low risk), then swap implementations. (4) Phased rollout: Start with low-risk, low-volume apps. Validate cost, latency, quality parity. Expand to higher-risk apps. (5) Fine-tuned models: If using OpenAI fine-tuned models, you’ll need to retrain on Azure (different fine-tuning API) or migrate to base models with better prompts. (6) Monitoring: Compare quality metrics, costs, latency between providers during migration. (7) Cutover: Once all apps are migrated and validated, deprecate OpenAI credentials.

Timeline: 3-6 months for 50 apps, depending on complexity and fine-tuning requirements.
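Step (3), the abstraction layer, might look like the sketch below. It is a minimal illustration, not a reference design: `ChatClient`, `OpenAIBackend`, `AzureOpenAIBackend`, `make_client`, and `summarize` are hypothetical names, and the backends return canned strings where a real implementation would call the provider SDK.

```python
from dataclasses import dataclass
from typing import Protocol


class ChatClient(Protocol):
    """Interface that application code depends on (hypothetical name)."""

    def complete(self, prompt: str) -> str: ...


@dataclass
class OpenAIBackend:
    model: str  # OpenAI addresses a model by name

    def complete(self, prompt: str) -> str:
        # Placeholder: a real implementation would call the OpenAI SDK here.
        return f"[openai:{self.model}] {prompt}"


@dataclass
class AzureOpenAIBackend:
    deployment: str  # Azure OpenAI addresses a deployment, not a model name

    def complete(self, prompt: str) -> str:
        # Placeholder: a real implementation would call the Azure OpenAI SDK here.
        return f"[azure:{self.deployment}] {prompt}"


def make_client(provider: str) -> ChatClient:
    """Select the backend from configuration so apps never import a provider SDK."""
    if provider == "openai":
        return OpenAIBackend(model="gpt-4")
    if provider == "azure":
        return AzureOpenAIBackend(deployment="gpt-4-production")
    raise ValueError(f"unknown provider: {provider}")


def summarize(client: ChatClient, text: str) -> str:
    """Application code: unchanged when the backend is swapped."""
    return client.complete(f"Summarize: {text}")


print(summarize(make_client("openai"), "quarterly report"))
print(summarize(make_client("azure"), "quarterly report"))
```

Once every app calls `make_client` instead of constructing a provider client itself, the cutover in step (7) becomes a configuration change per app rather than a code change.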

Q5. [Staff] Your organization uses all three major cloud providers. Design a unified AI platform that allows teams to use any provider while maintaining consistent observability, security, and cost allocation.

Answer

Platform architecture: (1) Unified API gateway: Single endpoint that routes to Bedrock, Azure OpenAI, or Vertex AI. Handles auth, rate limiting, logging. Teams don’t need cloud-specific credentials. (2) Common data model: Define standard request/response format. Gateway translates to/from provider-specific formats. (3) Observability: All requests emit spans to central tracing (OpenTelemetry). Cost, latency, error rates tracked by team, app, and provider. (4) Security: Gateway handles encryption, PII filtering, audit logging. Teams don’t directly access cloud credentials. (5) Cost allocation: Gateway tags requests with team/project metadata. Monthly cost reports by team, provider, and model. (6) Provider abstraction: Teams specify capability requirements (model size, context length, multimodal), gateway routes to appropriate provider/model. (7) Failover: Gateway can route around provider outages.

Organizational model: Platform team owns the gateway infrastructure. Teams consume via SDK/API. Platform team negotiates enterprise agreements with all providers. Central budget for AI spend with chargeback to teams.

Implementation priority: Start with routing and logging. Add security features. Then cost allocation. Failover is last (most complex).
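The routing, failover, and cost-allocation pieces of the gateway can be sketched in a few lines. Everything here is an assumption made for illustration: the catalog entries, model names, prices, and the `ModelRoute`, `route`, and `CostLedger` names are hypothetical, and a production gateway would sit behind authentication and emit traces as well.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelRoute:
    provider: str
    model: str
    max_context: int
    multimodal: bool
    usd_per_1k_tokens: float  # hypothetical price


# Hypothetical catalog; names and prices are placeholders.
CATALOG = [
    ModelRoute("bedrock", "small-text", 32_000, False, 0.001),
    ModelRoute("azure", "large-text", 128_000, False, 0.010),
    ModelRoute("vertex", "large-multimodal", 1_000_000, True, 0.008),
]


def route(context_tokens: int, needs_multimodal: bool,
          unavailable: frozenset[str] = frozenset()) -> ModelRoute:
    """Return the cheapest route that meets the stated capability requirements."""
    candidates = [
        r for r in CATALOG
        if r.max_context >= context_tokens
        and (r.multimodal or not needs_multimodal)
        and r.provider not in unavailable  # skip providers marked as down
    ]
    if not candidates:
        raise LookupError("no route satisfies the request")
    return min(candidates, key=lambda r: r.usd_per_1k_tokens)


class CostLedger:
    """Accumulates spend per (team, provider) for chargeback reports."""

    def __init__(self) -> None:
        self.spend: dict[tuple[str, str], float] = defaultdict(float)

    def record(self, team: str, chosen: ModelRoute, tokens: int) -> None:
        self.spend[(team, chosen.provider)] += tokens / 1000 * chosen.usd_per_1k_tokens


ledger = CostLedger()
chosen = route(context_tokens=1_000, needs_multimodal=False)
ledger.record("search-team", chosen, tokens=10_000)
print(chosen.provider, dict(ledger.spend))
```

Teams state what they need (context length, multimodal input) rather than naming a provider, which is what lets the gateway reroute during an outage without any client change.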

Spot the Problem

Problem 1. [IC2] A developer’s Bedrock code isn’t working:

import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

response = client.invoke_model(
    modelId="anthropic.claude-sonnet",
    body='{"prompt": "Hello, Claude!"}'
)

What’s wrong?

Answer

Multiple issues: (1) Model ID is incomplete: it should be a versioned ID such as anthropic.claude-3-5-sonnet-20241022-v2:0. (2) Body format is wrong: Claude on Bedrock uses the Messages API format, not a simple prompt string. Correct format:

import json

body = json.dumps({
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 1024,
    "messages": [{"role": "user", "content": "Hello, Claude!"}]
})

(3) No content type specified: include contentType="application/json" explicitly. (4) The region might not have Claude enabled: model access must be requested in the Bedrock console.

Best practice: Use the Anthropic SDK with Bedrock integration instead of raw boto3 for cleaner code and automatic format handling.
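Putting the fixes together, a corrected request might be assembled as follows. This is a sketch: `build_claude_request` is a hypothetical helper, the model ID is the example from the answer above and may not be the one enabled in your account, and the actual network call is left as a comment because it needs AWS credentials and model access.

```python
import json


def build_claude_request(prompt: str, max_tokens: int = 1024) -> dict:
    """Build keyword arguments for bedrock-runtime invoke_model (Messages API format)."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}],
    }
    return {
        # Versioned model ID; confirm which ID is enabled in your account and region.
        "modelId": "anthropic.claude-3-5-sonnet-20241022-v2:0",
        "contentType": "application/json",
        "accept": "application/json",
        "body": json.dumps(body),
    }


request = build_claude_request("Hello, Claude!")
print(request["modelId"])

# With AWS credentials and model access in place, the call would look like:
#   import boto3
#   client = boto3.client("bedrock-runtime", region_name="us-east-1")
#   response = client.invoke_model(**request)
#   reply = json.loads(response["body"].read())["content"][0]["text"]
```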

Problem 2. [Senior] A production Azure OpenAI application has intermittent 429 errors:

# Config
deployment_name = "gpt-4-production"
# TPM limit: 240,000

# Peak load: 100 requests/minute, avg 1000 tokens each
# = 100,000 tokens/minute at peak
# Should be well under limit, but getting 429s

What’s happening and how would you fix it?

Answer

Likely causes: (1) Token accounting happens when a request is admitted, not when its response completes. The average request is 1000 tokens, but if some requests are 5000+ tokens (large contexts or a high max_tokens setting), a cluster of them can push a burst over the TPM limit. (2) RPM limits (requests per minute) may be hit before TPM. Azure enforces both limits; check both. (3) Other applications sharing the same deployment. If multiple apps use “gpt-4-production,” they share the quota. (4) Retry amplification: 429s cause retries, which cause more 429s. (5) Bursty traffic: 100 requests/minute on average might mean 50 requests in 10 seconds during bursts.

Fixes: (1) Monitor both TPM and RPM in Azure metrics to identify which limit you’re hitting. (2) Use separate deployments for different applications. (3) Implement client-side rate limiting with token counting before sending. (4) Use exponential backoff with jitter for retries. (5) Consider higher TPM tier or multiple deployments with load balancing. (6) Add request queuing to smooth bursts.
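Fixes (3) and (4) can be sketched as below. This is one possible shape under stated assumptions: `RateLimitError` stands in for whatever exception your SDK raises on HTTP 429, the limiter is a simple fixed one-minute window rather than Azure's actual accounting, and the jitter follows the common "full jitter" scheme.

```python
import random
import time


class RateLimitError(Exception):
    """Stand-in for the SDK exception raised on HTTP 429 (hypothetical)."""


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0, rng=random.random):
    """Yield full-jitter delays: a random fraction of min(cap, base * 2**attempt)."""
    for attempt in range(max_retries):
        yield rng() * min(cap, base * 2 ** attempt)


def call_with_retries(send, max_retries: int = 5, sleep=time.sleep):
    """Call send(); on RateLimitError wait a jittered delay, up to max_retries times."""
    for delay in backoff_delays(max_retries):
        try:
            return send()
        except RateLimitError:
            sleep(delay)
    return send()  # final attempt; any error propagates to the caller


class TokenBudget:
    """Client-side TPM limiter using a fixed one-minute window."""

    def __init__(self, tokens_per_minute: int, clock=time.monotonic):
        self.capacity = tokens_per_minute
        self.clock = clock
        self.window_start = clock()
        self.used = 0

    def try_acquire(self, estimated_tokens: int) -> bool:
        now = self.clock()
        if now - self.window_start >= 60:
            self.window_start, self.used = now, 0  # start a new window
        if self.used + estimated_tokens > self.capacity:
            return False  # caller should queue or delay the request
        self.used += estimated_tokens
        return True
```

Capping retries matters as much as the jitter: without a cap, each 429 generates more traffic against a limit that is already exhausted. The `sleep` and `clock` parameters exist so the logic can be exercised in tests without waiting.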

Problem 3. [Staff] A company’s cloud AI costs are growing 40% month-over-month:

Month 1: $50K (launch)
Month 2: $70K (growth)
Month 3: $98K (more growth)
Month 4: $137K (panic)

The AI application usage is only growing 15% month-over-month. What’s causing the cost explosion and how would you investigate?

Answer

Investigation steps: (1) Break down costs by model: Are expensive models being used for simple tasks? (2) Check token ratios: Is output significantly longer than necessary? Verbose prompts or responses waste tokens. (3) Analyze retry rates: Failed requests still cost money. High retry rates multiply costs. (4) Look for runaway loops: Agent systems can spiral, making many LLM calls per user request. (5) Check for development/testing waste: Are developers running expensive experiments on production tiers? (6) Examine caching hit rates: If caching was implemented but not working, costs spike. (7) Audit unused capacity: Provisioned throughput that’s not fully utilized.

Common culprits: (1) Feature creep adding more LLM calls per request (costs grow faster than user growth). (2) Context windows growing as conversation history accumulates. (3) Retry logic without caps. (4) Embedding regeneration instead of caching. (5) Missing input/output limits allowing unbounded token usage.

Immediate fixes: Add per-request cost limits, implement token budgets, route simple queries to cheaper models, add caching, cap context window sizes.
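A useful first step in the investigation is to separate usage growth from growth in cost per unit of usage. The short calculation below uses only the figures from the scenario; the variable names are illustrative.

```python
monthly_cost = [50_000, 70_000, 98_000, 137_000]  # USD, from the scenario
usage_growth = 0.15                                # month-over-month, from the scenario

# Month-over-month cost growth
cost_growth = [b / a - 1 for a, b in zip(monthly_cost, monthly_cost[1:])]

# Growth in cost per unit of usage: (1 + cost growth) / (1 + usage growth) - 1
unit_cost_growth = [(1 + g) / (1 + usage_growth) - 1 for g in cost_growth]

for month, (g, u) in enumerate(zip(cost_growth, unit_cost_growth), start=2):
    print(f"Month {month}: cost {g:+.1%}, cost per unit of usage {u:+.1%}")
```

Cost per unit of usage is rising by roughly 21 to 22% every month, so most of the increase comes from each request getting more expensive rather than from new users. That points the investigation at per-request drivers such as extra LLM calls, growing context, and retries.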

Design Exercises

Exercise 1. [Senior] Design a cloud AI deployment for a financial services company. Requirements: Must be SOC 2 and PCI DSS compliant, needs to process 5M tokens/day with p99 latency <2s, must support both GPT-4 and Claude for different use cases, all data must stay within the company’s AWS environment. What architecture would you propose?

Guidance

Architecture: (1) Primary: AWS Bedrock for Claude (native AWS, data stays in-region). (2) Secondary: Self-hosted API gateway calling Azure OpenAI for GPT-4 (Azure has strong PCI/SOC 2 compliance). This path takes GPT-4 traffic outside AWS, so the requirement that all data stay in the AWS environment needs an explicit, documented exception for it. (3) VPC setup: Private subnets with no internet egress. VPC endpoints for Bedrock. Private connectivity to Azure via ExpressRoute/Direct Connect peering. (4) Data handling: PII stripped before LLM calls, responses logged to encrypted S3 with audit trails. (5) Latency: 5M tokens/day ≈ 60 tokens/second average. Provisioned throughput on Bedrock for guaranteed latency. Pre-warmed connections. Regional deployment close to users. (6) Compliance: WAF in front of API gateway, request/response logging to SIEM, encryption at rest/transit, quarterly penetration testing.

Cost estimate: Bedrock provisioned ~$15-20K/month, Azure OpenAI ~$10-15K/month (if 30% of traffic), infrastructure ~$5K/month. Total ~$30-40K/month.

Exercise 2. [Staff] Your company is evaluating whether to continue with managed cloud AI services or build a self-hosted inference platform. Current usage: 100M tokens/day across 50 applications, $500K/month on cloud AI. Build a comprehensive analysis comparing the two approaches. Include: total cost of ownership (3 years), operational requirements, risk factors, and recommendation.

Guidance

Self-hosted analysis: (1) Hardware: 100M tokens/day ≈ 1200 tokens/second. For Llama 4 Maverick, need ~8-10 H100 GPUs. Cost: ~$200K upfront or ~$30K/month cloud GPUs. (2) Software: vLLM/TGI, Kubernetes, monitoring, CI/CD. Engineering time: 2-3 full-time engineers minimum. (3) Operational cost: ~$400K/year for 2.5 engineers. (4) Model limitations: Self-hosted = open-source models only. Quality gap vs Claude/GPT-4 for complex tasks.

Managed analysis: (1) Current: $500K/month = $6M/year. (2) Volume discounts: At this scale, negotiate enterprise agreements. Expect 20-40% discount = $3.6-4.8M/year. (3) Zero operational overhead. (4) Model flexibility: Access to best models.

3-year TCO: Self-hosted: $600K hardware + $1.2M/year operations + quality gap risk = ~$4.2M. Managed (negotiated): ~$4M/year = $12M, but includes best models and zero ops burden.

Recommendation: At 100M tokens/day, hybrid makes sense. Self-host for high-volume, latency-insensitive workloads (embeddings, simple classification). Use managed for complex reasoning requiring frontier models. Expected split: 60% self-hosted, 40% managed = ~$6M over 3 years vs $12M all-managed.
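The throughput and all-in 3-year figures in the guidance can be checked with a short calculation. This sketch only reproduces the arithmetic from the numbers stated above; the function names are illustrative, and it does not model the hybrid split, discounting, or hardware refresh.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def tokens_per_second(tokens_per_day: float) -> float:
    """Average throughput; peak load will be higher than this."""
    return tokens_per_day / SECONDS_PER_DAY


def three_year_tco(upfront: float, annual: float, years: int = 3) -> float:
    """Total cost of ownership: one-time cost plus recurring annual cost."""
    return upfront + annual * years


avg_tps = tokens_per_second(100_000_000)
self_hosted = three_year_tco(upfront=600_000, annual=1_200_000)
managed = three_year_tco(upfront=0, annual=4_000_000)

print(f"Average throughput: {avg_tps:,.0f} tokens/second")
print(f"Self-hosted 3-year TCO: ${self_hosted:,.0f}")
print(f"Managed 3-year TCO:     ${managed:,.0f}")
```

Capacity should be sized for peak rather than average throughput, so treat the tokens/second figure as a lower bound when estimating GPU count.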

Further Reading

Essential

  • AWS Bedrock Documentation — Model access, Knowledge Bases, and Agents. Start with the getting started guide.
  • Azure OpenAI Service Documentation — Deployments, content filtering, and enterprise features.
  • Google Vertex AI Documentation — Gemini models, Vertex AI Search, and multi-modal capabilities.

Deep Dives

  • “Cloud AI Pricing Calculator” (AWS, Azure, GCP) — Build cost models for your expected usage.
  • Cloud Provider Compliance Pages — Understand which certifications apply to AI services specifically.

Case Studies

  • Anthropic Claude on AWS Bedrock — Architecture patterns for production Claude deployments.
  • Enterprise GPT with Azure OpenAI — Microsoft’s reference architectures for enterprise AI.