FastAPI Performance Optimization - Production-Grade Techniques

1. TL;DR - The Quick Wins

Instant improvements (< 1 hour):

  • Switch from json to orjson (20-30% faster serialization)
  • Enable Uvicorn workers: --workers 4 (up to 4x throughput, given at least 4 cores)
  • Add response caching for read-heavy endpoints (10-100x faster)
  • Use connection pooling (5-10x faster database queries)

Medium effort (1-2 days):

  • Implement proper async/await patterns (2-5x for I/O-bound)
  • Add database query optimization (10-50x for N+1 queries)
  • Background tasks for non-critical operations
  • Proper dependency injection caching

High impact (1 week+):

  • Valkey caching layer with invalidation strategy
  • CDN for static assets
  • Database read replicas
  • Horizontal scaling with load balancer

Real numbers from production (8-core server, 32GB RAM):

  • Before optimization: 500 RPS, 150ms p95 latency
  • After optimization: 5,000 RPS, 15ms p95 latency
  • Cost: Same hardware, 10x throughput

2. JSON Serialization - The Low-Hanging Fruit

Problem: Default JSON is Slow

from fastapi import FastAPI
import json
import time

app = FastAPI()

@app.get("/slow-json")
def slow_endpoint():
    data = {
        "users": [{"id": i, "name": f"user_{i}"} for i in range(1000)]
    }
    return data  # Uses standard json.dumps internally

Solution: Use orjson

from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

# Set default response class globally
app = FastAPI(default_response_class=ORJSONResponse)

@app.get("/fast-json")
def fast_endpoint():
    data = {
        "users": [{"id": i, "name": f"user_{i}"} for i in range(1000)]
    }
    return data  # Now uses orjson - 2-3x faster!

# Or per-endpoint:
@app.get("/another-fast", response_class=ORJSONResponse)
def another_endpoint():
    return {"status": "fast"}

Install:

pip install orjson

orjson is 2-3x faster than the standard library’s json module because it is implemented in Rust and uses optimized algorithms. It serializes datetime objects and UUIDs natively, and numpy arrays when called with option=orjson.OPT_SERIALIZE_NUMPY.
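The speedup depends on payload shape, so it is worth measuring on your own data. The following is a minimal timing sketch, not a rigorous benchmark: it uses the standard library's json and timeit, and compares against orjson only if it happens to be installed. The payload and the `bench` helper are illustrative assumptions.

```python
import json
import timeit

# Illustrative payload with the same shape as the endpoint above.
payload = {"users": [{"id": i, "name": f"user_{i}"} for i in range(1000)]}

def bench(dumps, number=200):
    """Return the average seconds per call for a serializer callable."""
    return timeit.timeit(lambda: dumps(payload), number=number) / number

results = {"json": bench(json.dumps)}

try:
    import orjson  # optional third-party dependency
    results["orjson"] = bench(orjson.dumps)
except ImportError:
    pass  # orjson not installed; only the stdlib number is reported

for name, seconds in results.items():
    print(f"{name}: {seconds * 1e6:.1f} µs per call")
```

Run it with a payload copied from one of your real endpoints; small responses tend to show a much smaller end-to-end difference than large ones.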

3. Database Optimization - Connection Pooling

Problem: Creating Connections is Expensive

# BAD - Creates new connection per request
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@app.get("/users/{user_id}")
def get_user(user_id: int):
    engine = create_engine("postgresql://user:pass@localhost/db")
    Session = sessionmaker(bind=engine)
    session = Session()
    
    user = session.query(User).filter_by(id=user_id).first()
    session.close()
    return user

# Each request: TCP handshake + auth + query = 50-100ms overhead!

Solution: Connection Pool with AsyncPG

from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Create engine with connection pool
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"

engine = create_async_engine(
    DATABASE_URL,
    echo=False,
    pool_size=20,          # Maintain 20 connections
    max_overflow=10,       # Allow 10 extra under load
    pool_pre_ping=True,    # Verify connections before use
    pool_recycle=3600,     # Recycle connections every hour
)

# Async session maker
AsyncSessionLocal = sessionmaker(
    engine, 
    class_=AsyncSession, 
    expire_on_commit=False
)

# Dependency for database session
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

# Use in endpoints
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(User).filter_by(id=user_id)
    )
    user = result.scalar_one_or_none()
    return user

# Result: 5-10x faster database queries!

Pool Configuration Guidelines

For detailed connection pooling configuration, sizing strategies, and monitoring, see the SQLAlchemy pooling configuration guide.

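Key takeaway: Each worker process needs its own connection pool, so connections add up across workers. Size the pool so that workers * (pool_size + max_overflow) <= database_max_connections; overflow connections count against the database limit too.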
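The sizing rule can be sketched as a small calculation. The helper names and the `reserved` parameter (connections kept free for migrations, admin sessions, and so on) are assumptions for illustration; the limit of 100 used below is PostgreSQL's default max_connections.

```python
def total_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)

def max_pool_size(db_max_connections: int, workers: int,
                  max_overflow: int = 0, reserved: int = 0) -> int:
    """Largest pool_size that keeps the worst case within the database limit."""
    if workers < 1:
        raise ValueError("workers must be >= 1")
    budget = db_max_connections - reserved
    per_worker = budget // workers           # connections each worker may open
    return max(per_worker - max_overflow, 0)

# The engine settings above (pool_size=20, max_overflow=10) with 4 workers:
worst_case = total_connections(workers=4, pool_size=20, max_overflow=10)
print(f"worst case: {worst_case} connections")

# A pool size that fits a limit of 100 with 10 connections held back:
safe = max_pool_size(db_max_connections=100, workers=4,
                     max_overflow=10, reserved=10)
print(f"suggested pool_size: {safe}")
```

With these inputs the settings shown earlier can exceed a 100-connection limit once all four workers are under load, which is why the worker count belongs in the calculation.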

4. Async/Await Patterns - Do It Right

The Cardinal Sin: Blocking in Async

# CATASTROPHIC - Blocks entire event loop!
import requests

@app.get("/users")
async def get_users():
    # requests.get() is blocking!
    response = requests.get("https://api.external.com/users")
    return response.json()

# Result: 1 slow request blocks ALL other requests
# Your 5000 RPS API drops to 10 RPS
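The effect is easy to reproduce without a web server. The sketch below runs a "heartbeat" task next to a handler and counts how often the heartbeat gets scheduled; the function names are illustrative, and `time.sleep` stands in for any blocking call such as `requests.get`.

```python
import asyncio
import time

async def heartbeat(counter: list) -> None:
    """Increment counter[0] every 10 ms whenever the loop is free to run us."""
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)

async def measure(handler) -> int:
    """Run handler next to a heartbeat; return the ticks seen during the handler."""
    counter = [0]
    task = asyncio.create_task(heartbeat(counter))
    await asyncio.sleep(0)       # give the heartbeat a chance to start
    before = counter[0]
    await handler()
    ticks = counter[0] - before
    task.cancel()
    return ticks

async def blocking_handler() -> None:
    time.sleep(0.2)              # blocks the whole event loop

async def async_handler() -> None:
    await asyncio.sleep(0.2)     # yields to other tasks while waiting

async def main():
    blocked = await measure(blocking_handler)
    free = await measure(async_handler)
    return blocked, free

blocked_ticks, free_ticks = asyncio.run(main())
print(f"heartbeat ticks during blocking handler: {blocked_ticks}")
print(f"heartbeat ticks during async handler:    {free_ticks}")
```

Every other request on the same worker is in the position of the heartbeat: it makes no progress at all while the blocking call runs.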

Solution: Use Async HTTP Clients

import httpx
from typing import List

# Create reusable client
client = httpx.AsyncClient(
    timeout=10.0,
    limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
)

@app.get("/users")
async def get_users():
    response = await client.get("https://api.external.com/users")
    return response.json()

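# Note: on_event is deprecated in recent FastAPI releases in favor of
# lifespan handlers; it is kept here for brevity
@app.on_event("shutdown")
async def shutdown():
    await client.aclose()

# Alternative: aiohttp (fetch_json is an illustrative helper name)
import aiohttp

async def fetch_json(url: str):
    # For real services, create one ClientSession at startup and reuse it
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()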

Parallel Async Requests

import asyncio
from fastapi import HTTPException

@app.get("/dashboard")
async def get_dashboard():
    # BAD - Sequential (slow!): total time is the sum of all three calls
    # users = await fetch_users()
    # posts = await fetch_posts()
    # comments = await fetch_comments()
    
    # GOOD - Parallel: total time is roughly that of the slowest call
    # (up to 3x faster when the three calls take similar time)
    users, posts, comments = await asyncio.gather(
        fetch_users(),
        fetch_posts(),
        fetch_comments(),
    )
    
    return {
        "users": users,
        "posts": posts,
        "comments": comments
    }

# Even better: fetch only what you need
@app.get("/dashboard/{user_id}")
async def get_user_dashboard(user_id: int):
    # Gather returns results in order
    user, user_posts, user_stats = await asyncio.gather(
        fetch_user(user_id),
        fetch_user_posts(user_id),
        fetch_user_stats(user_id),
        return_exceptions=True  # Don't fail all if one fails
    )
    
    # Handle potential exceptions
    if isinstance(user, Exception):
        raise HTTPException(status_code=500, detail="User fetch failed")
    
    return {
        "user": user,
        "posts": user_posts if not isinstance(user_posts, Exception) else [],
        "stats": user_stats if not isinstance(user_stats, Exception) else {}
    }
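To see how return_exceptions=True behaves in isolation, here is a runnable sketch with stub coroutines in place of real fetchers. `fetch_ok` and `fetch_fail` are hypothetical stand-ins; the delays are arbitrary.

```python
import asyncio

async def fetch_ok(name: str, delay: float) -> str:
    """Stub for a successful upstream call."""
    await asyncio.sleep(delay)
    return name

async def fetch_fail(delay: float) -> str:
    """Stub for an upstream call that raises."""
    await asyncio.sleep(delay)
    raise RuntimeError("upstream unavailable")

async def main() -> dict:
    results = await asyncio.gather(
        fetch_ok("user", 0.03),
        fetch_fail(0.01),
        fetch_ok("stats", 0.02),
        return_exceptions=True,   # exceptions are returned, not raised
    )
    # Results arrive in argument order, regardless of completion order.
    user, posts, stats = results
    return {
        "user": user,
        "posts": [] if isinstance(posts, Exception) else posts,
        "stats": stats,
    }

dashboard = asyncio.run(main())
print(dashboard)
```

Without return_exceptions=True, the first exception would propagate out of gather and the caller would lose the results that did succeed.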

CPU-Bound Work: Use Thread/Process Pool

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from fastapi import UploadFile

# For CPU-heavy work (image processing, ML inference)
process_pool = ProcessPoolExecutor(max_workers=4)

# For I/O-bound blocking libraries (legacy code)
thread_pool = ThreadPoolExecutor(max_workers=20)

@app.post("/process-image")
async def process_image(file: UploadFile):
    contents = await file.read()
    
    # Run CPU-intensive work in process pool
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        process_pool,
        heavy_image_processing,  # This function runs in separate process
        contents
    )
    
    return {"processed": result}

def heavy_image_processing(image_data: bytes) -> dict:
    # CPU-intensive operations here
    # Runs in separate process, doesn't block event loop
    pass
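The thread pool defined above is meant for blocking I/O libraries that have no async equivalent. A minimal sketch of that case, where `legacy_lookup` is a hypothetical stand-in for such a library call:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

thread_pool = ThreadPoolExecutor(max_workers=4)

def legacy_lookup(key: str) -> str:
    """Stand-in for a blocking client call (e.g. an old SDK)."""
    time.sleep(0.05)
    return key.upper()

async def lookup(key: str) -> str:
    # The blocking call runs on a pool thread; the event loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(thread_pool, legacy_lookup, key)

async def main() -> list:
    # The three lookups overlap because each one occupies its own thread.
    return await asyncio.gather(*(lookup(k) for k in ("a", "b", "c")))

values = asyncio.run(main())
thread_pool.shutdown()
print(values)
```

Threads suit blocking I/O because the GIL is released while waiting; for CPU-heavy Python code the process pool remains the better fit.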

5. Caching Strategies

In-Memory Caching with TTL

from functools import lru_cache
from typing import Any, Optional
import time

# Simple LRU cache for pure functions
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    time.sleep(1)  # Simulate expensive operation
    return n ** 2

@app.get("/compute/{n}")
def compute(n: int):
    # First call: 1 second
    # Subsequent calls: instant (cached)
    return {"result": expensive_computation(n)}

# Cache with TTL (time-to-live)
# Note: entries are only removed when read after expiry, so the cache is unbounded
class CacheWithTTL:
    def __init__(self):
        self._cache = {}
        self._timestamps = {}
    
    def get(self, key: str, ttl_seconds: int = 300) -> Optional[Any]:
        if key in self._cache:
            age = time.time() - self._timestamps[key]
            if age < ttl_seconds:
                return self._cache[key]
            else:
                del self._cache[key]
                del self._timestamps[key]
        return None
    
    def set(self, key: str, value: Any):
        self._cache[key] = value
        self._timestamps[key] = time.time()

cache = CacheWithTTL()

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Check cache first
    cached = cache.get(f"user:{user_id}", ttl_seconds=60)
    if cached is not None:
        return cached
    
    # Fetch from database
    user = await db.get(User, user_id)
    
    # Store in cache
    cache.set(f"user:{user_id}", user)
    return user
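An in-process cache that is never trimmed can grow without limit in a long-running worker. One possible variant, sketched here under the assumption that a simple least-recently-used bound is acceptable, adds a maximum size and an injectable clock so expiry can be tested without sleeping. `BoundedTTLCache` is an illustrative name, not part of any library.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable

_MISSING = object()

class BoundedTTLCache:
    """TTL cache that also evicts the least recently used entry when full."""

    def __init__(self, maxsize: int = 1024, ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._data = OrderedDict()   # key -> (expires_at, value)
        self._maxsize = maxsize
        self._ttl = ttl_seconds
        self._clock = clock

    def get(self, key: Hashable, default: Any = None) -> Any:
        entry = self._data.get(key, _MISSING)
        if entry is _MISSING:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]      # expired: drop it and report a miss
            return default
        self._data.move_to_end(key)  # mark as recently used
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._data[key] = (self._clock() + self._ttl, value)
        self._data.move_to_end(key)
        while len(self._data) > self._maxsize:
            self._data.popitem(last=False)  # evict least recently used

# Usage with a fake clock so the example is deterministic.
now = [0.0]
demo_cache = BoundedTTLCache(maxsize=2, ttl_seconds=10, clock=lambda: now[0])
demo_cache.set("a", 0)
hit = demo_cache.get("a", "miss")      # falsy values are still hits
demo_cache.set("b", 1)
demo_cache.set("c", 2)                 # cache is full: "a" is evicted
evicted = demo_cache.get("a", "miss")
now[0] = 10.0                          # advance the fake clock past the TTL
expired = demo_cache.get("b", "miss")
print(hit, evicted, expired)
```

Like any in-process cache, each Uvicorn worker holds its own copy, so entries are not shared or invalidated across workers.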

Valkey Caching (Production Pattern)

from fastapi import FastAPI, Depends
from redis import asyncio as aioredis  # Works with Valkey (Redis fork)
import json
from typing import Optional

# Valkey connection pool (compatible with redis-py client)
valkey_pool = aioredis.ConnectionPool.from_url(
    "redis://localhost:6379",  # Valkey uses same protocol
    max_connections=10,
    decode_responses=True
)

async def get_valkey():
    return aioredis.Redis(connection_pool=valkey_pool)

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    valkey: aioredis.Redis = Depends(get_valkey)
):
    cache_key = f"user:{user_id}"
    
    # Try cache first
    cached = await valkey.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Cache miss - fetch from database
    result = await db.execute(select(User).filter_by(id=user_id))
    user = result.scalar_one_or_none()
    
    if user:
        # Store in Valkey with 5-minute TTL
        await valkey.setex(
            cache_key,
            300,  # 5 minutes
            json.dumps(user.to_dict())
        )
    
    # Return the same shape on a miss as on a cache hit
    return user.to_dict() if user else None

@app.put("/users/{user_id}")
async def update_user(
    user_id: int,
    data: UserUpdate,
    db: AsyncSession = Depends(get_db),
    valkey: aioredis.Redis = Depends(get_valkey)
):
    # Update database
    user = await db.get(User, user_id)
    user.name = data.name
    await db.commit()
    
    # Invalidate cache
    await valkey.delete(f"user:{user_id}")
    
    return user

Response Caching Middleware

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
import hashlib

class ResponseCacheMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, valkey_client):
        super().__init__(app)
        self.valkey = valkey_client
    
    async def dispatch(self, request: Request, call_next):
        # Only cache GET requests
        if request.method != "GET":
            return await call_next(request)
        
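        # Create cache key from URL and query params
        # Caveat: the key ignores headers such as Authorization, so only use
        # this for responses that are identical for every caller
        cache_key = f"response:{request.url.path}:{request.url.query}"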
        
        # Check cache
        cached = await self.valkey.get(cache_key)
        if cached:
            return Response(
                content=cached,
                media_type="application/json",
                headers={"X-Cache": "HIT"}
            )
        
        # Get response
        response = await call_next(request)
        
        # Cache successful responses
        if response.status_code == 200:
            body = b""
            async for chunk in response.body_iterator:
                body += chunk
            
            await self.valkey.setex(cache_key, 60, body)
            
            return Response(
                content=body,
                status_code=response.status_code,
                headers=dict(response.headers),
                media_type=response.media_type
            )
        
        return response

# Add to app (reuses the connection pool defined earlier)
app.add_middleware(
    ResponseCacheMiddleware,
    valkey_client=aioredis.Redis(connection_pool=valkey_pool),
)

# For proper cache control with CDN integration, see:
# https://earezki.com/fastapi-full-guide/#9-cache-control-headers-leverage-cdns-without-thinking
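A cache key built from the raw query string treats ?a=1&b=2 and ?b=2&a=1 as different entries. The sketch below shows one way to build a normalized key using only the standard library; `response_cache_key` and its `vary` parameter are hypothetical names, and hashing the key is a design choice to keep key length bounded, not a requirement.

```python
import hashlib
from urllib.parse import parse_qsl, urlencode

def response_cache_key(method: str, path: str, query: str,
                       vary: tuple = ()) -> str:
    """Build a deterministic cache key for a request.

    Query parameters are sorted so parameter order does not matter.
    `vary` carries extra values (for example a user id) that must
    partition the cache.
    """
    params = sorted(parse_qsl(query, keep_blank_values=True))
    canonical = "\n".join([method.upper(), path, urlencode(params), *vary])
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"response:{digest}"

key_a = response_cache_key("GET", "/posts", "a=1&b=2")
key_b = response_cache_key("GET", "/posts", "b=2&a=1")
key_user1 = response_cache_key("GET", "/me", "", vary=("user:1",))
key_user2 = response_cache_key("GET", "/me", "", vary=("user:2",))
print(key_a == key_b, key_user1 == key_user2)
```

Inside the middleware, the call would take request.method, request.url.path, and request.url.query as its arguments.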

6. Dependency Injection Optimization

Problem: Computing Dependencies Multiple Times

# BAD - Expensive dependency runs in full on every request that needs the user
from fastapi import Depends, Header

def get_current_user(token: str = Header(...)):
    # Database query + JWT validation = 50ms
    return validate_and_fetch_user(token)

@app.get("/profile")
def get_profile(user = Depends(get_current_user)):
    return user

@app.get("/settings")
def get_settings(
    user = Depends(get_current_user),  # Another 50ms on this request!
    # ...
):
    return user.settings

Solution: Cached Dependencies

from fastapi import Depends, Header, Request

# Cache dependency result per request
async def get_current_user(
    request: Request,
    token: str = Header(...)
):
    # Check if already computed for this request
    if hasattr(request.state, "current_user"):
        return request.state.current_user
    
    # Compute once
    user = await validate_and_fetch_user(token)
    request.state.current_user = user
    return user

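# Note: within a single request, FastAPI already reuses a dependency's result
# (Depends(..., use_cache=True) is the default).
# For values that never change, cache across requests with lru_cache:
from functools import lru_cache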

@lru_cache()
def get_settings():
    # Only computed once
    return Settings()

@app.get("/endpoint")
def endpoint(settings: Settings = Depends(get_settings)):
    # get_settings() result is cached
    pass

# For comprehensive dependency injection patterns and common pitfalls, see:
# https://earezki.com/fastapi-full-guide/#3-dependency-injection-done-right-and-the-5-ways-people-screw-it-up
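The lru_cache behavior can be checked without FastAPI. In this sketch the `Settings` class is a stand-in that counts how many times it is constructed; a real settings class would read environment variables or a config file instead.

```python
from functools import lru_cache

class Settings:
    """Stand-in settings object that counts how often it is built."""
    instances = 0

    def __init__(self):
        Settings.instances += 1
        self.debug = False

@lru_cache(maxsize=1)
def get_settings() -> Settings:
    return Settings()

first = get_settings()
second = get_settings()
print(first is second, Settings.instances)

# In tests, clear the cache to force a fresh object.
get_settings.cache_clear()
third = get_settings()
print(third is first, Settings.instances)
```

Because the cached object is shared by every request in the process, it should be treated as read-only.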

7. Background Tasks

Don’t Make Users Wait

from fastapi import BackgroundTasks

# BAD - User waits for email to send
@app.post("/register")
async def register(user: UserCreate, db: AsyncSession = Depends(get_db)):
    new_user = User(**user.dict())
    db.add(new_user)
    await db.commit()
    
    # Email takes 2-3 seconds!
    await send_welcome_email(new_user.email)
    
    return {"message": "User registered"}

# GOOD - Email sent in background
@app.post("/register")
async def register(
    user: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    new_user = User(**user.dict())
    db.add(new_user)
    await db.commit()
    
    # Returns immediately, email sent after response
    background_tasks.add_task(send_welcome_email, new_user.email)
    
    return {"message": "User registered"}

# For longer tasks: Use Celery or RQ
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379')  # Valkey-compatible

@celery.task
def process_video(video_id: int):
    # Long-running task
    pass

@app.post("/upload-video")
async def upload_video(file: UploadFile):
    video_id = await save_video(file)
    
    # Queue task for processing
    process_video.delay(video_id)
    
    return {"video_id": video_id, "status": "processing"}

8. Query Optimization - N+1 Problem

The N+1 Query Problem

# BAD - N+1 queries!
@app.get("/posts")
async def get_posts(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(Post).limit(10))
    posts = result.scalars().all()
    
    # For each post, fetch author (11 queries total!)
    for post in posts:
        author_result = await db.execute(
            select(User).filter_by(id=post.author_id)
        )
        post.author = author_result.scalar_one()
    
    return posts

Solution: Eager Loading

from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload

# GOOD - 2 queries total
@app.get("/posts")
async def get_posts(db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(Post)
        .options(selectinload(Post.author))  # Eager load relationship
        .limit(10)
    )
    posts = result.scalars().all()
    return posts

# Even better - 1 query with JOIN
@app.get("/posts-optimized")
async def get_posts_optimized(db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(Post)
        .options(joinedload(Post.author))  # Single query with JOIN
        .limit(10)
    )
    posts = result.unique().scalars().all()
    return posts
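The difference in round trips can be observed with nothing but the standard library. This sketch uses an in-memory SQLite database instead of PostgreSQL and SQLAlchemy, and counts executed statements with sqlite3's trace callback; the schema and function names are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT,
                        author_id INTEGER REFERENCES users(id));
""")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [(i, f"user_{i}") for i in range(1, 11)])
conn.executemany("INSERT INTO posts VALUES (?, ?, ?)",
                 [(i, f"post_{i}", i) for i in range(1, 11)])
conn.commit()

statements = []
conn.set_trace_callback(statements.append)  # record each SQL statement run

def posts_n_plus_one():
    """One query for the posts, then one query per post for its author."""
    posts = conn.execute(
        "SELECT title, author_id FROM posts ORDER BY id LIMIT 10"
    ).fetchall()
    result = []
    for title, author_id in posts:
        row = conn.execute(
            "SELECT name FROM users WHERE id = ?", (author_id,)
        ).fetchone()
        result.append((title, row[0]))
    return result

def posts_joined():
    """A single query that joins posts to their authors."""
    return conn.execute(
        "SELECT p.title, u.name FROM posts p "
        "JOIN users u ON u.id = p.author_id ORDER BY p.id LIMIT 10"
    ).fetchall()

slow_rows = posts_n_plus_one()
slow_count = len(statements)
statements.clear()
fast_rows = posts_joined()
fast_count = len(statements)
print(f"N+1: {slow_count} statements, JOIN: {fast_count} statement")
```

Against a networked database each of those statements is a round trip, which is where the latency difference comes from.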

Pagination with Cursor

from typing import Optional

# BAD - OFFSET pagination (slow for large offsets)
@app.get("/posts")
async def get_posts(
    skip: int = 0,
    limit: int = 20,
    db: AsyncSession = Depends(get_db)
):
    # SELECT * FROM posts OFFSET 1000000 LIMIT 20
    # Database must scan and skip 1M rows!
    result = await db.execute(
        select(Post).offset(skip).limit(limit)
    )
    return result.scalars().all()

# GOOD - Cursor-based pagination
@app.get("/posts-cursor")
async def get_posts_cursor(
    cursor: Optional[int] = None,
    limit: int = 20,
    db: AsyncSession = Depends(get_db)
):
    query = select(Post).order_by(Post.id).limit(limit)
    
    if cursor is not None:
        # WHERE id > cursor (uses index!)
        query = query.filter(Post.id > cursor)
    
    result = await db.execute(query)
    posts = result.scalars().all()
    
    next_cursor = posts[-1].id if posts else None
    
    return {
        "posts": posts,
        "next_cursor": next_cursor
    }
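The endpoint above returns a next_cursor even on the final page, so a client needs one extra request to find out it is done. A common refinement is to fetch limit + 1 rows and use the extra row only to detect whether more data exists. The sketch below shows this with an in-memory SQLite table; the table contents and `fetch_page` are illustrative assumptions.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany("INSERT INTO posts (title) VALUES (?)",
                 [(f"post_{i}",) for i in range(1, 46)])  # 45 rows

def fetch_page(cursor: Optional[int] = None, limit: int = 20):
    """Return (rows, next_cursor); next_cursor is None on the last page."""
    if cursor is None:
        sql = "SELECT id, title FROM posts ORDER BY id LIMIT ?"
        params = (limit + 1,)
    else:
        sql = "SELECT id, title FROM posts WHERE id > ? ORDER BY id LIMIT ?"
        params = (cursor, limit + 1)
    rows = conn.execute(sql, params).fetchall()
    has_more = len(rows) > limit   # the extra row only signals "more exists"
    rows = rows[:limit]
    next_cursor = rows[-1][0] if has_more else None
    return rows, next_cursor

# Walk all pages the way a client would.
page_sizes = []
cursor = None
while True:
    rows, cursor = fetch_page(cursor)
    page_sizes.append(len(rows))
    if cursor is None:
        break
print(page_sizes)
```

The cursor column must be unique and indexed; when sorting by a non-unique column such as a timestamp, the usual approach is to add the primary key as a tie-breaker.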

9. Uvicorn Configuration

Worker Configuration

# Development - single worker
uvicorn main:app --reload

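# Production - multiple workers
# (a comment after a trailing backslash breaks the command, so flags are explained here)
#   --host 0.0.0.0           bind to all interfaces
#   --port 8000              port to listen on
#   --workers 4              number of worker processes (see below)
#   --loop uvloop            use uvloop (faster event loop)
#   --http httptools         use the httptools parser (faster than h11)
#   --log-level warning      reduce logging overhead
#   --access-log             enable access logs
#   --proxy-headers          trust X-Forwarded-* headers from the proxy
#   --forwarded-allow-ips    which IPs to trust for forwarded headers
#                            ('*' trusts every client; prefer your proxy's IP)
uvicorn main:app \
    --host 0.0.0.0 \
    --port 8000 \
    --workers 4 \
    --loop uvloop \
    --http httptools \
    --log-level warning \
    --access-log \
    --proxy-headers \
    --forwarded-allow-ips '*'

# Worker calculation (upper bound, Gunicorn's rule of thumb): (2 * CPU_cores) + 1
# 4-core machine = 9 workers
# 8-core machine = 17 workers
# For async workers, start near one worker per core and benchmark before going higher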

Worker configuration:

  • Each worker is a separate process with its own memory space
  • More workers = more concurrent requests, but also more memory
  • Memory footprint: ~40-100MB per worker (depending on imports)
  • Example: 8 workers × 60MB = ~480MB baseline + shared libraries
  • Don’t exceed CPU cores by too much (context switching overhead)
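The trade-off between core count and memory can be put into a small planning helper. This is a sketch under stated assumptions: `worker_plan` is a hypothetical function, the 60MB per worker figure is the example value from the list above, and the lower bound of one worker per core is a common starting point rather than a rule from the Uvicorn documentation.

```python
from typing import Optional

def worker_plan(cpu_cores: int, mb_per_worker: int = 60,
                mem_budget_mb: Optional[int] = None) -> dict:
    """Return a worker-count range and the memory the upper bound would need."""
    lower = cpu_cores              # one event loop per core
    upper = 2 * cpu_cores + 1      # the (2 * cores) + 1 rule of thumb
    if mem_budget_mb is not None:
        cap = max(mem_budget_mb // mb_per_worker, 1)  # workers that fit in memory
        lower, upper = min(lower, cap), min(upper, cap)
    return {
        "min_workers": lower,
        "max_workers": upper,
        "max_memory_mb": upper * mb_per_worker,
    }

plan = worker_plan(cpu_cores=8)
capped = worker_plan(cpu_cores=8, mem_budget_mb=600)
print(plan)
print(capped)
```

Treat the output as a range to benchmark within; measure actual per-worker memory with your own imports before relying on the estimate.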

uvloop - Should you use it?

Yes, uvloop is production-ready and safe to use:

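  • 2-4x faster than asyncio’s default event loop in uvloop’s own benchmarks (the gain for a full application depends on how much time is spent in the loop itself)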
  • Based on libuv (same as Node.js)
  • Used by major companies in production
  • Drop-in replacement, no code changes needed
  • Caveat: Linux/macOS only. uvloop cannot be installed on Windows, so leave --loop at its default (auto) there and Uvicorn falls back to asyncio

Install: pip install uvloop

Gunicorn with Uvicorn Workers

# gunicorn_config.py
import multiprocessing

workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
bind = "0.0.0.0:8000"
keepalive = 120
timeout = 30
graceful_timeout = 30

# Logging
accesslog = "logs/access.log"
errorlog = "logs/error.log"
loglevel = "warning"

# Performance
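worker_connections = 1000  # Per Gunicorn docs, only affects gthread/eventlet/gevent workers
max_requests = 1000  # Restart worker after 1000 requests
max_requests_jitter = 100  # Add randomness so workers don't all restart at once

# Run with gunicorn (shell command, not part of the config file)
gunicorn main:app -c gunicorn_config.py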

10. Profiling and Monitoring

Add Request Timing Middleware

import time
from starlette.middleware.base import BaseHTTPMiddleware

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start_time = time.perf_counter()
        
        response = await call_next(request)
        
        process_time = time.perf_counter() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        
        # Log slow requests
        if process_time > 1.0:
            print(f"SLOW REQUEST: {request.method} {request.url.path} took {process_time:.2f}s")
        
        return response

app.add_middleware(TimingMiddleware)
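BaseHTTPMiddleware adds some overhead of its own to every request. The same header can be added with a plain ASGI middleware, sketched below with no framework imports. `PureASGITimingMiddleware` and `demo_app` are illustrative names, and note that this version measures time until the response starts, not until the body has been fully sent.

```python
import asyncio
import time

class PureASGITimingMiddleware:
    """Adds an X-Process-Time header without using BaseHTTPMiddleware."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":      # pass through websocket/lifespan
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()

        async def send_with_timing(message):
            if message["type"] == "http.response.start":
                elapsed = time.perf_counter() - start
                headers = list(message.get("headers", []))
                headers.append(
                    (b"x-process-time", f"{elapsed:.6f}".encode("ascii"))
                )
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_timing)

# Tiny ASGI app and driver so the sketch runs without a server.
async def demo_app(scope, receive, send):
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})

async def run_demo():
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await PureASGITimingMiddleware(demo_app)({"type": "http"}, receive, send)
    return sent

messages = asyncio.run(run_demo())
print(messages[0]["headers"])
```

It is registered the same way as the class above, with app.add_middleware(PureASGITimingMiddleware).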

Profiling with py-spy

# Install py-spy
pip install py-spy

# Profile running application
# (with --workers, pgrep matches several processes; pass a single PID instead)
py-spy top --pid $(pgrep -f "uvicorn main:app")

# Generate flamegraph
py-spy record -o profile.svg --pid $(pgrep -f "uvicorn main:app")

# Launch the app under py-spy and record for 60 seconds
py-spy record -o profile.svg --duration 60 -- python -m uvicorn main:app
# While it records, hit the endpoint you want to profile multiple times

Application Performance Monitoring

# Using Prometheus
import time
from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST

# Metrics
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start_time = time.perf_counter()
    
    response = await call_next(request)
    
    duration = time.perf_counter() - start_time
    
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    request_duration.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    
    return response

@app.get("/metrics")
def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
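Using request.url.path as a label value creates a separate time series for every distinct URL, for example one per user id, which can make the metrics store grow quickly. One way to limit this, sketched here as an assumption rather than a prometheus_client feature, is to collapse variable path segments before labeling. `normalize_path` is a hypothetical helper.

```python
import re

# Matches the canonical 8-4-4-4-12 hexadecimal UUID form.
_UUID = re.compile(r"^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$")

def normalize_path(path: str) -> str:
    """Replace numeric and UUID path segments with fixed placeholders."""
    parts = []
    for segment in path.split("/"):
        if segment.isdigit():
            parts.append("{id}")
        elif _UUID.match(segment):
            parts.append("{uuid}")
        else:
            parts.append(segment)
    return "/".join(parts)

print(normalize_path("/users/42"))
print(normalize_path("/users/42/posts/7"))
print(normalize_path("/items/123e4567-e89b-12d3-a456-426614174000"))
print(normalize_path("/metrics"))
```

In the middleware above, the label would then be endpoint=normalize_path(request.url.path).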

Conclusion

FastAPI performance optimization is about:

  1. Eliminating blocking operations - Use async everywhere
  2. Reducing database round trips - Connection pooling + query optimization
  3. Caching strategically - Valkey for distributed, in-memory for local
  4. Leveraging multiple cores - Uvicorn workers with uvloop
  5. Measuring constantly - Profile, monitor, optimize

Start with the quick wins (orjson, workers, pooling), then profile to find bottlenecks. Don’t optimize blindly—measure first, then optimize what matters.

Most performance problems in FastAPI come from (rough proportions):

  • Blocking operations in async code (70%)
  • Database N+1 queries (20%)
  • Missing caching (10%)
