FastAPI Performance Optimization for High Traffic APIs
FastAPI is one of the fastest Python web frameworks because of its async foundation. But "fast framework" does not automatically mean "fast application." The performance bottlenecks in production APIs are almost never the framework itself — they are in database queries, serialization, blocking calls, and missing caches.
This guide covers the optimizations that actually matter when your FastAPI application moves from handling hundreds of requests per second to thousands.
Problem
Common performance issues in production FastAPI services:
- Endpoints that work fine locally take 2-5 seconds under load
- Database connection pools get exhausted during traffic spikes
- Synchronous libraries block the async event loop
- Response serialization becomes a bottleneck with large payloads
- Memory usage grows without bound on long-running instances
Async vs Sync: Getting It Right
The most common FastAPI performance mistake is mixing async and sync code incorrectly.
# BAD: Blocking call inside async endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # This blocks the event loop
    user = db.query(User).filter(User.id == user_id).one()
    return user
# GOOD: Use async database driver
@app.get("/users/{user_id}")
async def get_user(user_id: int, pool: asyncpg.Pool = Depends(get_pool)):
    user = await pool.fetchrow(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    return user
If you must use synchronous libraries, declare the endpoint as a regular function (not async def). FastAPI will run it in a thread pool automatically:
# OK: FastAPI handles threading for sync endpoints
@app.get("/users/{user_id}")
def get_user(user_id: int):
    user = db.query(User).filter(User.id == user_id).one()
    return user
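When only part of an endpoint blocks, a middle ground is to keep async def and push just the blocking call onto a worker thread. Here is a minimal sketch using the standard library's asyncio.to_thread (FastAPI also exposes run_in_threadpool in fastapi.concurrency for the same purpose); blocking_lookup is a hypothetical stand-in for a synchronous database call:

```python
import asyncio

def blocking_lookup(user_id: int) -> dict:
    # Hypothetical stand-in for a synchronous database call
    return {"id": user_id, "name": "demo"}

async def get_user(user_id: int) -> dict:
    # Offload the blocking call to a thread so the event loop stays free
    return await asyncio.to_thread(blocking_lookup, user_id)

result = asyncio.run(get_user(7))
```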
Connection Pooling
Database connections are expensive to create. Pool them:
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg
pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/db",
        min_size=10,
        max_size=50,
        max_inactive_connection_lifetime=300,
    )
    yield
    await pool.close()
app = FastAPI(lifespan=lifespan)
Set min_size to your baseline concurrency and max_size to your peak. Monitor pool.get_size() and pool.get_idle_size() to tune these values. The PostgreSQL optimization guide covers the query side of this equation.
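Remember that each worker process opens its own pool, so the database sees the product of the two numbers. A quick back-of-envelope check, using the max_size above and assuming four Gunicorn workers:

```python
def peak_db_connections(workers: int, pool_max_size: int) -> int:
    # Each worker process holds its own pool, so at peak the
    # database can see workers * max_size connections
    return workers * pool_max_size

# With 4 workers and max_size=50, keep Postgres max_connections above 200
print(peak_db_connections(4, 50))  # 200
```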
Response Serialization
Pydantic V2 is significantly faster than V1, but serialization still matters at scale:
from pydantic import BaseModel
# BAD: Returning raw dicts gives no schema and skips response_model optimizations
@app.get("/products")
async def list_products():
    rows = await pool.fetch("SELECT * FROM products LIMIT 100")
    return [dict(row) for row in rows]
# GOOD: Use model_validate for typed, optimized serialization
class Product(BaseModel):
    id: int
    name: str
    price: float
@app.get("/products", response_model=list[Product])
async def list_products():
    rows = await pool.fetch(
        "SELECT id, name, price FROM products LIMIT 100"
    )
    return [Product.model_validate(dict(row)) for row in rows]
Select only the columns you need. Returning SELECT * when the response only uses three fields wastes bandwidth and serialization time.
Caching
Add caching at the right layer:
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import redis.asyncio as aioredis
@asynccontextmanager
async def lifespan(app: FastAPI):
    r = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(r), prefix="api-cache")
    yield
@app.get("/products/{product_id}")
@cache(expire=60)
async def get_product(product_id: int):
    row = await pool.fetchrow(
        "SELECT * FROM products WHERE id = $1", product_id
    )
    return dict(row)
Cache responses that are read-heavy and change infrequently. Invalidate on writes. Do not cache user-specific data without including the user ID in the cache key.
Middleware Performance
Every middleware runs on every request. Keep the stack minimal:
# BAD: Expensive middleware on all routes
@app.middleware("http")
async def log_everything(request: Request, call_next):
    body = await request.body()  # Reads entire body into memory
    response = await call_next(request)
    logger.info(f"Request body: {body}")
    return response
# GOOD: Lightweight timing middleware
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    response.headers["X-Process-Time"] = f"{elapsed:.4f}"
    return response
Background Tasks
Offload work that the client does not need to wait for:
from fastapi import BackgroundTasks
@app.post("/orders")
async def create_order(
    order: OrderCreate,
    background_tasks: BackgroundTasks,
):
    saved = await save_order(order)
    background_tasks.add_task(send_confirmation_email, saved.id)
    background_tasks.add_task(update_inventory, saved.items)
    return saved
For heavier jobs, use a task queue like Celery or ARQ instead of FastAPI's built-in background tasks.
Profiling
Find bottlenecks with structured logging:
import time
import structlog
logger = structlog.get_logger()
@app.middleware("http")
async def profile_requests(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    if elapsed > 1.0:
        logger.warning(
            "slow_request",
            path=request.url.path,
            method=request.method,
            duration_s=round(elapsed, 3),
        )
    return response
Log slow requests separately. When you see a pattern, profile the specific endpoint with cProfile or py-spy.
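Once a slow endpoint is identified, the standard library's cProfile can rank where the time goes without any extra dependencies. A minimal sketch, with endpoint_body as a hypothetical stand-in for the slow endpoint's logic:

```python
import cProfile
import io
import pstats

def endpoint_body():
    # Hypothetical stand-in for the slow endpoint's logic
    return sum(i * i for i in range(100_000))

profiler = cProfile.Profile()
profiler.enable()
endpoint_body()
profiler.disable()

stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(5)
report = stream.getvalue()  # top 5 entries by cumulative time
```

For a running production process, py-spy can attach from outside without restarting or instrumenting the service.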
Deployment Configuration
# gunicorn.conf.py
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
bind = "0.0.0.0:8000"
keepalive = 5
graceful_timeout = 30
timeout = 60
Use workers = (2 * CPU_CORES) + 1 as a starting point. Monitor memory per worker — async workers handle more concurrency but can accumulate memory over time.
Common Mistakes
Mistake 1: Using async def with synchronous database drivers. This blocks the event loop and kills throughput. Either use async drivers or use def endpoints.
Mistake 2: No connection pooling. Creating a new database connection per request adds 5-20ms of latency. Always pool.
Mistake 3: Caching without invalidation. Stale data causes bugs that are hard to reproduce. Set explicit TTLs and invalidate on writes.
Mistake 4: Not setting request timeouts. A single slow downstream call can exhaust your worker pool. Set httpx timeouts on all external calls.
Takeaways
FastAPI performance optimization is mostly about avoiding blocking the event loop, managing database connections, and caching intelligently. The framework itself is rarely the bottleneck. Profile first, optimize the measured bottlenecks, and monitor after deployment.