Building Real-Time Applications with WebSockets and FastAPI

12 min read · Backend Development

Build production WebSocket features in FastAPI including rooms, authentication, heartbeats, reconnection, and multi-server scaling with Redis.

HTTP request-response works until you need real-time updates. Polling is wasteful. Server-sent events are unidirectional. WebSockets give you full-duplex communication: either side can send a message the instant data is available.

FastAPI has first-class WebSocket support built on Starlette. This guide covers how to build production WebSocket features including connection management, rooms, authentication, and scaling beyond a single server.

Problem

Applications need real-time features for:

  • Chat and messaging
  • Live dashboards and monitoring
  • Collaborative editing
  • Notifications and alerts
  • Real-time data feeds

HTTP polling creates load from requests that return nothing new, and it delays each update by up to one polling interval. A persistent WebSocket connection avoids both problems.

Basic WebSocket Endpoint

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass

The connection stays open. Both sides can send messages at any time.

Connection Manager

Production applications need to track active connections:

from dataclasses import dataclass, field

@dataclass
class ConnectionManager:
    connections: dict[str, set[WebSocket]] = field(default_factory=dict)

    async def connect(self, room: str, ws: WebSocket):
        await ws.accept()
        if room not in self.connections:
            self.connections[room] = set()
        self.connections[room].add(ws)

    def disconnect(self, room: str, ws: WebSocket):
        self.connections.get(room, set()).discard(ws)
        if room in self.connections and not self.connections[room]:
            del self.connections[room]

    async def broadcast(self, room: str, message: str):
        # Iterate over a snapshot: disconnect() mutates the set during the loop
        for ws in list(self.connections.get(room, ())):
            try:
                await ws.send_text(message)
            except Exception:
                self.disconnect(room, ws)

manager = ConnectionManager()
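
Removing a socket from a set while a loop is iterating over that same set raises RuntimeError, so a broadcast loop that prunes dead sockets should iterate over a snapshot. The sketch below shows that behavior without a network. StubSocket is a hypothetical stand-in for a WebSocket, and broadcast here is a free function over a plain dict rather than the manager method.

```python
import asyncio


class StubSocket:
    """Hypothetical stand-in for a WebSocket: records sends, optionally fails."""

    def __init__(self, broken: bool = False) -> None:
        self.broken = broken
        self.sent = []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError("connection lost")
        self.sent.append(message)


async def broadcast(connections: dict, room: str, message: str) -> None:
    # list(...) takes a snapshot, so discarding a dead socket is safe mid-loop.
    for ws in list(connections.get(room, ())):
        try:
            await ws.send_text(message)
        except Exception:
            connections[room].discard(ws)


async def demo():
    alive, dead = StubSocket(), StubSocket(broken=True)
    connections = {"lobby": {alive, dead}}
    await broadcast(connections, "lobby", "hello")
    # The healthy socket received the message; the broken one was pruned.
    return alive.sent, connections["lobby"] == {alive}


print(asyncio.run(demo()))
```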

Room-Based Chat

import json
from datetime import datetime, timezone

@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(ws: WebSocket, room_id: str):
    await manager.connect(room_id, ws)
    try:
        while True:
            raw = await ws.receive_text()
            message = json.loads(raw)
            broadcast_msg = json.dumps({
                "user": message.get("user", "anonymous"),
                "text": message["text"],
                "room": room_id,
                # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            await manager.broadcast(room_id, broadcast_msg)
    except WebSocketDisconnect:
        manager.disconnect(room_id, ws)
        await manager.broadcast(
            room_id,
            json.dumps({"type": "system", "text": "User left"}),
        )
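
The chat loop assumes every frame is a JSON object with a "text" field. A malformed frame would raise inside the loop and end the handler. One way to guard against that is to validate each payload before broadcasting. The helper below is a sketch: parse_chat_message and MAX_TEXT_LENGTH are hypothetical names, and the length limit is an arbitrary assumption.

```python
import json
from typing import Optional

MAX_TEXT_LENGTH = 2000  # assumption: pick a limit that suits the application


def parse_chat_message(raw: str) -> Optional[dict]:
    """Return a cleaned message dict, or None if the payload is unusable."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(message, dict):
        return None
    text = message.get("text")
    if not isinstance(text, str) or not text.strip():
        return None
    user = message.get("user")
    if not isinstance(user, str) or not user:
        user = "anonymous"
    # Truncate rather than reject overly long messages.
    return {"user": user, "text": text[:MAX_TEXT_LENGTH]}
```

Inside the receive loop, a None result can simply be skipped with continue, so one bad frame does not drop the connection.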

Authentication

Authenticate before accepting the connection:

from fastapi import Query, status

@app.websocket("/ws/secure")
async def secure_endpoint(
    ws: WebSocket,
    token: str = Query(...),
):
    # verify_jwt is an application-defined helper: returns a user, or None if invalid
    user = verify_jwt(token)
    if not user:
        await ws.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await ws.accept()
    # Connection is now authenticated
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"Hello {user.name}: {data}")
    except WebSocketDisconnect:
        pass

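Pass the token as a query parameter, since the browser WebSocket API does not let JavaScript set custom headers such as Authorization. Query strings often end up in server and proxy logs, so prefer short-lived tokens.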
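
The endpoint above calls verify_jwt without defining it. The following is one possible sketch of such a helper, assuming HS256-signed tokens, a shared secret, and a "sub" claim carrying the user name. The User class, SECRET value, and sign_jwt function are hypothetical and exist only to make the example runnable. A production service would normally use a maintained JWT library instead.

```python
import base64
import hashlib
import hmac
import json
import time
from dataclasses import dataclass
from typing import Optional

SECRET = b"change-me"  # assumption: shared HS256 signing key


@dataclass
class User:
    name: str


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def _b64url_decode(segment: str) -> bytes:
    # JWT segments drop base64 padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _signature(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()


def sign_jwt(claims: dict, secret: bytes = SECRET) -> str:
    """Issue a token; included only so the example can be exercised."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(claims).encode())
    sig = _b64url_encode(_signature(f"{header}.{body}", secret))
    return f"{header}.{body}.{sig}"


def verify_jwt(token: str, secret: bytes = SECRET) -> Optional[User]:
    """Return the User named by the `sub` claim, or None if the token is invalid."""
    try:
        header_b64, body_b64, sig_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        if header.get("alg") != "HS256":
            return None  # refuse "none" and any unexpected algorithm
        expected = _signature(f"{header_b64}.{body_b64}", secret)
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            return None  # signature mismatch
        claims = json.loads(_b64url_decode(body_b64))
        expires = claims.get("exp")
        name = claims.get("sub")
    except (ValueError, TypeError, AttributeError):
        return None  # malformed token
    if isinstance(expires, (int, float)) and expires < time.time():
        return None  # expired
    return User(name) if isinstance(name, str) else None
```

Returning None for every failure mode keeps the endpoint's `if not user` check as the single rejection path.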

Heartbeat and Reconnection

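Detect dead connections with application-level ping/pong messages. A ping that fails to send reveals a connection that has already dropped: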

import asyncio

async def heartbeat(ws: WebSocket, interval: int = 30):
    """Send periodic pings to detect dead connections."""
    try:
        while True:
            await asyncio.sleep(interval)
            await ws.send_json({"type": "ping"})
    except Exception:
        pass

@app.websocket("/ws/reliable")
async def reliable_endpoint(ws: WebSocket):
    await ws.accept()
    ping_task = asyncio.create_task(heartbeat(ws))
    try:
        while True:
            data = await ws.receive_text()
            msg = json.loads(data)
            if msg.get("type") == "pong":
                continue
            await ws.send_text(f"Received: {msg}")
    except WebSocketDisconnect:
        pass
    finally:
        ping_task.cancel()
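
The heartbeat above notices a dead peer only when a send fails, which can take a long time on a silently dropped connection. A stricter variant also tracks when the client was last heard from. The class below is a sketch of that bookkeeping. LivenessTracker and its 60-second default are assumptions, not part of FastAPI or Starlette.

```python
import time
from typing import Callable


class LivenessTracker:
    """Remembers when the peer was last heard from."""

    def __init__(
        self,
        timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.timeout = timeout
        self.clock = clock
        self.last_seen = clock()

    def mark_alive(self) -> None:
        # Call for every message received, including "pong" replies.
        self.last_seen = self.clock()

    def is_stale(self) -> bool:
        # True once nothing has arrived for longer than the timeout.
        return self.clock() - self.last_seen > self.timeout
```

In the endpoint, the receive loop would call mark_alive() after each receive_text(), and the heartbeat task would check is_stale() after each sleep and close the socket when it returns True. The injectable clock exists only to make the class easy to test.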

Client-side, implement automatic reconnection with exponential backoff:

function connectWebSocket(url: string) {
  let retries = 0;

  function connect() {
    const ws = new WebSocket(url);

    ws.onopen = () => { retries = 0; };

    ws.onclose = () => {
      const delay = Math.min(1000 * 2 ** retries, 30000);
      retries++;
      setTimeout(connect, delay);
    };

    ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      if (msg.type === "ping") {
        ws.send(JSON.stringify({ type: "pong" }));
        return;
      }
      handleMessage(msg);
    };
  }

  connect();
}
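
The delay formula in the client doubles the wait after each failed attempt and caps it at 30 seconds. The same arithmetic in Python makes the resulting schedule easy to inspect. The function and parameter names here are illustrative only.

```python
def backoff_delay_ms(retries: int, base_ms: int = 1000, cap_ms: int = 30000) -> int:
    """Mirror of the client formula: min(1000 * 2 ** retries, 30000)."""
    return min(base_ms * 2 ** retries, cap_ms)


schedule = [backoff_delay_ms(n) for n in range(7)]
print(schedule)  # [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The cap is reached on the sixth attempt, after which the client retries every 30 seconds until a connection succeeds and resets the counter.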

Scaling with Redis Pub/Sub

A single server only knows about its own connections. For multiple servers, use Redis as a message bus:

import redis.asyncio as aioredis

redis_client = aioredis.from_url("redis://localhost")

async def publish_to_room(room: str, message: str):
    await redis_client.publish(f"room:{room}", message)

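async def subscribe_to_room(room: str, ws: WebSocket):
    channel = f"room:{room}"
    # The context manager releases the pub/sub connection when the loop exits
    async with redis_client.pubsub() as pubsub:
        await pubsub.subscribe(channel)
        async for msg in pubsub.listen():
            # Skip subscribe confirmations; forward only published payloads
            if msg["type"] == "message":
                await ws.send_text(msg["data"].decode())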

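Every server subscribes to the Redis channels for the rooms its clients have joined. When a client message arrives on any server, that server publishes it to the room's channel, and Redis delivers it to every subscribed server, which forwards it to its own local connections.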
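
The fan-out can be simulated in a single process to make the message flow concrete. In this sketch, InMemoryBus is a hypothetical stand-in for Redis pub/sub and Server stands in for one application instance. Neither is a Redis or FastAPI API, and the list of delivered strings replaces real ws.send_text calls.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Hypothetical stand-in for Redis pub/sub, for illustration only."""

    def __init__(self) -> None:
        self.queues = defaultdict(list)  # channel -> one queue per subscriber

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self.queues[channel].append(queue)
        return queue

    async def publish(self, channel: str, message: str) -> None:
        # Deliver a copy to every subscriber of the channel.
        for queue in self.queues[channel]:
            await queue.put(message)


class Server:
    """One app instance; `delivered` stands in for ws.send_text calls."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.delivered = []

    async def relay_one(self, queue: asyncio.Queue) -> None:
        # Take one message off the bus and hand it to local connections.
        message = await queue.get()
        self.delivered.append(f"{self.name} got {message}")


async def demo() -> list:
    bus = InMemoryBus()
    servers = [Server("server-a"), Server("server-b")]
    queues = [bus.subscribe("room:lobby") for _ in servers]
    # A client on either server sends "hi"; it is published exactly once.
    await bus.publish("room:lobby", "hi")
    await asyncio.gather(*(s.relay_one(q) for s, q in zip(servers, queues)))
    return [line for s in servers for line in s.delivered]


print(asyncio.run(demo()))
```

One publish reaches both servers, which is the property that lets clients in the same room connect to different instances.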

Common Mistakes

Mistake 1: No connection cleanup. Dead connections accumulate and waste memory. Use heartbeats to detect and remove them.

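Mistake 2: Blocking the event loop. A synchronous database query or any other blocking call inside the WebSocket loop stalls every connection on that worker. Use async drivers, or move blocking calls to a thread with asyncio.to_thread. asyncio.create_task helps only with independent work that is itself non-blocking.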

Mistake 3: No message size limits. Large messages can exhaust server memory. Set a maximum message size in the server configuration, for example with Uvicorn's --ws-max-size option.
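
For Mistake 2, the sketch below shows one way to keep the event loop responsive while a blocking call runs. slow_lookup is a hypothetical function standing in for a synchronous database driver, and ticker represents other connections that need the loop. asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import time


def slow_lookup(user_id: int) -> str:
    """Hypothetical blocking call, standing in for a synchronous DB driver."""
    time.sleep(0.2)
    return f"user-{user_id}"


async def ticker(ticks: list, stop: asyncio.Event) -> None:
    # Represents other connections that need the event loop to stay free.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def demo():
    ticks = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    # to_thread runs the blocking function in a worker thread and awaits it,
    # so the ticker keeps running in the meantime.
    result = await asyncio.to_thread(slow_lookup, 7)
    stop.set()
    await task
    return result, len(ticks)


result, tick_count = asyncio.run(demo())
print(result, tick_count > 1)
```

Calling slow_lookup(7) directly inside the coroutine would freeze the ticker for the full 0.2 seconds. Wrapping that direct call in asyncio.create_task would not change this, because the blocking call would still run on the event loop thread.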

Takeaways

WebSockets in FastAPI work well for real-time features. The connection manager pattern handles rooms and broadcasting. Authentication happens before accept(). For production, add heartbeats, client-side reconnection, and Redis pub/sub for multi-server scaling.