FastAPI Performance Optimization - Production-Grade Techniques
1. TL;DR - The Quick Wins
Instant improvements (< 1 hour):
- Switch from json to orjson (20-30% faster serialization)
- Enable Uvicorn workers: --workers 4 (4x throughput on multi-core)
- Add response caching for read-heavy endpoints (10-100x faster)
- Use connection pooling (5-10x faster database queries)
Medium effort (1-2 days):
- Implement proper async/await patterns (2-5x for I/O-bound)
- Add database query optimization (10-50x for N+1 queries)
- Background tasks for non-critical operations
- Proper dependency injection caching
High impact (1 week+):
- Valkey caching layer with invalidation strategy
- CDN for static assets
- Database read replicas
- Horizontal scaling with load balancer
Real numbers from production (8-core server, 32GB RAM):
- Before optimization: 500 RPS, 150ms p95 latency
- After optimization: 5,000 RPS, 15ms p95 latency
- Cost: Same hardware, 10x throughput
2. JSON Serialization - The Low-Hanging Fruit
Problem: Default JSON is Slow
from fastapi import FastAPI
import json
import time

app = FastAPI()

@app.get("/slow-json")
def slow_endpoint():
    data = {
        "users": [{"id": i, "name": f"user_{i}"} for i in range(1000)]
    }
    return data  # Uses standard json.dumps internally
Solution: Use orjson
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

# Set default response class globally
app = FastAPI(default_response_class=ORJSONResponse)

@app.get("/fast-json")
def fast_endpoint():
    data = {
        "users": [{"id": i, "name": f"user_{i}"} for i in range(1000)]
    }
    return data  # Now uses orjson - 2-3x faster!

# Or per-endpoint:
@app.get("/another-fast", response_class=ORJSONResponse)
def another_endpoint():
    return {"status": "fast"}
Install:
pip install orjson
orjson is 2-3x faster than the standard library’s json module because it is implemented in Rust with optimized serialization routines. It serializes datetime objects and UUIDs natively, and numpy arrays when the OPT_SERIALIZE_NUMPY option is enabled.
3. Database Optimization - Connection Pooling
Problem: Creating Connections is Expensive
# BAD - Creates new connection per request
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@app.get("/users/{user_id}")
def get_user(user_id: int):
    engine = create_engine("postgresql://user:pass@localhost/db")
    Session = sessionmaker(bind=engine)
    session = Session()
    user = session.query(User).filter_by(id=user_id).first()
    session.close()
    return user
# Each request: TCP handshake + auth + query = 50-100ms overhead!
Solution: Connection Pool with AsyncPG
from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Create engine with connection pool
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
engine = create_async_engine(
    DATABASE_URL,
    echo=False,
    pool_size=20,        # Maintain 20 connections
    max_overflow=10,     # Allow 10 extra under load
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,   # Recycle connections every hour
)

# Async session maker
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# Dependency for database session
async def get_db():
    # "async with" closes the session on exit, so no explicit close() is needed
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Use in endpoints
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(User).filter_by(id=user_id)
    )
    user = result.scalar_one_or_none()
    return user
# Result: 5-10x faster database queries!
Pool Configuration Guidelines
For detailed connection pooling configuration, sizing strategies, and monitoring, see the SQLAlchemy pooling configuration guide.
Key takeaway: Each worker process needs its own connection pool, and overflow connections count against the database limit too. Calculate as workers * (pool_size + max_overflow) <= database_max_connections.
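To make the sizing rule concrete, here is a minimal sketch of the arithmetic. The helper names (total_connections, max_pool_size) and the reserved headroom of 5 connections are assumptions for illustration, not part of SQLAlchemy; the limit of 100 matches PostgreSQL's default max_connections.

```python
def total_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)


def max_pool_size(db_max_connections: int, workers: int,
                  max_overflow: int = 10, reserved: int = 5) -> int:
    """Largest per-worker pool_size that stays under the database limit.

    `reserved` keeps a few connections free for migrations and admin tools
    (a hypothetical headroom value, tune it for your setup).
    """
    if workers < 1:
        raise ValueError("workers must be >= 1")
    usable = db_max_connections - reserved
    return max(usable // workers - max_overflow, 0)


# The pool settings shown earlier, run with 4 workers, can exceed a limit of 100:
print(total_connections(workers=4, pool_size=20, max_overflow=10))
# A pool_size that fits the same limit:
print(max_pool_size(db_max_connections=100, workers=4))
```

With 4 workers the earlier settings can open up to 120 connections, so either the pool shrinks or the database limit grows.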
4. Async/Await Patterns - Do It Right
The Cardinal Sin: Blocking in Async
# CATASTROPHIC - Blocks entire event loop!
@app.get("/users")
async def get_users():
    # requests.get() is blocking!
    response = requests.get("https://api.external.com/users")
    return response.json()
# Result: 1 slow request blocks ALL other requests
# Your 5000 RPS API drops to 10 RPS
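The effect is easy to reproduce without FastAPI. The sketch below uses only the standard library: time.sleep stands in for a blocking call such as requests.get(), and asyncio.sleep stands in for an awaited HTTP call. The handler names and the 0.2 second delay are made up for the demonstration.

```python
import asyncio
import time


async def blocking_handler() -> None:
    # Blocks the whole event loop; no other coroutine can run meanwhile
    time.sleep(0.2)


async def cooperative_handler() -> None:
    # Yields control to the event loop while waiting
    await asyncio.sleep(0.2)


async def timed(handler, concurrent_requests: int = 3) -> float:
    """Run several 'requests' concurrently and return the wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(concurrent_requests)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_handler))
cooperative_elapsed = asyncio.run(timed(cooperative_handler))
print(f"blocking:    {blocking_elapsed:.2f}s")
print(f"cooperative: {cooperative_elapsed:.2f}s")
```

The blocking version takes roughly the sum of all delays because the handlers run one after another, while the cooperative version takes roughly one delay.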
Solution: Use Async HTTP Clients
import httpx

# Create reusable client
client = httpx.AsyncClient(
    timeout=10.0,
    limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
)

@app.get("/users")
async def get_users():
    response = await client.get("https://api.external.com/users")
    return response.json()

# on_event still works but is deprecated in favor of lifespan handlers
@app.on_event("shutdown")
async def shutdown():
    await client.aclose()

# Alternative: aiohttp
import aiohttp

async def fetch_json(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
Parallel Async Requests
import asyncio
from typing import List, Dict
from fastapi import HTTPException

@app.get("/dashboard")
async def get_dashboard():
    # BAD - Sequential (slow!)
    users = await fetch_users()
    posts = await fetch_posts()
    comments = await fetch_comments()

    # GOOD - Parallel (3x faster!)
    users, posts, comments = await asyncio.gather(
        fetch_users(),
        fetch_posts(),
        fetch_comments(),
    )
    return {
        "users": users,
        "posts": posts,
        "comments": comments
    }

# Even better: fetch only what you need
@app.get("/dashboard/{user_id}")
async def get_user_dashboard(user_id: int):
    # Gather returns results in order
    user, user_posts, user_stats = await asyncio.gather(
        fetch_user(user_id),
        fetch_user_posts(user_id),
        fetch_user_stats(user_id),
        return_exceptions=True  # Don't fail all if one fails
    )
    # Handle potential exceptions
    if isinstance(user, Exception):
        raise HTTPException(status_code=500, detail="User fetch failed")
    return {
        "user": user,
        "posts": user_posts if not isinstance(user_posts, Exception) else [],
        "stats": user_stats if not isinstance(user_stats, Exception) else {}
    }
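The return_exceptions behavior can be checked in isolation. This is a small standard-library sketch; fetch_profile and fetch_stats are hypothetical stand-ins for real service calls, and the second one always fails to show the fallback path.

```python
import asyncio


async def fetch_profile(user_id: int) -> dict:
    await asyncio.sleep(0.01)  # pretend network call
    return {"id": user_id}


async def fetch_stats(user_id: int) -> dict:
    await asyncio.sleep(0.01)
    raise TimeoutError("stats service timed out")


async def load_dashboard(user_id: int) -> dict:
    # With return_exceptions=True a failure is returned as a value
    # instead of cancelling the whole gather call
    profile, stats = await asyncio.gather(
        fetch_profile(user_id),
        fetch_stats(user_id),
        return_exceptions=True,
    )
    if isinstance(profile, Exception):
        raise profile  # the profile is required, so re-raise
    return {
        "profile": profile,
        "stats": {} if isinstance(stats, Exception) else stats,
    }


dashboard = asyncio.run(load_dashboard(7))
print(dashboard)
```

The required result is re-raised on failure, while the optional one degrades to an empty default.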
CPU-Bound Work: Use Thread/Process Pool
from fastapi import UploadFile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import asyncio

# For CPU-heavy work (image processing, ML inference)
process_pool = ProcessPoolExecutor(max_workers=4)
# For I/O-bound blocking libraries (legacy code)
thread_pool = ThreadPoolExecutor(max_workers=20)

@app.post("/process-image")
async def process_image(file: UploadFile):
    contents = await file.read()
    # Run CPU-intensive work in process pool
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        process_pool,
        heavy_image_processing,  # This function runs in separate process
        contents
    )
    return {"processed": result}

def heavy_image_processing(image_data: bytes) -> dict:
    # CPU-intensive operations here
    # Runs in separate process, doesn't block event loop
    pass
5. Caching Strategies
In-Memory Caching with TTL
from functools import lru_cache
from typing import Any, Optional
import time

# Simple LRU cache for pure functions
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    time.sleep(1)  # Simulate expensive operation
    return n ** 2

@app.get("/compute/{n}")
def compute(n: int):
    # First call: 1 second
    # Subsequent calls: instant (cached)
    return {"result": expensive_computation(n)}

# Cache with TTL (time-to-live)
class CacheWithTTL:
    def __init__(self):
        self._cache = {}
        self._timestamps = {}

    def get(self, key: str, ttl_seconds: int = 300) -> Optional[Any]:
        if key in self._cache:
            age = time.time() - self._timestamps[key]
            if age < ttl_seconds:
                return self._cache[key]
            else:
                del self._cache[key]
                del self._timestamps[key]
        return None

    def set(self, key: str, value: Any):
        self._cache[key] = value
        self._timestamps[key] = time.time()

cache = CacheWithTTL()

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Check cache first
    cached = cache.get(f"user:{user_id}", ttl_seconds=60)
    if cached is not None:
        return cached
    # Fetch from database
    user = await db.get(User, user_id)
    # Store in cache
    cache.set(f"user:{user_id}", user)
    return user
Valkey Caching (Production Pattern)
from fastapi import FastAPI, Depends, HTTPException
from redis import asyncio as aioredis  # Works with Valkey (Redis fork)
from sqlalchemy import select
import json

# Valkey connection pool (compatible with redis-py client)
valkey_pool = aioredis.ConnectionPool.from_url(
    "redis://localhost:6379",  # Valkey uses same protocol
    max_connections=10,
    decode_responses=True
)

async def get_valkey():
    return aioredis.Redis(connection_pool=valkey_pool)

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    valkey: aioredis.Redis = Depends(get_valkey)
):
    cache_key = f"user:{user_id}"
    # Try cache first
    cached = await valkey.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss - fetch from database
    result = await db.execute(select(User).filter_by(id=user_id))
    user = result.scalar_one_or_none()
    if user:
        # Store in Valkey with 5-minute TTL
        await valkey.setex(
            cache_key,
            300,  # 5 minutes
            json.dumps(user.to_dict())
        )
    return user

@app.put("/users/{user_id}")
async def update_user(
    user_id: int,
    data: UserUpdate,
    db: AsyncSession = Depends(get_db),
    valkey: aioredis.Redis = Depends(get_valkey)
):
    # Update database
    user = await db.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    user.name = data.name
    await db.commit()
    # Invalidate cache
    await valkey.delete(f"user:{user_id}")
    return user
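The read path and the invalidation step form a cache-aside pattern that can be exercised without a running Valkey server. The sketch below swaps in a hypothetical FakeValkey class that mimics only get, setex, and delete; the DB dict and the load_user and rename_user helpers are likewise illustrative stand-ins, not the code above.

```python
import asyncio
import json
import time


class FakeValkey:
    """In-memory stand-in exposing only get/setex/delete (assumption for the demo)."""

    def __init__(self):
        self._data = {}

    async def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._data[key]  # expired entries behave like misses
            return None
        return value

    async def setex(self, key, ttl_seconds, value):
        self._data[key] = (value, time.monotonic() + ttl_seconds)

    async def delete(self, key):
        self._data.pop(key, None)


DB = {1: {"id": 1, "name": "alice"}}  # pretend database table
stats = {"db_reads": 0}


async def load_user(cache, user_id):
    key = f"user:{user_id}"
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)  # cache hit
    stats["db_reads"] += 1  # cache miss: go to the database
    user = DB.get(user_id)
    if user is not None:
        await cache.setex(key, 300, json.dumps(user))
    return user


async def rename_user(cache, user_id, name):
    DB[user_id] = {**DB[user_id], "name": name}
    await cache.delete(f"user:{user_id}")  # invalidate after the write


async def demo():
    cache = FakeValkey()
    first = await load_user(cache, 1)   # miss
    second = await load_user(cache, 1)  # hit
    await rename_user(cache, 1, "bob")
    third = await load_user(cache, 1)   # miss again after invalidation
    return first, second, third


first, second, third = asyncio.run(demo())
print(first, second, third, stats)
```

Three reads cost two database queries, and the read after the update sees the new name because the key was deleted.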
Response Caching Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
import hashlib

class ResponseCacheMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, valkey_client):
        super().__init__(app)
        self.valkey = valkey_client

    async def dispatch(self, request: Request, call_next):
        # Only cache GET requests
        if request.method != "GET":
            return await call_next(request)
        # Create cache key from URL and query params
        # Note: the key ignores headers, so only use this for responses
        # that are identical for every caller
        cache_key = f"response:{request.url.path}:{request.url.query}"
        # Check cache
        cached = await self.valkey.get(cache_key)
        if cached:
            return Response(
                content=cached,
                media_type="application/json",
                headers={"X-Cache": "HIT"}
            )
        # Get response
        response = await call_next(request)
        # Cache successful responses
        if response.status_code == 200:
            body = b""
            async for chunk in response.body_iterator:
                body += chunk
            await self.valkey.setex(cache_key, 60, body)
            return Response(
                content=body,
                status_code=response.status_code,
                headers=dict(response.headers),
                media_type=response.media_type
            )
        return response

# Add to app (reuses valkey_pool from the Valkey caching example)
app.add_middleware(
    ResponseCacheMiddleware,
    valkey_client=aioredis.Redis(connection_pool=valkey_pool)
)
# For proper cache control with CDN integration, see:
# https://earezki.com/fastapi-full-guide/#9-cache-control-headers-leverage-cdns-without-thinking
6. Dependency Injection Optimization
Problem: Computing Dependencies Multiple Times
from fastapi import Depends, Header

# BAD - Expensive dependency runs in full on every request
def get_current_user(token: str = Header(...)):
    # Database query + JWT validation = 50ms
    return validate_and_fetch_user(token)

@app.get("/profile")
def get_profile(user = Depends(get_current_user)):
    return user

@app.get("/settings")
def get_settings(
    user = Depends(get_current_user),  # Computed again for this request!
    # ...
):
    return user.settings
Solution: Cached Dependencies
from fastapi import Depends, Header, Request

# Cache dependency result per request.
# FastAPI already reuses a Depends() result within one request
# (use_cache=True is the default); request.state also shares it with
# middleware and other code outside the dependency graph.
async def get_current_user(
    request: Request,
    token: str = Header(...)
):
    # Check if already computed for this request
    if hasattr(request.state, "current_user"):
        return request.state.current_user
    # Compute once
    user = await validate_and_fetch_user(token)
    request.state.current_user = user
    return user

# For values that never change between requests, cache across requests with lru_cache
from functools import lru_cache

@lru_cache()
def get_settings():
    # Only computed once per process
    return Settings()

@app.get("/endpoint")
def endpoint(settings: Settings = Depends(get_settings)):
    # get_settings() result is cached
    pass
# For comprehensive dependency injection patterns and common pitfalls, see:
# https://earezki.com/fastapi-full-guide/#3-dependency-injection-done-right-and-the-5-ways-people-screw-it-up
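The lru_cache behavior behind the settings dependency can be verified on its own. In this sketch the Settings class and its database_url attribute are placeholders; the load_calls list only exists to count how often the object is built.

```python
from functools import lru_cache

load_calls = []  # records each time Settings is constructed


class Settings:
    """Placeholder settings object (hypothetical fields)."""

    def __init__(self):
        load_calls.append(1)
        self.database_url = "postgresql+asyncpg://user:pass@localhost/db"


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    return Settings()


a = get_settings()
b = get_settings()  # served from the cache, no new Settings()
print(a is b, len(load_calls))

# Tests that need fresh settings can reset the cache explicitly
get_settings.cache_clear()
c = get_settings()
print(c is a, len(load_calls))
```

Because the cache lives in the process, each worker builds its own Settings once; cache_clear() is the escape hatch when a test needs a fresh instance.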
7. Background Tasks
Don’t Make Users Wait
from fastapi import BackgroundTasks, UploadFile

# BAD - User waits for email to send
@app.post("/register")
async def register(user: UserCreate, db: AsyncSession = Depends(get_db)):
    new_user = User(**user.dict())
    db.add(new_user)
    await db.commit()
    # Email takes 2-3 seconds!
    await send_welcome_email(new_user.email)
    return {"message": "User registered"}

# GOOD - Email sent in background
@app.post("/register")
async def register(
    user: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    new_user = User(**user.dict())
    db.add(new_user)
    await db.commit()
    # Returns immediately, email sent after response
    background_tasks.add_task(send_welcome_email, new_user.email)
    return {"message": "User registered"}

# For longer tasks: Use Celery or RQ
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379')  # Valkey-compatible

@celery.task
def process_video(video_id: int):
    # Long-running task
    pass

@app.post("/upload-video")
async def upload_video(file: UploadFile):
    video_id = await save_video(file)
    # Queue task for processing
    process_video.delay(video_id)
    return {"video_id": video_id, "status": "processing"}
8. Query Optimization - N+1 Problem
The N+1 Query Problem
# BAD - N+1 queries!
@app.get("/posts")
async def get_posts(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(Post).limit(10))
    posts = result.scalars().all()
    # For each post, fetch author (11 queries total!)
    for post in posts:
        author_result = await db.execute(
            select(User).filter_by(id=post.author_id)
        )
        post.author = author_result.scalar_one()
    return posts
Solution: Eager Loading
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload

# GOOD - 2 queries total
@app.get("/posts")
async def get_posts(db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(Post)
        .options(selectinload(Post.author))  # Eager load relationship
        .limit(10)
    )
    posts = result.scalars().all()
    return posts

# Even better - 1 query with JOIN
@app.get("/posts-optimized")
async def get_posts_optimized(db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(Post)
        .options(joinedload(Post.author))  # Single query with JOIN
        .limit(10)
    )
    posts = result.unique().scalars().all()
    return posts
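The query counts are easy to confirm with the standard library's sqlite3 module, which can report every statement it runs. This sketch uses plain SQL instead of SQLAlchemy, and the two-table schema is a simplified assumption modeled on the Post and User models.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        title TEXT,
        author_id INTEGER REFERENCES users(id)
    );
""")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [(i, f"user_{i}") for i in range(1, 11)])
conn.executemany("INSERT INTO posts VALUES (?, ?, ?)",
                 [(i, f"post_{i}", i) for i in range(1, 11)])
conn.commit()

statements = []
conn.set_trace_callback(statements.append)  # record every executed statement


def select_count() -> int:
    return sum(1 for s in statements if s.lstrip().upper().startswith("SELECT"))


# N+1: one query for the posts, then one per post for its author
posts = conn.execute("SELECT id, title, author_id FROM posts LIMIT 10").fetchall()
for _post_id, _title, author_id in posts:
    conn.execute("SELECT name FROM users WHERE id = ?", (author_id,)).fetchone()
n_plus_one_queries = select_count()

# JOIN: posts and authors in a single round trip
statements.clear()
rows = conn.execute("""
    SELECT p.id, p.title, u.name
    FROM posts p JOIN users u ON u.id = p.author_id
    LIMIT 10
""").fetchall()
join_queries = select_count()

print(n_plus_one_queries, join_queries)
```

On a local in-memory database the difference is invisible, but each extra query is a network round trip against a real database server.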
Pagination with Cursor
from typing import Optional
# BAD - OFFSET pagination (slow for large offsets)
@app.get("/posts")
async def get_posts(
    skip: int = 0,
    limit: int = 20,
    db: AsyncSession = Depends(get_db)
):
    # SELECT * FROM posts OFFSET 1000000 LIMIT 20
    # Database must scan and skip 1M rows!
    result = await db.execute(
        select(Post).offset(skip).limit(limit)
    )
    return result.scalars().all()

# GOOD - Cursor-based pagination
@app.get("/posts-cursor")
async def get_posts_cursor(
    cursor: Optional[int] = None,
    limit: int = 20,
    db: AsyncSession = Depends(get_db)
):
    query = select(Post).order_by(Post.id).limit(limit)
    if cursor is not None:
        # WHERE id > cursor (uses index!)
        query = query.filter(Post.id > cursor)
    result = await db.execute(query)
    posts = result.scalars().all()
    next_cursor = posts[-1].id if posts else None
    return {
        "posts": posts,
        "next_cursor": next_cursor
    }
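A client walks the pages by passing each next_cursor back in. The sketch below shows that loop against an in-memory SQLite table using plain SQL. It also fetches limit + 1 rows so the last page can report next_cursor as None without an extra empty request; that detail is a variation on the endpoint above, not something it does, and fetch_page is a hypothetical helper.

```python
import sqlite3
from typing import List, Optional, Tuple

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany("INSERT INTO posts (id, title) VALUES (?, ?)",
                 [(i, f"post_{i}") for i in range(1, 8)])


def fetch_page(cursor: Optional[int], limit: int) -> Tuple[List[tuple], Optional[int]]:
    """Return one page plus the cursor for the next page (None when done)."""
    if cursor is None:
        rows = conn.execute(
            "SELECT id, title FROM posts ORDER BY id LIMIT ?",
            (limit + 1,),
        ).fetchall()
    else:
        # The primary-key index answers "id > cursor" without scanning skipped rows
        rows = conn.execute(
            "SELECT id, title FROM posts WHERE id > ? ORDER BY id LIMIT ?",
            (cursor, limit + 1),
        ).fetchall()
    has_more = len(rows) > limit  # the extra row only signals another page
    page = rows[:limit]
    next_cursor = page[-1][0] if has_more else None
    return page, next_cursor


pages = []
cursor = None
while True:
    page, cursor = fetch_page(cursor, limit=3)
    pages.append([row[0] for row in page])
    if cursor is None:
        break

print(pages)
```

Seven rows with a page size of three yield three pages, and the loop stops as soon as the cursor comes back empty.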
9. Uvicorn Configuration
Worker Configuration
# Development - single worker
uvicorn main:app --reload

# Production - multiple workers
#   --host 0.0.0.0             Bind to all interfaces
#   --port 8000                Port to listen on
#   --workers 4                Number of worker processes (see below)
#   --loop uvloop              Use uvloop (faster event loop)
#   --http httptools           Use httptools parser (faster than h11)
#   --log-level warning        Reduce logging overhead
#   --access-log               Enable access logs
#   --proxy-headers            Trust X-Forwarded-* headers from proxy
#   --forwarded-allow-ips '*'  Which IPs to trust for forwarded headers
#                              ('*' trusts every client; restrict it to your proxy)
uvicorn main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --http httptools \
  --log-level warning \
  --access-log \
  --proxy-headers \
  --forwarded-allow-ips '*'

# Worker calculation (starting point, tune under load): (2 * CPU_cores) + 1
# 4-core machine = 9 workers
# 8-core machine = 17 workers
Worker configuration:
- Each worker is a separate process with its own memory space
- More workers = more concurrent requests, but also more memory
- Memory footprint: ~40-100MB per worker (depending on imports)
- Example: 8 workers × 60MB = ~480MB baseline + shared libraries
- Don’t exceed CPU cores by too much (context switching overhead)
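The CPU formula and the memory estimate can be combined into one quick calculation. This is a rough sketch: the function names are made up, and the 60MB per-worker figure is the example value from the list above, so measure your own application before relying on it.

```python
def suggested_workers(cpu_cores: int) -> int:
    """The (2 * CPU_cores) + 1 starting point."""
    return 2 * cpu_cores + 1


def workers_within_budget(cpu_cores: int, memory_budget_mb: int,
                          per_worker_mb: int = 60) -> int:
    """Cap the CPU-based suggestion by what fits in the memory budget."""
    by_cpu = suggested_workers(cpu_cores)
    by_memory = memory_budget_mb // per_worker_mb
    return max(1, min(by_cpu, by_memory))


print(suggested_workers(4))   # 4-core machine
print(suggested_workers(8))   # 8-core machine
# 8 cores, but only 480MB set aside for worker processes
print(workers_within_budget(cpu_cores=8, memory_budget_mb=480))
```

Whichever limit is lower wins: on a memory-constrained container the budget caps the count well below the CPU formula.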
uvloop - Should you use it?
Yes, uvloop is production-ready and safe to use:
- 2-4x faster than asyncio’s default event loop
- Based on libuv (same as Node.js)
- Used by major companies in production
- Drop-in replacement, no code changes needed
- Caveat: Linux/macOS only. uvloop does not support Windows, where Uvicorn’s default --loop auto uses the standard asyncio loop instead
Install: pip install uvloop
Gunicorn with Uvicorn Workers
# gunicorn_config.py
import multiprocessing
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
bind = "0.0.0.0:8000"
keepalive = 120
timeout = 30
graceful_timeout = 30
# Logging
accesslog = "logs/access.log"
errorlog = "logs/error.log"
loglevel = "warning"
# Performance
worker_connections = 1000
max_requests = 1000 # Restart worker after 1000 requests
max_requests_jitter = 100 # Add randomness to prevent thundering herd
# Run with gunicorn
gunicorn main:app -c gunicorn_config.py
10. Profiling and Monitoring
Add Request Timing Middleware
import time
from starlette.middleware.base import BaseHTTPMiddleware

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start_time = time.perf_counter()
        response = await call_next(request)
        process_time = time.perf_counter() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        # Log slow requests
        if process_time > 1.0:
            print(f"SLOW REQUEST: {request.method} {request.url.path} took {process_time:.2f}s")
        return response

app.add_middleware(TimingMiddleware)
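The same idea can also be written as a plain ASGI callable, which avoids the request/response wrapping that BaseHTTPMiddleware performs. The following is a dependency-free sketch rather than a drop-in replacement: PureTimingMiddleware and hello_app are hypothetical names, and the header here measures time until the response starts, not until the body is fully sent.

```python
import asyncio
import time


class PureTimingMiddleware:
    """Plain ASGI middleware that adds an X-Process-Time header."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass lifespan and websocket traffic through untouched
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()

        async def send_with_timing(message):
            if message["type"] == "http.response.start":
                elapsed = time.perf_counter() - start
                headers = list(message.get("headers", []))
                headers.append((b"x-process-time", f"{elapsed:.6f}".encode("latin-1")))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_timing)


async def hello_app(scope, receive, send):
    """Tiny ASGI app used only to exercise the middleware."""
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"text/plain")]})
    await send({"type": "http.response.body", "body": b"ok"})


async def run_once():
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": "/"}
    await PureTimingMiddleware(hello_app)(scope, receive, send)
    return sent


messages = asyncio.run(run_once())
header_names = [name for name, _value in messages[0]["headers"]]
print(header_names)
```

In a FastAPI app it would be registered the same way, with app.add_middleware(PureTimingMiddleware).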
Profiling with py-spy
# Install py-spy
pip install py-spy
# Profile running application
py-spy top --pid $(pgrep -f "uvicorn main:app")
# Generate flamegraph
py-spy record -o profile.svg --pid $(pgrep -f "uvicorn main:app")
# Profile specific endpoint
py-spy record -o profile.svg --duration 60 -- python -m uvicorn main:app
# Then hit your endpoint multiple times
Application Performance Monitoring
# Using Prometheus
import time
from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST

# Metrics
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    # Note: raw paths such as /users/123 create one time series per URL;
    # prefer the route template as the label where possible
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    request_duration.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    return response

@app.get("/metrics")
def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Conclusion
FastAPI performance optimization is about:
- Eliminating blocking operations - Use async everywhere
- Reducing database round trips - Connection pooling + query optimization
- Caching strategically - Valkey for distributed, in-memory for local
- Leveraging multiple cores - Uvicorn workers with uvloop
- Measuring constantly - Profile, monitor, optimize
Start with the quick wins (orjson, workers, pooling), then profile to find bottlenecks. Don’t optimize blindly—measure first, then optimize what matters.
Most performance problems in FastAPI are from:
- Blocking operations in async code (70%)
- Database N+1 queries (20%)
- Missing caching (10%)