FastAPI Performance Optimization for High Traffic APIs

· 12 min read · Backend Development

Optimize FastAPI for high traffic with async patterns, connection pooling, caching, serialization, and production deployment configuration.

FastAPI is one of the fastest Python web frameworks because of its async foundation. But "fast framework" does not automatically mean "fast application." The performance bottlenecks in production APIs are almost never the framework itself — they are in database queries, serialization, blocking calls, and missing caches.

This guide covers the optimizations that actually matter when your FastAPI application moves from handling hundreds of requests per second to thousands.

Problem

Common performance issues in production FastAPI services:

  • Endpoints that work fine locally take 2-5 seconds under load
  • Database connection pools get exhausted during traffic spikes
  • Synchronous libraries block the async event loop
  • Response serialization becomes a bottleneck with large payloads
  • Memory usage grows without bound on long-running instances

Async vs Sync: Getting It Right

The most common FastAPI performance mistake is mixing async and sync code incorrectly.

# BAD: Blocking call inside async endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # This blocks the event loop
    user = db.query(User).filter(User.id == user_id).one()
    return user

# GOOD: Use async database driver
@app.get("/users/{user_id}")
async def get_user(user_id: int, pool: asyncpg.Pool = Depends(get_pool)):
    user = await pool.fetchrow(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    return user

If you must use synchronous libraries, declare the endpoint as a regular function (not async def). FastAPI will run it in a thread pool automatically:

# OK: FastAPI handles threading for sync endpoints
@app.get("/users/{user_id}")
def get_user(user_id: int):
    user = db.query(User).filter(User.id == user_id).one()
    return user
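When only one call in an otherwise-async endpoint is blocking, you can also push that single call onto a worker thread yourself with the standard library's asyncio.to_thread, which mirrors what FastAPI does for plain def endpoints. A minimal sketch, where fetch_user_blocking is a hypothetical stand-in for a synchronous ORM call:

```python
import asyncio
import time

def fetch_user_blocking(user_id: int) -> dict:
    # Stand-in for a synchronous ORM/driver call that would block the loop
    time.sleep(0.01)
    return {"id": user_id, "name": "example"}

async def get_user(user_id: int) -> dict:
    # asyncio.to_thread runs the blocking call in the default thread
    # pool, keeping the event loop free to serve other requests
    return await asyncio.to_thread(fetch_user_blocking, user_id)
```

This keeps the endpoint async so any other awaits in it still run on the loop.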

Connection Pooling

Database connections are expensive to create. Pool them:

from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg

pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/db",
        min_size=10,
        max_size=50,
        max_inactive_connection_lifetime=300,
    )
    yield
    await pool.close()

app = FastAPI(lifespan=lifespan)

Set min_size to your baseline concurrency and max_size to your peak. Monitor pool.get_size() and pool.get_idle_size() to tune these values. The PostgreSQL optimization guide covers the query side of this equation.
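One way to act on those two numbers is to derive a utilization ratio and alert when it stays near 1.0 at peak. A sketch that works with any pool exposing asyncpg-style get_size()/get_idle_size(); the helper name is ours:

```python
def pool_utilization(pool) -> float:
    # Fraction of pooled connections currently checked out; asyncpg's
    # Pool reports get_size() (total) and get_idle_size() (unused)
    size = pool.get_size()
    if size == 0:
        return 0.0
    return (size - pool.get_idle_size()) / size
```

If this sits above roughly 0.8 during normal peaks, raise max_size or look for queries that hold connections too long.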

Response Serialization

Pydantic V2 is significantly faster than V1, but serialization still matters at scale:

from pydantic import BaseModel

# BAD: Untyped dicts from SELECT * skip validation and return every column
@app.get("/products")
async def list_products():
    rows = await pool.fetch("SELECT * FROM products LIMIT 100")
    return [dict(row) for row in rows]

# GOOD: Use model_validate for typed, optimized serialization
class Product(BaseModel):
    id: int
    name: str
    price: float

@app.get("/products", response_model=list[Product])
async def list_products():
    rows = await pool.fetch(
        "SELECT id, name, price FROM products LIMIT 100"
    )
    return [Product.model_validate(dict(row)) for row in rows]

Select only the columns you need. Returning SELECT * when the response only uses three fields wastes bandwidth and serialization time.
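For hot list endpoints, you can go one step further and build a Pydantic TypeAdapter once at import time, then serialize the whole list straight to JSON bytes instead of validating item by item inside the handler. A sketch under the same Product model (the adapter and function names are ours):

```python
from pydantic import BaseModel, TypeAdapter

class Product(BaseModel):
    id: int
    name: str
    price: float

# Built once at import time and reused for every request
product_list = TypeAdapter(list[Product])

def serialize_products(rows: list[dict]) -> bytes:
    # validate_python type-checks the rows; dump_json serializes the
    # validated list in Rust-backed pydantic-core
    return product_list.dump_json(product_list.validate_python(rows))
```

The resulting bytes can be returned via a Response with media_type="application/json", bypassing a second serialization pass.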

Caching

Add caching at the right layer:

from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import redis.asyncio as aioredis

@asynccontextmanager
async def lifespan(app: FastAPI):
    r = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(r), prefix="api-cache")
    yield

@app.get("/products/{product_id}")
@cache(expire=60)
async def get_product(product_id: int):
    row = await pool.fetchrow(
        "SELECT * FROM products WHERE id = $1", product_id
    )
    return dict(row)

Cache responses that are read-heavy and change infrequently. Invalidate on writes. Do not cache user-specific data without including the user ID in the cache key.

Middleware Performance

Every middleware runs on every request. Keep the stack minimal:

# BAD: Expensive middleware on all routes
@app.middleware("http")
async def log_everything(request: Request, call_next):
    body = await request.body()  # Reads entire body into memory
    response = await call_next(request)
    logger.info(f"Request body: {body}")
    return response

# GOOD: Lightweight timing middleware
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    response.headers["X-Process-Time"] = f"{elapsed:.4f}"
    return response

Background Tasks

Offload work that the client does not need to wait for:

from fastapi import BackgroundTasks

@app.post("/orders")
async def create_order(
    order: OrderCreate,
    background_tasks: BackgroundTasks,
):
    saved = await save_order(order)
    background_tasks.add_task(send_confirmation_email, saved.id)
    background_tasks.add_task(update_inventory, saved.items)
    return saved

For heavier jobs, use a task queue like Celery or ARQ instead of FastAPI's built-in background tasks.

Profiling

Find bottlenecks with structured logging:

import time
import structlog

logger = structlog.get_logger()

@app.middleware("http")
async def profile_requests(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start

    if elapsed > 1.0:
        logger.warning(
            "slow_request",
            path=request.url.path,
            method=request.method,
            duration_s=round(elapsed, 3),
        )

    return response

Log slow requests separately. When you see a pattern, profile the specific endpoint with cProfile or py-spy.
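Once a slow endpoint is identified, cProfile from the standard library gives a first breakdown before reaching for py-spy. A small wrapper sketch (profile_call is our name):

```python
import cProfile
import io
import pstats

def profile_call(func, *args, **kwargs):
    # Run func under cProfile and return its result plus a report of
    # the ten most expensive entries sorted by cumulative time
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative").print_stats(10)
    return result, stream.getvalue()
```

For endpoints in a live process, py-spy can attach without restarting, which cProfile cannot.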

Deployment Configuration

# gunicorn.conf.py
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
bind = "0.0.0.0:8000"
keepalive = 5
graceful_timeout = 30
timeout = 60

Use workers = (2 * CPU_CORES) + 1 as a starting point. Monitor memory per worker — async workers handle more concurrency but can accumulate memory over time.

Common Mistakes

Mistake 1: Using async def with synchronous database drivers. This blocks the event loop and kills throughput. Either use async drivers or use def endpoints.

Mistake 2: No connection pooling. Creating a new database connection per request adds 5-20ms of latency. Always pool.

Mistake 3: Caching without invalidation. Stale data causes bugs that are hard to reproduce. Set explicit TTLs and invalidate on writes.

Mistake 4: Not setting request timeouts. A single slow downstream call can exhaust your worker pool. Set httpx timeouts on all external calls.

Takeaways

FastAPI performance optimization is mostly about avoiding blocking the event loop, managing database connections, and caching intelligently. The framework itself is rarely the bottleneck. Profile first, optimize the measured bottlenecks, and monitor after deployment.