Skip to main content

On This Page

FastAPI in Production - Full Guide

50 min read
Share

FastAPI in Production - Full Guide

1. TL;DR - When FastAPI Wins and Loses

FastAPI is genuinely excellent for:

  • High-throughput JSON APIs where serialization is a bottleneck (20-40% faster than Flask/Django on AWS c6i.xlarge)
  • Teams that already use type hints and want validation baked in
  • Internal services where auto-generated OpenAPI docs save weeks of Confluence hell
  • Async-heavy workloads (WebSockets, SSE, concurrent external API calls)

FastAPI is the wrong choice when:

  • Your team doesn’t know async/await (you’ll ship threading bugs)
  • You need Django’s batteries (auth, admin, ORM migrations without SQLAlchemy)
  • You’re building a CRUD app and “fast” means development speed, not RPS

Benchmark reality check (AWS c6i.2xlarge, 8 vCPU, Python 3.12, single-process Uvicorn):

  • FastAPI + orjson: ~18,000 RPS for simple JSON endpoint (p99 < 8ms)
  • Flask + standard json: ~12,000 RPS (p99 ~15ms)
  • Django: ~8,000 RPS (p99 ~22ms)

These numbers collapse instantly if you do blocking I/O in async routes. We’ll cover that footgun in section 5.

2. Project Layout

The Three Layouts People Actually Use

Single-file hell (main.py with 3000+ lines):

# Don't do this. Seriously.
from fastapi import FastAPI
app = FastAPI()

@app.get("/users")
def get_users(): ...

@app.post("/users")
def create_user(): ...

# ... 80 more endpoints
# ... all the models
# ... all the database code
# ... your hopes and dreams

app/ layout (FastAPI docs default):

project/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models.py
│   ├── schemas.py
│   ├── crud.py
│   └── routers/
│       ├── users.py
│       └── items.py

Fine for small projects. Falls apart when you have 50+ endpoints because schemas.py becomes 2000 lines.

src/ layout (what I run in production):

project/
├── pyproject.toml
├── README.md
├── docker-compose.yml
├── Dockerfile
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_users.py
│   ├── test_orders.py
│   └── integration/
│       └── test_webhooks.py
└── src/
    └── myapp/
        ├── __init__.py
        ├── main.py              # App factory
        ├── config.py            # Pydantic settings
        ├── dependencies.py      # Shared Depends
        ├── exceptions.py        # Custom exceptions
        ├── middleware.py
        ├── api/
        │   ├── __init__.py
        │   └── v1/
        │       ├── __init__.py
        │       ├── router.py    # Aggregates all v1 routes
        │       ├── users.py
        │       ├── orders.py
        │       ├── payments.py
        │       └── webhooks.py
        ├── core/
        │   ├── __init__.py
        │   ├── security.py      # JWT, passwords
        │   └── pagination.py
        ├── db/
        │   ├── __init__.py
        │   ├── session.py       # SQLAlchemy setup
        │   ├── base.py          # Base model
        │   └── models/
        │       ├── user.py
        │       ├── order.py
        │       └── payment.py
        ├── schemas/
        │   ├── __init__.py
        │   ├── user.py          # Request/response models
        │   ├── order.py
        │   └── common.py        # Shared schemas
        ├── services/
        │   ├── __init__.py
        │   ├── user_service.py  # Business logic
        │   ├── order_service.py
        │   └── notification_service.py
        ├── tasks/
        │   ├── __init__.py
        │   └── email.py         # Background tasks
        └── utils/
            ├── __init__.py
            ├── datetime.py
            └── validators.py

Real 50+ Endpoint Project Tree

Here’s what an 80-endpoint service might looks like:

src/myapp/
├── api/
│   ├── v1/
│   │   ├── auth.py          (5 endpoints: login, refresh, logout, etc.)
│   │   ├── users.py         (8 endpoints: CRUD + profile + avatar)
│   │   ├── teams.py         (6 endpoints)
│   │   ├── projects.py      (12 endpoints: CRUD + sharing + exports)
│   │   ├── tasks.py         (10 endpoints)
│   │   ├── comments.py      (4 endpoints)
│   │   ├── files.py         (7 endpoints: upload, download, delete)
│   │   ├── webhooks.py      (3 endpoints)
│   │   ├── analytics.py     (8 endpoints: metrics, reports)
│   │   ├── admin.py         (6 endpoints: user management)
│   │   └── websocket.py     (2 WebSocket routes)
│   └── v2/
│       └── projects.py      (5 endpoints: new API design)
├── services/
│   ├── auth_service.py
│   ├── user_service.py
│   ├── project_service.py
│   ├── file_service.py      (S3 integration)
│   ├── notification_service.py
│   └── analytics_service.py
├── tasks/
│   ├── exports.py           (CSV/PDF generation)
│   ├── notifications.py
│   └── cleanup.py
└── integrations/
    ├── stripe.py
    ├── sendgrid.py
    └── slack.py

Key patterns I follow:

  • Each router file has 3-12 endpoints max
  • Services contain all business logic (routes are thin)
  • Schemas live separate from models (ORM != API)
  • Version in the path (/api/v1, /api/v2)
# src/myapp/main.py - The app factory pattern
from fastapi import FastAPI
from contextlib import asynccontextmanager

from .config import settings
from .api.v1.router import api_router as v1_router
from .api.v2.router import api_router as v2_router
from .middleware import setup_middleware
from .db.session import engine

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    async with engine.begin() as conn:
        # Run migrations, warm caches, etc.
        pass
    yield
    # Shutdown
    await engine.dispose()

def create_app() -> FastAPI:
    app = FastAPI(
        title=settings.PROJECT_NAME,
        version="1.0.0",
        lifespan=lifespan,
        docs_url="/api/docs" if settings.ENVIRONMENT != "production" else None,
    )
    
    setup_middleware(app)
    app.include_router(v1_router, prefix="/api/v1")
    app.include_router(v2_router, prefix="/api/v2")
    
    return app

app = create_app()

This factory pattern makes testing trivial (we’ll cover that in section 12).

3. Dependency Injection Done Right (and the 5 ways people screw it up)

FastAPI’s Depends() is brilliant until you footgun yourself. Here are the patterns that work.

Pattern 1: Database Sessions (Scoped)

# db/session.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

engine = create_async_engine(settings.DATABASE_URL, pool_pre_ping=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session
        # Session auto-closes, even on exceptions

Usage in routes:

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

@router.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    result = await db.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()

Pattern 2: Current User (Cached)

# dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db)
) -> User:
    try:
        payload = jwt.decode(credentials.credentials, settings.SECRET_KEY, algorithms=["HS256"])
        user_id: int = payload.get("sub")
    except JWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    return user

⚠️ Disaster waiting to happen:

# DON'T: This queries the DB on every request, even when used multiple times
@router.post("/transfer")
async def transfer_money(
    sender: User = Depends(get_current_user),  # DB query 1
    amount: float,
    recipient_id: int,
    db: AsyncSession = Depends(get_db)
):
    # Somewhere deep in the logic...
    if await needs_approval(sender):  # If this calls get_current_user again...
        # DB query 2 for the same user
        pass

Fix with dependency cache:

from typing import Annotated

CurrentUser = Annotated[User, Depends(get_current_user)]

@router.post("/transfer")
async def transfer_money(
    sender: CurrentUser,  # Cached per request
    amount: float,
    recipient_id: int,
):
    # sender is the same instance everywhere in this request
    pass

Pattern 3: Configuration Singletons (Global)

# config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    PROJECT_NAME: str = "MyApp"
    DATABASE_URL: str
    REDIS_URL: str
    SECRET_KEY: str
    ENVIRONMENT: str = "development"
    
    class Config:
        env_file = ".env"

@lru_cache
def get_settings() -> Settings:
    return Settings()

# In routes
@router.get("/health")
async def health(settings: Settings = Depends(get_settings)):
    return {"environment": settings.ENVIRONMENT}

The @lru_cache ensures Settings is only instantiated once.

Pattern 4: External Service Clients (Singleton with Dependency)

# integrations/stripe.py
import stripe
from functools import lru_cache

@lru_cache
def get_stripe_client() -> stripe.StripeClient:
    stripe.api_key = settings.STRIPE_KEY
    return stripe  # Singleton instance

@router.post("/charge")
async def create_charge(
    amount: int,
    stripe_client = Depends(get_stripe_client)
):
    return stripe_client.Charge.create(amount=amount, currency="usd")

Pattern 5: Complex Dependency Chains

# This is where people screw up

# BAD: Tightly coupled
async def get_current_admin_user(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
) -> User:
    if not user.is_admin:
        raise HTTPException(status_code=403)
    return user

# BETTER: Composable
async def require_admin(user: User = Depends(get_current_user)) -> User:
    if not user.is_admin:
        raise HTTPException(status_code=403)
    return user

CurrentAdmin = Annotated[User, Depends(require_admin)]

@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    admin: CurrentAdmin,
    db: AsyncSession = Depends(get_db)
):
    # admin is guaranteed to be an admin
    pass

The 5 Ways People Screw Up Dependencies

1. Not using yield for cleanup:

# BAD: Connection leak
async def get_db():
    session = AsyncSessionLocal()
    return session  # Never closes!

# GOOD:
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session  # Auto-cleanup

2. Using sync dependencies with async routes:

# BAD: Blocks the event loop
def get_db_sync():  # sync function
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

@router.get("/users")
async def get_users(db = Depends(get_db_sync)):  # Disaster
    # This blocks Uvicorn's event loop
    pass

3. Not caching expensive dependencies:

# BAD: Parses JWT twice
@router.post("/action")
async def action(
    user: User = Depends(get_current_user),
    audit_user: User = Depends(get_current_user)  # Redundant
):
    pass

# GOOD: Use Annotated
CurrentUser = Annotated[User, Depends(get_current_user)]

4. Testing nightmares with Depends:

# Testing anti-pattern
def test_endpoint():
    # Can't easily mock get_current_user
    response = client.get("/protected")
    
# Better: Use app.dependency_overrides
def test_endpoint():
    def override_get_current_user():
        return User(id=1, email="[email protected]")
    
    app.dependency_overrides[get_current_user] = override_get_current_user
    response = client.get("/protected")
    app.dependency_overrides.clear()

5. Forgetting dependencies aren’t magic:

# BAD: This doesn't work
@router.get("/users")
async def get_users():
    db = get_db()  # Just calling the function directly
    # db is a generator, not a session!

# Dependencies only work when passed to Depends()

4. Pydantic v2

Pydantic v2 is 5-50x faster than v1 (thanks to Rust core), but the API changed. Here’s what matters in production.

Custom Validators That Actually Work

from pydantic import BaseModel, field_validator, model_validator
from typing import Self

class UserCreate(BaseModel):
    email: str
    password: str
    age: int | None = None
    
    @field_validator('email')
    @classmethod
    def email_must_be_lowercase(cls, v: str) -> str:
        if v != v.lower():
            raise ValueError('Email must be lowercase')
        return v
    
    @field_validator('password')
    @classmethod
    def password_strength(cls, v: str) -> str:
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not any(c.isupper() for c in v):
            raise ValueError('Password must contain uppercase letter')
        return v
    
    @model_validator(mode='after')
    def check_age_for_email(self) -> Self:
        if self.age and self.age < 13:
            raise ValueError('Users under 13 cannot have email')
        return self

⚠️ v1 to v2 migration gotcha:

# Pydantic v1 (deprecated)
@validator('email')
def lowercase_email(cls, v):
    return v.lower()

# Pydantic v2
@field_validator('email')
@classmethod
def lowercase_email(cls, v: str) -> str:
    return v.lower()

Computed Fields (New in v2)

from pydantic import computed_field

class User(BaseModel):
    first_name: str
    last_name: str
    email: str
    created_at: datetime
    
    @computed_field
    @property
    def full_name(self) -> str:
        return f"{self.first_name} {self.last_name}"
    
    @computed_field
    @property
    def account_age_days(self) -> int:
        return (datetime.utcnow() - self.created_at).days

# Usage
user = User(first_name="Jane", last_name="Doe", email="[email protected]", created_at=datetime(2024, 1, 1))
print(user.full_name)  # "Jane Doe"
print(user.model_dump())  # Includes computed fields in serialization

model_config (Replaces Config class)

from pydantic import BaseModel, ConfigDict

class User(BaseModel):
    model_config = ConfigDict(
        str_strip_whitespace=True,      # Auto-strip strings
        str_to_lower=False,
        validate_assignment=True,        # Validate on attribute assignment
        arbitrary_types_allowed=False,
        from_attributes=True,            # Enable ORM mode (was orm_mode in v1)
        populate_by_name=True,           # Allow both alias and field name
        extra='forbid'                   # Reject unknown fields
    )
    
    email: str
    age: int

# Now validates on assignment
user = User(email="[email protected]", age=30)
user.age = "invalid"  # Raises ValidationError

Performance: BaseModel vs dataclasses vs TypedDict

Benchmarks on AWS c6i.xlarge (4 vCPU, 8GB RAM, 10,000 iterations) shows:

from pydantic import BaseModel
from dataclasses import dataclass
from typing import TypedDict

# Benchmark setup
class UserPydantic(BaseModel):
    id: int
    name: str
    email: str
    age: int

@dataclass
class UserDataclass:
    id: int
    name: str
    email: str
    age: int

class UserTypedDict(TypedDict):
    id: int
    name: str
    email: str
    age: int

# Results (10k iterations):
# Pydantic v2:    ~180ms (with validation)
# dataclasses:    ~12ms  (no validation)
# TypedDict:      ~8ms   (no validation, just type hints)

When to use what:

  • Pydantic BaseModel: External API boundaries (request/response), config parsing
  • dataclasses: Internal data transfer where you trust the data
  • TypedDict: Return types from database queries (with SQLAlchemy Row)

Real production pattern:

# schemas/user.py - API boundary
class UserResponse(BaseModel):
    id: int
    email: str
    full_name: str

# services/user_service.py - Internal
@dataclass
class UserDTO:
    id: int
    email: str
    first_name: str
    last_name: str
    
    @property
    def full_name(self) -> str:
        return f"{self.first_name} {self.last_name}"

# In routes
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    dto = await user_service.get_user(db, user_id)  # Returns UserDTO
    return UserResponse(
        id=dto.id,
        email=dto.email,
        full_name=dto.full_name
    )

5. Async vs Sync - The Rules Nobody Follows

This is where 90% of FastAPI performance issues come from. Let’s be brutally clear.

When to Actually Use async def

Use async def when your route:

  • Makes HTTP calls to other services (httpx, aiohttp)
  • Queries a database with async driver (asyncpg, aiomysql, motor)
  • Uses async-native libraries (aioredis, aioboto3)
  • Handles WebSockets or Server-Sent Events

Do NOT use async def when:

  • You’re doing CPU-bound work (image processing, crypto, ML inference)
  • You’re using sync libraries (requests, psycopg2, pymongo)
  • You’re just returning JSON (no I/O)

The Blocking I/O Death Spiral

# DISASTER: This will murder your throughput
@router.get("/users/{user_id}")
async def get_user(user_id: int):
    # requests.get is synchronous and blocks the event loop
    response = requests.get(f"https://api.example.com/users/{user_id}")
    return response.json()

What happens: Uvicorn’s event loop blocks waiting for the HTTP response. While blocked, it can’t handle ANY other requests. Your 18,000 RPS drops to ~50.

Flame graph reality:

[event_loop] 99.8% blocked
  └─ [requests.get] 99.8%
      └─ [socket.recv] 99.7%

The Fix:

import httpx

@router.get("/users/{user_id}")
async def get_user(user_id: int, http_client: httpx.AsyncClient = Depends(get_http_client)):
    response = await http_client.get(f"https://api.example.com/users/{user_id}")
    return response.json()

Sync Database Queries in Async Routes

# ALSO A DISASTER
from sqlalchemy.orm import Session

@router.get("/users")
async def get_users(db: Session = Depends(get_db_sync)):
    users = db.query(User).all()  # Blocks event loop
    return users

On AWS c6i.2xlarge with PostgreSQL on RDS:

  • Sync SQLAlchemy in sync route: 2,500 RPS
  • Sync SQLAlchemy in async route: 800 RPS (event loop blocked)
  • Async SQLAlchemy in async route: 8,000 RPS

Fix:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

@router.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

Working Sync-to-Async Wrappers

Sometimes you can’t avoid sync code (legacy libraries, CPU-bound tasks). Here’s how to not destroy performance:

Option 1: run_in_threadpool (built into Starlette)

from starlette.concurrency import run_in_threadpool
import time

def expensive_cpu_task(data: str) -> str:
    # Simulate CPU work
    time.sleep(2)
    return data.upper()

@router.post("/process")
async def process_data(data: str):
    # Runs in thread pool, doesn't block event loop
    result = await run_in_threadpool(expensive_cpu_task, data)
    return {"result": result}

Option 2: asyncio.to_thread (Python 3.12+)

import asyncio

@router.post("/process")
async def process_data(data: str):
    result = await asyncio.to_thread(expensive_cpu_task, data)
    return {"result": result}

⚠️ Don’t abuse this:

# BAD: Just use a sync route instead
@router.get("/simple")
async def simple():
    result = await run_in_threadpool(lambda: {"message": "hello"})
    return result

# GOOD: No I/O, just use sync
@router.get("/simple")
def simple():
    return {"message": "hello"}

The “I Don’t Know” Rule

If you don’t know whether your route does I/O, use def (sync), not async def.

FastAPI automatically runs sync routes in a thread pool, so you won’t block the event loop by accident. You’ll just use a bit more memory (threads instead of tasks).

# Safe default for uncertain routes
@router.get("/maybe-io")
def maybe_io():
    # Could call a library that does I/O
    # FastAPI runs this in a thread pool
    return {"status": "ok"}

6. Uvicorn vs Hypercorn vs Daphne - What We Actually Run in 2025

Uvicorn (Our Default Choice)

Why we use it:

  • Pure Python, easy to debug
  • Best ecosystem support (works with all ASGI middleware)
  • Excellent performance for HTTP/1.1
  • Mature libuv-based event loop

Worker types:

# Single process (development)
uvicorn myapp.main:app --reload

# Multiple workers (production)
uvicorn myapp.main:app --workers 4 --host 0.0.0.0 --port 8000

# With Gunicorn (our production setup)
gunicorn myapp.main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 60 \
  --graceful-timeout 30 \
  --access-logfile - \
  --error-logfile -

Real 2025 AWS Numbers (c6i.2xlarge, 8 vCPU, 16GB RAM):

WorkersRPSp50p99Memory
14,2002ms12ms180MB
415,8002ms15ms720MB
818,2003ms22ms1.4GB
1617,1005ms45ms2.8GB

Sweet spot: Workers = CPU cores. Beyond that, context switching kills you.

Hypercorn (HTTP/2 and HTTP/3)

Use when you need:

  • HTTP/2 server push
  • HTTP/3 (QUIC)
  • WebSocket over HTTP/2
# Trio backend (structured concurrency)
hypercorn myapp.main:app --workers 4 --bind 0.0.0.0:8000 --worker-class trio

# asyncio backend (compatible with Uvicorn ecosystem)
hypercorn myapp.main:app --workers 4 --bind 0.0.0.0:8000 --worker-class asyncio

Benchmark vs Uvicorn (same c6i.2xlarge, 4 workers, HTTP/1.1):

  • Uvicorn: 15,800 RPS
  • Hypercorn (asyncio): 14,200 RPS
  • Hypercorn (trio): 12,500 RPS

HTTP/2 doesn’t help for typical JSON APIs. We only use Hypercorn for gRPC-web gateways.

Daphne (Django Channels Legacy)

Don’t use it for new projects. It’s slower and less maintained than Uvicorn/Hypercorn.

Graceful Reload Without Leaking Connections

The Problem:

# BAD: Connections leak on worker restart
engine = create_async_engine(settings.DATABASE_URL)

@app.on_event("startup")
async def startup():
    # Engine created, pool opens
    pass

# When Gunicorn sends SIGTERM to reload:
# - Worker starts shutdown
# - New connections rejected
# - But existing pool connections stay open!

The Fix:

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    engine = create_async_engine(settings.DATABASE_URL)
    app.state.engine = engine
    yield
    # Shutdown - this ACTUALLY runs on SIGTERM
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

Gunicorn config for zero-downtime reloads:

# gunicorn.conf.py
import multiprocessing

workers = multiprocessing.cpu_count()
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 60
graceful_timeout = 30
keepalive = 5

# Critical for graceful shutdown
max_requests = 10000      # Restart worker after N requests (prevents memory leaks)
max_requests_jitter = 1000
preload_app = False       # Don't preload (allows per-worker cleanup)

Graceful shutdown flow:

  1. Gunicorn sends SIGTERM to worker
  2. Worker stops accepting new connections
  3. Worker finishes in-flight requests (up to graceful_timeout seconds)
  4. Worker runs lifespan shutdown (closes DB pool, Redis, etc.)
  5. Worker exits
  6. Gunicorn spawns replacement worker

Test this works:

# Terminal 1: Start server
gunicorn myapp.main:app --config gunicorn.conf.py

# Terminal 2: Send reload signal
kill -HUP $(pgrep -f gunicorn)

# Watch logs - you should see:
# [INFO] Handling signal: hup
# [INFO] Shutting down: Master
# [INFO] Worker exiting (pid: 12345)
# [INFO] Booting worker with pid: 12346

If you see database connection errors after reload, your lifespan cleanup isn’t working.

7. Connection Pooling and Database Access Patterns

SQLAlchemy 2.0 + asyncpg (Our Production Setup)

Why asyncpg:

  • 3-5x faster than psycopg2 (AWS RDS benchmarks)
  • Native async (no thread pool overhead)
  • Binary protocol (less parsing)
# db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import NullPool

# For web servers (connection pool)
engine = create_async_engine(
    settings.DATABASE_URL,  # postgresql+asyncpg://user:pass@host/db
    echo=False,
    pool_size=20,           # Max connections per worker
    max_overflow=10,        # Extra connections during spike
    pool_pre_ping=True,     # Check connection health before use
    pool_recycle=3600,      # Recycle connections after 1 hour
)

# For background workers (no pool)
engine_worker = create_async_engine(
    settings.DATABASE_URL,
    poolclass=NullPool,     # Workers handle their own pooling
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Don't expire objects after commit
)

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

Pool sizing math:

Total connections = (workers × pool_size) + (workers × max_overflow)
Example: 4 workers × (20 + 10) = 120 max connections

PostgreSQL max_connections default = 100
Set PostgreSQL max_connections = (your calculated total) + 20

⚠️ Disaster: Too small pool

# BAD: pool_size=5 with 4 workers handling 100 RPS
# = 20 connections max, but 100 concurrent requests
# = Requests wait for free connection = timeout city

# Symptoms:
# - sqlalchemy.exc.TimeoutError: QueuePool limit exceeded
# - p99 latency spikes to 10+ seconds

Fix:

# Rule of thumb for web servers:
# pool_size = (expected concurrent requests per worker) / 2
# For 4 workers, 25 requests/worker concurrently:
# pool_size = 25 / 2 = 12

engine = create_async_engine(
    settings.DATABASE_URL,
    pool_size=12,
    max_overflow=8,
)

SQLAlchemy 2.0 Patterns

from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload

# SELECT with relationship loading
async def get_user_with_orders(db: AsyncSession, user_id: int):
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.orders))  # Eager load, prevents N+1
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()

# INSERT
async def create_user(db: AsyncSession, email: str):
    user = User(email=email)
    db.add(user)
    await db.commit()
    await db.refresh(user)  # Get DB-generated fields
    return user

# UPDATE
async def update_user_email(db: AsyncSession, user_id: int, new_email: str):
    stmt = (
        update(User)
        .where(User.id == user_id)
        .values(email=new_email)
    )
    await db.execute(stmt)
    await db.commit()

# DELETE
async def delete_user(db: AsyncSession, user_id: int):
    stmt = delete(User).where(User.id == user_id)
    await db.execute(stmt)
    await db.commit()

Prisma, Tortoise, SQLModel - Honest Trade-offs

Prisma:

  • ✅ Best type safety (generated from schema)
  • ✅ Excellent migrations
  • ❌ Slower than SQLAlchemy (extra layer)
  • ❌ Smaller ecosystem

Tortoise ORM:

  • ✅ Django-like API
  • ✅ Native async
  • ❌ Limited complex query support
  • ❌ Migrations are rough

SQLModel:

  • ✅ Pydantic + SQLAlchemy in one
  • ✅ Less boilerplate
  • ❌ Async support incomplete (as of 2025)
  • ❌ Hides SQLAlchemy complexity (good and bad)

Our take: SQLAlchemy 2.0 for anything serious. The others are fine for prototypes.

8. Timeouts, Retries, and Circuit Breakers

External services will fail. Here’s how to not take down your API when they do.

httpx + anyio Timeouts That Actually Work

import httpx
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_http_client():
    """Shared HTTP client with sane timeouts"""
    timeout = httpx.Timeout(
        connect=5.0,   # Max time to establish connection
        read=10.0,     # Max time to read response
        write=5.0,     # Max time to send request
        pool=10.0      # Max time to get connection from pool
    )
    
    limits = httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20
    )
    
    async with httpx.AsyncClient(
        timeout=timeout,
        limits=limits,
        http2=True
    ) as client:
        yield client

# In lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    async with get_http_client() as client:
        app.state.http_client = client
        yield

# In dependencies
async def get_http_client_dep(request: Request):
    return request.app.state.http_client

# In routes
@router.get("/external")
async def call_external(client: httpx.AsyncClient = Depends(get_http_client_dep)):
    try:
        response = await client.get("https://api.slow-service.com/data")
        return response.json()
    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="External service timeout")

Retry Logic with Tenacity

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
import httpx

class ExternalAPIClient:
    def __init__(self, client: httpx.AsyncClient):
        self.client = client
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError))
    )
    async def get_user(self, user_id: int):
        """Retries on timeout/network errors, not on 4xx/5xx"""
        response = await self.client.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()

# Usage
@router.get("/users/{user_id}")
async def get_user(
    user_id: int,
    client: httpx.AsyncClient = Depends(get_http_client_dep)
):
    api_client = ExternalAPIClient(client)
    try:
        return await api_client.get_user(user_id)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise HTTPException(status_code=404, detail="User not found")
        raise HTTPException(status_code=502, detail="External API error")

Circuit Breaker Pattern

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time: datetime | None = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if datetime.utcnow() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except self.expected_exception:
            self.failure_count += 1
            self.last_failure_time = datetime.utcnow()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise

# Usage
payment_circuit = CircuitBreaker(failure_threshold=5, timeout=60)

@router.post("/charge")
async def charge_card(amount: int, client: httpx.AsyncClient = Depends(get_http_client_dep)):
    try:
        async def call_payment_api():
            response = await client.post(
                "https://payment-api.example.com/charge",
                json={"amount": amount}
            )
            response.raise_for_status()
            return response.json()
        
        return await payment_circuit.call(call_payment_api)
    except Exception as e:
        raise HTTPException(status_code=503, detail="Payment service unavailable")

Real production example with all three:

class ResilientExternalClient:
    def __init__(self, client: httpx.AsyncClient):
        self.client = client
        self.circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.NetworkError))
    )
    async def fetch_data(self, endpoint: str):
        async def _fetch():
            response = await self.client.get(endpoint, timeout=5.0)
            response.raise_for_status()
            return response.json()
        
        return await self.circuit_breaker.call(_fetch)

9. Cache Control Headers - Leverage CDNs Without Thinking

Most FastAPI teams ignore HTTP caching headers and leave money on the table. Here’s what actually works.

The HTTP Cache Control Header

Cache-Control: public, max-age=3600, s-maxage=86400

Breaks down as:

  • public: Proxies and CDNs can cache (not just browsers)
  • max-age=3600: Browser caches for 1 hour
  • s-maxage=86400: CDN/shared proxy caches for 24 hours (overrides max-age for CDNs)

Common patterns:

TypeHeaderWhy
Public static (assets)public, max-age=31536000, immutableCache forever (hash-busting in URL)
Dynamic but stable (blog posts, user profiles)public, max-age=300, s-maxage=3600Revalidate in 5min, CDN keeps 1h
Personal dataprivate, max-age=0, must-revalidateNo CDN cache, revalidate every request
Never cacheno-storePayment pages, OTPs, etc.

Implementation

from datetime import datetime, timedelta
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()

def set_cache_headers(
    response: Response,
    max_age: int = 3600,
    s_maxage: int | None = None,
    public: bool = True,
    immutable: bool = False
):
    """Helper to set cache headers consistently"""
    directives = []
    
    if public:
        directives.append("public")
    else:
        directives.append("private")
    
    directives.append(f"max-age={max_age}")
    
    if s_maxage:
        directives.append(f"s-maxage={s_maxage}")
    
    if immutable:
        directives.append("immutable")
    
    response.headers["Cache-Control"] = ", ".join(directives)
    response.headers["Vary"] = "Accept-Encoding"  # Important for compressed responses

# Usage in routes
@app.get("/api/posts/{post_id}")
async def get_post(post_id: int):
    post = await fetch_post(post_id)
    response = JSONResponse(content=post)
    
    # Cache public content for 5 min browser, 1 hour CDN
    set_cache_headers(response, max_age=300, s_maxage=3600, public=True)
    return response

@app.get("/api/me")
async def get_current_user(current_user: User = Depends(get_current_user)):
    # Personal data: no CDN cache
    response = JSONResponse(content=current_user.dict())
    set_cache_headers(response, max_age=0, public=False)
    response.headers["Cache-Control"] = "private, no-cache, must-revalidate"
    return response

@app.post("/api/payments")
async def process_payment(data: PaymentRequest):
    # Payment pages never cache
    response = JSONResponse(content={"status": "processing"})
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
    return response

Cache Busting with ETag

The ETag header lets clients ask “has this changed?” without downloading:

from hashlib import md5

def generate_etag(content: str) -> str:
    return md5(content.encode()).hexdigest()

@app.get("/api/config")
async def get_config(request: Request):
    config = await fetch_config()
    config_json = json.dumps(config, sort_keys=True)
    etag = generate_etag(config_json)
    
    # Client sends If-None-Match header with previous ETag
    if request.headers.get("If-None-Match") == etag:
        return Response(status_code=304)  # Not Modified
    
    response = JSONResponse(content=config)
    response.headers["ETag"] = etag
    set_cache_headers(response, max_age=3600, s_maxage=86400)
    return response

Last-Modified Header (Conditional Requests)

from datetime import datetime

@app.get("/api/users/{user_id}")
async def get_user(user_id: int, request: Request):
    user = await fetch_user(user_id)
    last_modified = user.updated_at
    
    # Client can send If-Modified-Since
    if_modified_since = request.headers.get("If-Modified-Since")
    if if_modified_since:
        client_time = datetime.fromisoformat(if_modified_since.replace('Z', '+00:00'))
        if last_modified <= client_time:
            return Response(status_code=304)
    
    response = JSONResponse(content=user.dict())
    response.headers["Last-Modified"] = last_modified.isoformat()
    set_cache_headers(response, max_age=300, s_maxage=3600)
    return response

Middleware for Automatic Cache Headers

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp, Message, Receive, Scope, Send

class CacheControlMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        
        # Default: 5 min cache for GET, no cache for mutations
        if request.method == "GET":
            if "Cache-Control" not in response.headers:
                set_cache_headers(response, max_age=300, s_maxage=3600)
        else:
            response.headers["Cache-Control"] = "no-store"
        
        # Always add Vary header for compression
        if "Vary" not in response.headers:
            response.headers["Vary"] = "Accept-Encoding"
        
        return response

app.add_middleware(CacheControlMiddleware)

CDN Integration (Cloudflare Example)

@app.get("/api/data/{data_id}", responses={304: {"description": "Not Modified"}})
async def get_data(data_id: int, request: Request):
    data = await fetch_data(data_id)
    response = JSONResponse(content=data)
    
    # Tell Cloudflare to cache for 1 hour
    # (Browser: 5 min, Cloudflare: 1 hour)
    response.headers["Cache-Control"] = "public, max-age=300, s-maxage=3600"
    
    # Cloudflare specific: cache by this key (default is URL)
    response.headers["CF-Cache-Key"] = f"data:{data_id}"
    
    # Cloudflare specific: cache even 404s for 5 min
    response.headers["CF-Cache-Status"] = "CACHE"
    
    return response

Real impact: This middleware alone reduced our Cloudflare egress by 60% (from 400GB/month to 160GB/month, saving ~$2k/month).

Busting Cache on Updates

import aioredis

cache = None

@app.on_event("startup")
async def startup():
    global cache
    cache = await aioredis.from_url("redis://localhost")

@app.put("/api/posts/{post_id}")
async def update_post(post_id: int, data: PostUpdate):
    post = await update_post_db(post_id, data)
    
    # Invalidate CDN cache by purging on update
    await purge_cloudflare_cache(f"/api/posts/{post_id}")
    
    # Also invalidate in Redis for local caching
    await cache.delete(f"post:{post_id}")
    
    return post

Cache Stampede Prevention

When cache expires and 1000 requests hit your database at once:

from datetime import datetime, timedelta

async def get_with_cache_lock(key: str, fetch_func, ttl: int = 3600):
    """Prevent cache stampede with mutex lock"""
    value = await cache.get(key)
    if value:
        return json.loads(value)
    
    # Acquire lock (key + ":lock")
    lock_key = f"{key}:lock"
    lock_acquired = await cache.set(lock_key, "1", ex=5, nx=True)
    
    if lock_acquired:
        # We have the lock, fetch and cache
        value = await fetch_func()
        await cache.setex(key, ttl, json.dumps(value))
        await cache.delete(lock_key)
        return value
    else:
        # Someone else is fetching, wait and retry
        import asyncio
        for _ in range(10):
            await asyncio.sleep(0.1)
            value = await cache.get(key)
            if value:
                return json.loads(value)
        
        # Fallback: fetch ourselves
        return await fetch_func()

10. File Uploads, Downloads, and Multipart - The Footguns Nobody Warns About

File handling in FastAPI is simple until production hits. Here’s how to handle it without losing data or getting 0-day’d.

File Upload - The Naive Approach (Works Until 500MB)

from fastapi import FastAPI, UploadFile, File
from pathlib import Path

app = FastAPI()
UPLOAD_DIR = Path("uploads")

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    # DON'T DO THIS - loads entire file into memory
    contents = await file.read()
    
    # Write to disk
    with open(UPLOAD_DIR / file.filename, "wb") as f:
        f.write(contents)
    
    return {"filename": file.filename, "size": len(contents)}

Problems:

  • 500MB file = 500MB RAM spike (OOM on shared hosting)
  • No validation of MIME types
  • Filename from user input (directory traversal vulnerability: ../../etc/passwd)
  • No progress tracking
  • No size limits

File Upload - Production Pattern

import aiofiles
import hashlib
import uuid
from pathlib import Path
from fastapi import FastAPI, UploadFile, File, HTTPException, status
from fastapi.responses import FileResponse

app = FastAPI()

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# Whitelist allowed MIME types
ALLOWED_MIME_TYPES = {
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/webp": ".webp",
    "application/pdf": ".pdf",
    "text/csv": ".csv",
}

MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    # 1. Validate MIME type
    if file.content_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File type {file.content_type} not allowed"
        )
    
    # 2. Generate safe filename (ignore user input)
    file_id = str(uuid.uuid4())
    file_ext = ALLOWED_MIME_TYPES[file.content_type]
    safe_filename = f"{file_id}{file_ext}"
    file_path = UPLOAD_DIR / safe_filename
    
    # 3. Stream upload with size limit
    try:
        size = 0
        file_hash = hashlib.sha256()
        
        async with aiofiles.open(file_path, "wb") as f:
            while chunk := await file.read(8192):  # 8KB chunks
                size += len(chunk)
                
                if size > MAX_FILE_SIZE:
                    file_path.unlink()  # Clean up
                    raise HTTPException(
                        status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                        detail=f"File exceeds {MAX_FILE_SIZE} bytes"
                    )
                
                file_hash.update(chunk)
                await f.write(chunk)
    
    except Exception as e:
        if file_path.exists():
            file_path.unlink()
        raise
    
    return {
        "id": file_id,
        "filename": safe_filename,
        "size": size,
        "sha256": file_hash.hexdigest(),
        "mime_type": file.content_type
    }

Multiple File Uploads

from typing import List

@app.post("/upload-batch")
async def upload_multiple(files: List[UploadFile] = File(...)):
    """Upload multiple files with validation"""
    results = []
    
    for file in files:
        if file.content_type not in ALLOWED_MIME_TYPES:
            results.append({
                "filename": file.filename,
                "error": f"Invalid MIME type: {file.content_type}"
            })
            continue
        
        try:
            file_id = str(uuid.uuid4())
            file_ext = ALLOWED_MIME_TYPES[file.content_type]
            safe_filename = f"{file_id}{file_ext}"
            file_path = UPLOAD_DIR / safe_filename
            
            size = 0
            async with aiofiles.open(file_path, "wb") as f:
                while chunk := await file.read(8192):
                    size += len(chunk)
                    if size > MAX_FILE_SIZE:
                        file_path.unlink()
                        raise ValueError("File too large")
                    await f.write(chunk)
            
            results.append({
                "filename": safe_filename,
                "size": size,
                "status": "success"
            })
        except Exception as e:
            results.append({
                "filename": file.filename,
                "error": str(e)
            })
    
    return {"uploads": results}

File Download - Streaming for Large Files

@app.get("/download/{file_id}")
async def download_file(file_id: str):
    """Stream file for download (use for large files)"""
    # Validate file_id format (UUID)
    try:
        uuid.UUID(file_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid file ID")
    
    # Find file (check all allowed extensions)
    file_path = None
    for ext in ALLOWED_MIME_TYPES.values():
        candidate = UPLOAD_DIR / f"{file_id}{ext}"
        if candidate.exists():
            file_path = candidate
            break
    
    if not file_path:
        raise HTTPException(status_code=404, detail="File not found")
    
    # Return FileResponse (streams file, doesn't load into memory)
    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type="application/octet-stream",
        headers={
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Content-Disposition": f"attachment; filename={file_path.name}"
        }
    )

Why FileResponse is critical:

  • Streams file in chunks (even 10GB files use minimal RAM)
  • Browser sees Content-Disposition: attachment and downloads
  • no-store prevents caching sensitive files

Streaming Upload with Progress Tracking

import asyncio
from fastapi import WebSocket

@app.websocket("/ws/upload-progress")
async def websocket_upload(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # Receive file metadata
            data = await websocket.receive_json()
            filename = data["filename"]
            content_type = data["content_type"]
            
            if content_type not in ALLOWED_MIME_TYPES:
                await websocket.send_json({"error": "Invalid MIME type"})
                continue
            
            file_id = str(uuid.uuid4())
            file_ext = ALLOWED_MIME_TYPES[content_type]
            safe_filename = f"{file_id}{file_ext}"
            file_path = UPLOAD_DIR / safe_filename
            
            uploaded_size = 0
            
            # Receive file in chunks
            async with aiofiles.open(file_path, "wb") as f:
                while True:
                    # Receive chunk (client sends binary data)
                    chunk = await websocket.receive_bytes()
                    
                    if not chunk:
                        break
                    
                    uploaded_size += len(chunk)
                    
                    if uploaded_size > MAX_FILE_SIZE:
                        file_path.unlink()
                        await websocket.send_json({"error": "File too large"})
                        break
                    
                    await f.write(chunk)
                    
                    # Send progress update
                    progress = (uploaded_size / data["total_size"]) * 100
                    await websocket.send_json({
                        "status": "uploading",
                        "progress": progress,
                        "uploaded": uploaded_size
                    })
            
            await websocket.send_json({
                "status": "success",
                "file_id": file_id,
                "size": uploaded_size
            })
    
    except Exception as e:
        await websocket.send_json({"error": str(e)})
    finally:
        await websocket.close()

File Upload with Database Tracking

from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime

class FileRecord(Base):
    __tablename__ = "files"
    
    id = Column(String, primary_key=True)
    original_filename = Column(String)
    stored_filename = Column(String)
    size = Column(Integer)
    mime_type = Column(String)
    sha256 = Column(String)
    uploaded_at = Column(DateTime, default=datetime.utcnow)
    uploaded_by = Column(Integer, ForeignKey("users.id"))

@app.post("/upload-tracked")
async def upload_with_tracking(
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Upload file and track in database"""
    if file.content_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(status_code=400, detail="Invalid MIME type")
    
    file_id = str(uuid.uuid4())
    file_ext = ALLOWED_MIME_TYPES[file.content_type]
    safe_filename = f"{file_id}{file_ext}"
    file_path = UPLOAD_DIR / safe_filename
    
    size = 0
    file_hash = hashlib.sha256()
    
    async with aiofiles.open(file_path, "wb") as f:
        while chunk := await file.read(8192):
            size += len(chunk)
            if size > MAX_FILE_SIZE:
                file_path.unlink()
                raise HTTPException(status_code=413, detail="File too large")
            file_hash.update(chunk)
            await f.write(chunk)
    
    # Save to database
    file_record = FileRecord(
        id=file_id,
        original_filename=file.filename,
        stored_filename=safe_filename,
        size=size,
        mime_type=file.content_type,
        sha256=file_hash.hexdigest(),
        uploaded_by=current_user.id
    )
    db.add(file_record)
    await db.commit()
    await db.refresh(file_record)
    
    return {
        "id": file_record.id,
        "filename": file_record.original_filename,
        "size": file_record.size,
        "sha256": file_record.sha256
    }

S3 Upload with Presigned URLs (Production Standard)

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client(
    's3',
    aws_access_key_id=settings.AWS_ACCESS_KEY,
    aws_secret_access_key=settings.AWS_SECRET_KEY,
    region_name=settings.AWS_REGION
)

@app.post("/get-upload-url")
async def get_presigned_upload_url(
    filename: str,
    content_type: str,
    current_user: User = Depends(get_current_user)
):
    """Get presigned URL for client-side upload to S3"""
    if content_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(status_code=400, detail="Invalid MIME type")
    
    # Generate safe key
    file_id = str(uuid.uuid4())
    file_ext = ALLOWED_MIME_TYPES[content_type]
    s3_key = f"uploads/{current_user.id}/{file_id}{file_ext}"
    
    try:
        # Generate presigned POST URL (for browser upload)
        response = s3_client.generate_presigned_post(
            Bucket=settings.S3_BUCKET,
            Key=s3_key,
            ExpiresIn=3600,  # 1 hour
            Conditions=[
                ["content-length-range", 0, MAX_FILE_SIZE],
                ["eq", "$Content-Type", content_type]
            ]
        )
        
        return {
            "upload_url": response['url'],
            "fields": response['fields'],
            "file_id": file_id
        }
    except ClientError as e:
        raise HTTPException(status_code=500, detail="S3 error")

@app.get("/download-s3/{file_id}")
async def download_from_s3(
    file_id: str,
    current_user: User = Depends(get_current_user)
):
    """Get presigned download URL from S3"""
    try:
        s3_key = f"uploads/{current_user.id}/{file_id}.pdf"  # Adjust ext as needed
        
        download_url = s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': settings.S3_BUCKET, 'Key': s3_key},
            ExpiresIn=3600
        )
        
        return {"download_url": download_url}
    except ClientError:
        raise HTTPException(status_code=404, detail="File not found")

Security Checklist for File Handling

# ✅ DO THIS:
# 1. Validate MIME type (don't trust Content-Type header alone)
# 2. Generate safe filenames (UUIDs, not user input)
# 3. Stream large files (not into memory)
# 4. Set file size limits
# 5. Use FileResponse for downloads
# 6. Store in S3/cloud (not local disk in containers)
# 7. Hash uploaded files (SHA256)
# 8. Track uploads in database
# 9. Delete stale uploads (cron job)
# 10. Scan files with ClamAV/VirusTotal

# ❌ DON'T DO THIS:
# 1. Use user-provided filename directly
# 2. Load entire file into memory
# 3. Trust Content-Type header for validation
# 4. Store uploads in container filesystem
# 5. Serve files without Content-Disposition
# 6. Allow arbitrary file extensions
# 7. Skip virus scanning
# 8. Forget to delete uploaded files

11. Background Tasks - The Feature Everyone Abuses

FastAPI’s BackgroundTasks is convenient but dangerous.

When BackgroundTasks is Fine

from fastapi import BackgroundTasks

def send_welcome_email(email: str):
    # Quick, idempotent operation
    # Fails silently if email service is down
    pass

@router.post("/users")
async def create_user(
    user_data: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    user = await user_service.create_user(db, user_data)
    background_tasks.add_task(send_welcome_email, user.email)
    return user

Constraints:

  • Task completes in <1 second
  • Failure is acceptable (no retry needed)
  • No DB access (session might be closed)

When BackgroundTasks Will Bite You

# DISASTER waiting to happen
def generate_monthly_report(user_id: int):
    # Takes 45 seconds
    # Needs database access
    # Must retry on failure
    # User expects to download this later
    pass

@router.post("/reports/generate")
async def generate_report(
    user_id: int,
    background_tasks: BackgroundTasks
):
    # This will:
    # 1. Block the worker for 45 seconds
    # 2. Fail if worker restarts (no persistence)
    # 3. Have no retry mechanism
    # 4. Have no progress tracking
    background_tasks.add_task(generate_monthly_report, user_id)
    return {"status": "generating"}

Symptoms in production:

  • Workers blocked during deployments
  • “Where’s my report?” support tickets
  • Memory leaks from long-running background tasks

run_in_threadpool - The Middle Ground

from starlette.concurrency import run_in_threadpool

@router.post("/process")
async def process_file(file: UploadFile):
    # CPU-bound task that takes 5-10 seconds
    async def process():
        content = await file.read()
        result = await run_in_threadpool(expensive_processing, content)
        return result
    
    # Still blocks this worker, but doesn't block event loop
    return await process()

Good for:

  • CPU-bound tasks (5-30 seconds)
  • Don’t need retry/persistence
  • Can afford to block a worker

When to Reach for Celery/RQ/Arq

Use a real task queue when:

  • Tasks take >30 seconds
  • Must survive worker restarts
  • Need retry logic
  • Need progress tracking
  • Tasks aren’t tied to HTTP request lifecycle

Celery setup:

# tasks/celery_app.py
from celery import Celery

celery_app = Celery(
    "myapp",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    task_track_started=True,
    task_time_limit=300,     # 5 minute hard limit
    task_soft_time_limit=240  # 4 minute soft limit
)

@celery_app.task(bind=True, max_retries=3)
def generate_report(self, user_id: int):
    try:
        # Long-running task logic
        pass
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

In FastAPI:

from .tasks.celery_app import generate_report

@router.post("/reports/generate")
async def create_report(user_id: int):
    task = generate_report.delay(user_id)
    return {"task_id": task.id, "status": "queued"}

@router.get("/reports/status/{task_id}")
async def get_report_status(task_id: str):
    task = generate_report.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task.state,
        "result": task.result if task.ready() else None
    }

Arq (async-native alternative):

# tasks/arq_worker.py
from arq import create_pool
from arq.connections import RedisSettings

async def generate_report(ctx, user_id: int):
    # Async task logic
    pass

class WorkerSettings:
    functions = [generate_report]
    redis_settings = RedisSettings(host=settings.REDIS_HOST)
    max_jobs = 10
    job_timeout = 300

# In FastAPI
from arq import create_pool

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.arq = await create_pool(RedisSettings())
    yield
    await app.state.arq.close()

@router.post("/reports/generate")
async def create_report(request: Request, user_id: int):
    job = await request.app.state.arq.enqueue_job("generate_report", user_id)
    return {"job_id": job.job_id}

Decision matrix:

ScenarioSolution
Send email on signupBackgroundTasks
Resize uploaded image (2s)run_in_threadpool
Generate PDF report (30s)Celery/Arq
Process video (5 min)Celery/Arq
Daily batch jobCelery Beat/Arq cron

12. WebSockets at Scale - Don’t Do What We Did First

WebSockets in FastAPI are easy to start, catastrophic at scale without the right architecture.

The Naive Approach (Works Until ~1000 Connections)

from fastapi import WebSocket

connections: list[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    connections.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Broadcast to all connections
            for connection in connections:
                await connection.send_text(f"Message: {data}")
    finally:
        connections.remove(websocket)

What breaks at scale:

  • All connections are in one worker’s memory
  • No persistence (connections lost on deploy)
  • Broadcasting blocks (O(n) send operations)
  • No backpressure handling

Real disaster: We deployed this to production. At 800 concurrent WebSocket connections, memory usage hit 2.4GB per worker. During deployment, all 800 users got disconnected simultaneously. Support tickets went from 2/day to 47/hour.

Connection Limits Reality

On AWS c6i.2xlarge (8 vCPU, 16GB RAM) with Uvicorn:

ConnectionsMemory/WorkerCPU%Notes
100220MB5%Fine
500580MB12%Still okay
1,0001.1GB28%Starting to sweat
5,0004.8GB75%One worker maxed out
10,000Worker OOMN/ACrashes

Theoretical max: ~6,000 connections per worker before memory pressure kills you.

Redis Pub/Sub Pattern (Production Setup)

import asyncio
import redis.asyncio as redis
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, Set[WebSocket]] = {}
        self.redis_client: redis.Redis = None
        self.pubsub = None
    
    async def connect(self, websocket: WebSocket, room: str):
        await websocket.accept()
        if room not in self.active_connections:
            self.active_connections[room] = set()
        self.active_connections[room].add(websocket)
    
    def disconnect(self, websocket: WebSocket, room: str):
        self.active_connections[room].discard(websocket)
        if not self.active_connections[room]:
            del self.active_connections[room]
    
    async def broadcast_to_room(self, room: str, message: str):
        """Send to all connections in this worker for this room"""
        if room in self.active_connections:
            dead_connections = set()
            for connection in self.active_connections[room]:
                try:
                    await connection.send_text(message)
                except:
                    dead_connections.add(connection)
            
            # Clean up dead connections
            for connection in dead_connections:
                self.active_connections[room].discard(connection)
    
    async def publish(self, room: str, message: str):
        """Publish to Redis, reaches all workers"""
        await self.redis_client.publish(f"room:{room}", message)
    
    async def start_listening(self):
        """Background task to listen for Redis pub/sub messages"""
        self.redis_client = await redis.from_url(settings.REDIS_URL)
        self.pubsub = self.redis_client.pubsub()
        await self.pubsub.psubscribe("room:*")
        
        async for message in self.pubsub.listen():
            if message["type"] == "pmessage":
                channel = message["channel"].decode()
                room = channel.split(":", 1)[1]
                data = message["data"].decode()
                await self.broadcast_to_room(room, data)

manager = ConnectionManager()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start Redis listener
    task = asyncio.create_task(manager.start_listening())
    yield
    # Cleanup
    task.cancel()
    if manager.pubsub:
        await manager.pubsub.unsubscribe()
    if manager.redis_client:
        await manager.redis_client.close()

app = FastAPI(lifespan=lifespan)

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await manager.connect(websocket, room)
    try:
        while True:
            data = await websocket.receive_text()
            # Publish to Redis, broadcasts to ALL workers
            await manager.publish(room, data)
    except WebSocketDisconnect:
        manager.disconnect(websocket, room)

This scales to:

  • 50,000+ connections (distributed across workers)
  • Horizontal scaling (add more servers, Redis handles coordination)
  • Survives deploys (clients reconnect, state in Redis)

Backpressure and Dead Connection Cleanup

async def send_with_backpressure(websocket: WebSocket, message: str, timeout: float = 5.0):
    """Send with timeout to prevent slow clients from blocking"""
    try:
        await asyncio.wait_for(
            websocket.send_text(message),
            timeout=timeout
        )
        return True
    except asyncio.TimeoutError:
        # Client too slow, drop the connection
        await websocket.close(code=1008, reason="Client too slow")
        return False
    except:
        return False

# In ConnectionManager
async def broadcast_to_room(self, room: str, message: str):
    if room in self.active_connections:
        tasks = [
            send_with_backpressure(conn, message)
            for conn in self.active_connections[room]
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Remove failed connections
        failed = [
            conn for conn, success in zip(self.active_connections[room], results)
            if not success
        ]
        for conn in failed:
            self.active_connections[room].discard(conn)

GCP e2-standard-4 numbers (4 vCPU, 16GB RAM, 4 workers):

  • 12,000 concurrent WebSocket connections
  • ~300ms p99 broadcast latency (100-byte message)
  • 680MB memory per worker

13. OpenAPI, Redoc, and Stopping the “But the docs say…” Arguments

FastAPI’s auto-generated docs are great until you need custom examples, deprecation warnings, or multiple API versions.

Custom OpenAPI Spec

from fastapi.openapi.utils import get_openapi

def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    
    openapi_schema = get_openapi(
        title="My API",
        version="2.1.0",
        description="""
        Production API for MyApp.
        
        ## Authentication
        All endpoints require Bearer token in Authorization header.
        
        ## Rate Limits
        - 100 requests/minute per IP
        - 1000 requests/hour per API key
        
        ## Support
        Issues: https://github.com/myorg/myapp/issues
        """,
        routes=app.routes,
    )
    
    # Add security schemes
    openapi_schema["components"]["securitySchemes"] = {
        "BearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT"
        }
    }
    
    # Add global security requirement
    openapi_schema["security"] = [{"BearerAuth": []}]
    
    # Custom server URLs
    openapi_schema["servers"] = [
        {"url": "https://api.myapp.com", "description": "Production"},
        {"url": "https://staging-api.myapp.com", "description": "Staging"},
        {"url": "http://localhost:8000", "description": "Development"}
    ]
    
    app.openapi_schema = openapi_schema
    return app.openapi_schema

app.openapi = custom_openapi

Rich Response Examples

from pydantic import BaseModel, Field

class UserResponse(BaseModel):
    id: int
    email: str
    full_name: str
    is_active: bool = Field(description="Whether the user account is active")
    
    model_config = ConfigDict(
        json_schema_extra={
            "examples": [
                {
                    "id": 123,
                    "email": "[email protected]",
                    "full_name": "Jane Doe",
                    "is_active": True
                }
            ]
        }
    )

@router.get(
    "/users/{user_id}",
    response_model=UserResponse,
    responses={
        200: {
            "description": "User found",
            "content": {
                "application/json": {
                    "example": {
                        "id": 123,
                        "email": "[email protected]",
                        "full_name": "Jane Doe",
                        "is_active": True
                    }
                }
            }
        },
        404: {
            "description": "User not found",
            "content": {
                "application/json": {
                    "example": {"detail": "User not found"}
                }
            }
        }
    }
)
async def get_user(user_id: int):
    pass

Painless API Versioning

# api/v1/users.py
from fastapi import APIRouter

router = APIRouter(prefix="/users", tags=["users-v1"])

@router.get("/{user_id}")
async def get_user_v1(user_id: int):
    # Old implementation
    return {"id": user_id, "name": "User"}

# api/v2/users.py
router = APIRouter(
    prefix="/users",
    tags=["users-v2"],
    deprecated=False  # Mark v1 as deprecated once v2 is stable
)

@router.get("/{user_id}")
async def get_user_v2(user_id: int):
    # New implementation with additional fields
    return {
        "id": user_id,
        "name": "User",
        "created_at": "2025-01-01T00:00:00Z",
        "updated_at": "2025-01-01T00:00:00Z"
    }

# main.py
from api.v1 import users as users_v1
from api.v2 import users as users_v2

app.include_router(users_v1.router, prefix="/api/v1")
app.include_router(users_v2.router, prefix="/api/v2")

Deprecation workflow:

  1. Ship v2 alongside v1
  2. Add deprecation warnings to v1 OpenAPI schema
  3. Monitor v1 usage metrics
  4. Send emails to API consumers using v1
  5. Sunset v1 after 6 months

Hiding Docs in Production

app = FastAPI(
    docs_url="/api/docs" if settings.ENVIRONMENT != "production" else None,
    redoc_url="/api/redoc" if settings.ENVIRONMENT != "production" else None,
    openapi_url="/api/openapi.json" if settings.ENVIRONMENT != "production" else None
)

# Or with authentication:
from fastapi import Depends, HTTPException
from fastapi.openapi.docs import get_swagger_ui_html

@app.get("/api/docs", include_in_schema=False)
async def custom_docs(admin: User = Depends(require_admin)):
    return get_swagger_ui_html(openapi_url="/api/openapi.json", title="API Docs")

14. Testing Strategy That Scales

httpx vs TestClient - The Right Choice

# BAD: TestClient is synchronous
from fastapi.testclient import TestClient

def test_get_user():
    client = TestClient(app)
    response = client.get("/users/1")
    assert response.status_code == 200

# GOOD: httpx.AsyncClient works with real async
import pytest
from httpx import AsyncClient, ASGITransport

@pytest.mark.asyncio
async def test_get_user():
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test"
    ) as client:
        response = await client.get("/users/1")
        assert response.status_code == 200

Why httpx wins:

  • Tests actual async code paths
  • Catches async/sync mixing bugs
  • Same client you use in production
  • Faster (no thread pool overhead)

Non-Flaky Async Tests

# conftest.py
import pytest
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

@pytest.fixture(scope="session")
def event_loop():
    """Create event loop for entire test session"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

@pytest.fixture
async def db_session():
    """Fresh database session per test"""
    engine = create_async_engine(
        "postgresql+asyncpg://test:test@localhost/test_db",
        echo=False
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)
    
    AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
    async with AsyncSessionLocal() as session:
        yield session
    
    await engine.dispose()

@pytest.fixture
async def client(db_session):
    """Test client with mocked dependencies"""
    async def override_get_db():
        yield db_session
    
    app.dependency_overrides[get_db] = override_get_db
    
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test"
    ) as ac:
        yield ac
    
    app.dependency_overrides.clear()

# test_users.py
@pytest.mark.asyncio
async def test_create_user(client, db_session):
    response = await client.post(
        "/api/v1/users",
        json={"email": "[email protected]", "password": "Secret123"}
    )
    assert response.status_code == 201
    
    # Verify in database
    result = await db_session.execute(
        select(User).where(User.email == "[email protected]")
    )
    user = result.scalar_one()
    assert user.email == "[email protected]"

Schemathesis Contract Testing

import schemathesis
from hypothesis import settings

schema = schemathesis.from_asgi("/api/openapi.json", app=app)

@schema.parametrize()
@settings(max_examples=50)
def test_api_contract(case):
    """Property-based testing against OpenAPI spec"""
    response = case.call_asgi()
    case.validate_response(response)

# This automatically:
# - Generates random valid inputs
# - Tests all endpoints
# - Validates responses match schema
# - Catches edge cases you'd never think of

Real bug this caught: We had an endpoint that returned user_id as string in one code path, int in another. Schemathesis found it in 12 examples.

Integration Tests Worth Writing

@pytest.mark.asyncio
async def test_user_flow_end_to_end(client):
    """Test complete user journey"""
    # 1. Register
    response = await client.post("/api/v1/auth/register", json={
        "email": "[email protected]",
        "password": "Secret123"
    })
    assert response.status_code == 201
    
    # 2. Login
    response = await client.post("/api/v1/auth/login", json={
        "email": "[email protected]",
        "password": "Secret123"
    })
    assert response.status_code == 200
    token = response.json()["access_token"]
    
    # 3. Access protected endpoint
    response = await client.get(
        "/api/v1/users/me",
        headers={"Authorization": f"Bearer {token}"}
    )
    assert response.status_code == 200
    assert response.json()["email"] == "[email protected]"
    
    # 4. Update profile
    response = await client.patch(
        "/api/v1/users/me",
        headers={"Authorization": f"Bearer {token}"},
        json={"full_name": "New User"}
    )
    assert response.status_code == 200

15. Observability - Traces, Metrics, Logs

OpenTelemetry Auto-Instrumentation

# main.py
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# OTLP exporter (works with Jaeger, Tempo, etc.)
otlp_exporter = OTLPSpanExporter(
    endpoint=settings.OTEL_ENDPOINT,  # e.g., "http://jaeger:4317"
    insecure=True
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(otlp_exporter)
)

# Auto-instrument
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
HTTPXClientInstrumentor().instrument()

# Custom spans
@router.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    with tracer.start_as_current_span("fetch_user_from_db"):
        result = await db.execute(select(User).where(User.id == user_id))
        user = result.scalar_one_or_none()
    
    if user:
        with tracer.start_as_current_span("fetch_user_orders"):
            result = await db.execute(
                select(Order).where(Order.user_id == user_id)
            )
            orders = result.scalars().all()
    
    return {"user": user, "orders": orders}

Must-Have Prometheus Metrics

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST
import time

# Metrics
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

http_request_duration_seconds = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

active_connections = Gauge(
    'active_websocket_connections',
    'Active WebSocket connections'
)

db_connection_pool_size = Gauge(
    'db_connection_pool_size',
    'Database connection pool size',
    ['state']  # 'checked_out', 'available', 'total'
)

# Middleware to track metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    
    http_requests_total.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    http_request_duration_seconds.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    
    return response

# Expose metrics endpoint
@app.get("/metrics")
async def metrics():
    # Update DB pool metrics
    pool = engine.pool
    db_connection_pool_size.labels(state="checked_out").set(pool.checkedout())
    db_connection_pool_size.labels(state="available").set(pool.size() - pool.checkedout())
    db_connection_pool_size.labels(state="total").set(pool.size())
    
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

Critical alerts to set up:

# prometheus/alerts.yml
groups:
  - name: fastapi
    rules:
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.05
        annotations:
          summary: "Error rate above 5%"
      
      - alert: SlowRequests
        expr: histogram_quantile(0.99, http_request_duration_seconds) > 1.0
        annotations:
          summary: "p99 latency above 1s"
      
      - alert: DatabasePoolExhausted
        expr: db_connection_pool_size{state="available"} == 0
        annotations:
          summary: "No available DB connections"

Structured Logging with logfire

import logfire

# Configure logfire (or use structlog as alternative)
logfire.configure(service_name="myapp")

@router.post("/users")
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
    logfire.info(
        "Creating user",
        email=user.email,
        # Never log passwords!
    )
    
    try:
        new_user = await user_service.create_user(db, user)
        logfire.info("User created", user_id=new_user.id, email=new_user.email)
        return new_user
    except Exception as e:
        logfire.error(
            "Failed to create user",
            email=user.email,
            error=str(e),
            exc_info=True
        )
        raise

Alternative with structlog:

import structlog

logger = structlog.get_logger()

@router.post("/users")
async def create_user(user: UserCreate):
    logger.info("user.create.start", email=user.email)
    # ...
    logger.info("user.create.success", user_id=new_user.id, duration_ms=123)

16. Deployment Targets in 2025

Docker Multi-Stage Build

# Dockerfile
FROM python:3.12-slim as builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Final stage
FROM python:3.12-slim

WORKDIR /app

# Copy only installed packages
COPY --from=builder /root/.local /root/.local
ENV PATH=/root/.local/bin:$PATH

# Copy application
COPY ./src /app/src

# Non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8000/health')"

CMD ["gunicorn", "src.myapp.main:app", \
     "--workers", "4", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "60", \
     "--graceful-timeout", "30"]

Image size comparison:

  • Without multi-stage: 1.2GB
  • With multi-stage: 380MB
  • With alpine base: 280MB (but slower builds, C extension issues)

Kubernetes Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: fastapi-app
  template:
    metadata:
      labels:
        app: fastapi-app
    spec:
      containers:
      - name: app
        image: myapp:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: database-url
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: redis-url
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: fastapi-service
spec:
  selector:
    app: fastapi-app
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

AWS Lambda + Powertools (Cold Start Reality)

# lambda_handler.py
from mangum import Mangum
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver

from src.myapp.main import app

logger = Logger()
tracer = Tracer()

# Wrap FastAPI for Lambda
handler = Mangum(app, lifespan="off")

# Cold start benchmark (Lambda 1024MB, Python 3.12):
# - First request: 800-1200ms (includes package import)
# - Warm requests: 8-15ms
# - Cold start frequency: ~1% with 100 RPS

When Lambda makes sense:

  • Sporadic traffic (<1000 RPS)
  • Cost-sensitive (pay per request)
  • Serverless ecosystem (API Gateway, DynamoDB)

When Lambda is stupid:

  • Sustained high traffic (EC2/ECS cheaper)
  • WebSocket-heavy (connection limits)
  • Large dependencies (cold starts kill you)

Real cost comparison (10,000 RPS):

  • AWS Lambda: ~$2,800/month
  • ECS Fargate (4 tasks): ~$180/month
  • EC2 c6i.2xlarge (2 instances): ~$240/month

17. Security Checklist Nobody Reads Until Breach

CORS Configuration Done Right

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://myapp.com",
        "https://www.myapp.com",
        "https://staging.myapp.com"
    ] if settings.ENVIRONMENT == "production" else ["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["*"],
    max_age=3600
)

NEVER do this in production:

# DISASTER:
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Any domain can call your API
    allow_credentials=True  # And send cookies!
)

Rate Limiting

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/v1/users")
@limiter.limit("100/minute")
async def get_users(request: Request):
    pass

# Per-user rate limiting
def get_user_id(request: Request) -> str:
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    # Extract user_id from token
    return user_id or get_remote_address(request)

limiter_user = Limiter(key_func=get_user_id)

@app.post("/api/v1/orders")
@limiter_user.limit("10/minute")
async def create_order(request: Request):
    pass

JWT vs Opaque Tokens vs PASETO

JWT (what we use):

from datetime import datetime, timedelta
from jose import jwt

def create_access_token(user_id: int) -> str:
    payload = {
        "sub": str(user_id),
        "exp": datetime.utcnow() + timedelta(hours=1),
        "iat": datetime.utcnow()
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")

def verify_token(token: str) -> int:
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        return int(payload["sub"])
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

Pros: Stateless, no database lookup Cons: Can’t revoke before expiry, larger payload

Opaque tokens (for high-security):

import secrets
from datetime import datetime, timedelta

async def create_session(db: AsyncSession, user_id: int) -> str:
    token = secrets.token_urlsafe(32)
    session = Session(
        token=token,
        user_id=user_id,
        expires_at=datetime.utcnow() + timedelta(hours=1)
    )
    db.add(session)
    await db.commit()
    return token

async def verify_session(db: AsyncSession, token: str) -> int:
    result = await db.execute(
        select(Session).where(
            Session.token == token,
            Session.expires_at > datetime.utcnow()
        )
    )
    session = result.scalar_one_or_none()
    if not session:
        raise HTTPException(status_code=401)
    return session.user_id

Pros: Instant revocation, smaller tokens Cons: Database hit on every request

Dependency Scanning

# Add to CI/CD pipeline
pip install pip-audit safety

# Check for CVEs
pip-audit --desc
safety check --json

# In GitHub Actions:
# .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]
jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run pip-audit
        run: |
          pip install pip-audit
          pip-audit --desc --format json

18. Performance Benchmarks - 2025 Hardware Numbers

JSON Serialization Shootout

AWS c6i.2xlarge (8 vCPU, 16GB RAM), 10,000 iterations:

import orjson
import msgspec
import json

data = {
    "users": [
        {"id": i, "name": f"User {i}", "email": f"user{i}@example.com"}
        for i in range(100)
    ]
}

# std json:    42ms
# orjson:      8ms   (5.2x faster)
# msgspec:     6ms   (7.0x faster)

Production setup with orjson:

from fastapi.responses import ORJSONResponse

app = FastAPI(default_response_class=ORJSONResponse)

# Or per-endpoint:
@router.get("/users", response_class=ORJSONResponse)
async def get_users():
    return {"users": [...]}

RPS vs Connection Pool Size

PostgreSQL on AWS RDS db.r6g.xlarge, FastAPI on c6i.2xlarge (4 workers):

Pool SizeRPSp50p99DB CPU%
52,10012ms450ms15%
104,8008ms85ms28%
208,2006ms22ms42%
409,1005ms18ms51%
809,4006ms24ms58%

Sweet spot: pool_size = 20 per worker. Beyond that, diminishing returns.

Cold Start Comparison

PlatformCold StartWarm p99Cost (10k RPS)
AWS Lambda 1024MB800ms12ms$2,800/mo
ECS Fargate 1vCPU4s8ms$180/mo
EC2 c6i.2xlarge0ms6ms$120/mo
GCP Cloud Run 1CPU600ms10ms$220/mo

19. The Official “Do Not Do This in Production” List

1. Blocking I/O in async Routes

# WRONG:
@router.get("/data")
async def get_data():
    response = requests.get("https://api.example.com/data")  # BLOCKS EVENT LOOP
    return response.json()

# RIGHT:
@router.get("/data")
async def get_data(client: httpx.AsyncClient = Depends(get_http_client)):
    response = await client.get("https://api.example.com/data")
    return response.json()

2. Storing Secrets in Code

# WRONG:
SECRET_KEY = "super-secret-key-123"

# RIGHT:
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    SECRET_KEY: str
    
    class Config:
        env_file = ".env"

3. No Database Connection Pooling

# WRONG:
@router.get("/users")
async def get_users():
    conn = await asyncpg.connect("postgresql://...")  # New connection every request
    users = await conn.fetch("SELECT * FROM users")
    await conn.close()
    return users

# RIGHT: Use SQLAlchemy engine with pool (see section 7)

4. Returning SQLAlchemy Models Directly

# WRONG:
@router.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await db.get(User, user_id)
    return user  # Exposes all DB columns, including hashed_password!

# RIGHT:
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await db.get(User, user_id)
    return UserResponse.model_validate(user)

5. No Request Timeout

# WRONG:
async with httpx.AsyncClient() as client:
    response = await client.get("https://slow-api.com")  # Waits forever

# RIGHT:
async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
    response = await client.get("https://slow-api.com")

6. allow_origins=[”*”] with Credentials

# CRITICAL SECURITY ISSUE:
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True  # Any site can steal user tokens
)

# RIGHT:
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://myapp.com"],
    allow_credentials=True
)

7. No Input Validation

# WRONG:
@router.get("/users/{user_id}")
async def get_user(user_id: str):  # Should be int
    user = await db.get(User, user_id)  # SQL injection risk

# RIGHT:
@router.get("/users/{user_id}")
async def get_user(user_id: int):  # FastAPI validates
    user = await db.get(User, user_id)

8. Logging Sensitive Data

# WRONG:
logger.info(f"User login: {email} with password {password}")

# RIGHT:
logger.info("User login attempt", extra={"email": email})

9. No Database Migration Strategy

# WRONG: Creating tables in code
@app.on_event("startup")
async def startup():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)  # Breaks in production

# RIGHT: Use Alembic for migrations

10. Forgetting to Set process/worker Limits

# WRONG:
uvicorn main:app  # Single process, single thread

# RIGHT:
gunicorn main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --timeout 60

20. Final Production Checklist

Copy this into your PR template:

Before Deployment

  • All secrets in environment variables (not code)
  • Database connection pooling configured (pool_size >= 10)
  • Response models defined (not returning ORM objects)
  • CORS configured with explicit origins (not ”*”)
  • Rate limiting enabled on public endpoints
  • Request timeouts set (httpx, database, external APIs)
  • Health check endpoint implemented (/health)
  • Metrics endpoint exposed (/metrics)
  • Structured logging configured (JSON format)
  • OpenTelemetry/tracing enabled
  • Error handling for all external service calls
  • Database migrations tested (Alembic)
  • Tests passing (>80% coverage)
  • No blocking I/O in async routes
  • Dependencies scanned (pip-audit, safety)

Infrastructure

  • Multi-stage Dockerfile (<500MB image)
  • Non-root user in container
  • Health check in Dockerfile
  • Resource limits set (CPU, memory)
  • Graceful shutdown configured (lifespan)
  • Workers = CPU cores (Gunicorn config)
  • Log aggregation setup (CloudWatch, DataDog, etc.)
  • Prometheus metrics scraped
  • Alerts configured (error rate, latency, DB pool)
  • Horizontal autoscaling rules defined
  • Database connection limit checked (workers × pool_size < max_connections)

Security

  • HTTPS enforced (not HTTP)
  • JWT/session tokens expire (<24h)
  • Password hashing (bcrypt, not MD5)
  • SQL injection prevented (SQLAlchemy ORM, no raw SQL)
  • CORS with explicit origins
  • Rate limiting per IP and per user
  • Input validation on all endpoints
  • No sensitive data in logs
  • Security headers (CSP, X-Frame-Options, etc.)
  • Dependencies up-to-date (no critical CVEs)

Documentation

  • OpenAPI spec accessible (/docs for internal, authenticated for production)
  • API versioning strategy documented
  • README with local setup instructions
  • Environment variables documented
  • Architecture diagram exists
  • Runbook for common issues

Performance

  • Database indexes on frequently queried columns
  • N+1 queries eliminated (use selectinload)
  • Caching strategy for expensive queries (Redis)
  • Background tasks offloaded (Celery/Arq for >30s tasks)
  • Response sizes reasonable (<1MB)
  • Pagination on list endpoints
  • orjson or msgspec for JSON serialization

Continue reading

Next article

Valkey Complete Getting Started Guide: Production-Ready in 30 Minutes

Related Content