Building Real-Time Applications with WebSockets and FastAPI
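HTTP request-response works until you need real-time updates. Polling wastes requests when nothing has changed and delays every update by up to one polling interval. Server-sent events flow only from server to client. WebSockets give you full-duplex communication: the server pushes data the instant it is available, and the client can send at any time over the same connection.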
FastAPI has first-class WebSocket support built on Starlette. This guide covers how to build production WebSocket features including connection management, rooms, authentication, and scaling beyond a single server.
Problem
Applications need real-time features for:
- Chat and messaging
- Live dashboards and monitoring
- Collaborative editing
- Notifications and alerts
- Real-time data feeds
HTTP polling creates unnecessary load and introduces latency. WebSockets solve both problems.
Basic WebSocket Endpoint
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass
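The connection stays open until either side closes it, and both sides can send messages at any time. When the client goes away, receive_text() raises WebSocketDisconnect, which ends the loop.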
Connection Manager
Production applications need to track active connections:
from dataclasses import dataclass, field

from fastapi import WebSocket


@dataclass
class ConnectionManager:
    # Maps a room name to the set of sockets currently in that room.
    connections: dict[str, set[WebSocket]] = field(default_factory=dict)

    async def connect(self, room: str, ws: WebSocket):
        await ws.accept()
        self.connections.setdefault(room, set()).add(ws)

    def disconnect(self, room: str, ws: WebSocket):
        self.connections.get(room, set()).discard(ws)
        # Drop empty rooms so the dict does not grow without bound.
        if room in self.connections and not self.connections[room]:
            del self.connections[room]

    async def broadcast(self, room: str, message: str):
        # Iterate over a copy: disconnect() mutates the set, and changing a
        # set during iteration raises RuntimeError.
        for ws in list(self.connections.get(room, set())):
            try:
                await ws.send_text(message)
            except Exception:
                self.disconnect(room, ws)


manager = ConnectionManager()
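The failure path in broadcast can be exercised without a running server. The following is a minimal sketch, not part of FastAPI: FakeSocket and RoomRegistry are hypothetical names, and the registry mirrors the connection manager above minus accept(). It shows a dead peer being dropped while the healthy one still receives the message.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; records or rejects sends."""

    def __init__(self, alive: bool = True) -> None:
        self.alive = alive
        self.sent: list[str] = []

    async def send_text(self, message: str) -> None:
        if not self.alive:
            raise ConnectionError("peer is gone")
        self.sent.append(message)


class RoomRegistry:
    """Same shape as the connection manager, without the accept() step."""

    def __init__(self) -> None:
        self.connections: dict[str, set[FakeSocket]] = {}

    def add(self, room: str, ws: FakeSocket) -> None:
        self.connections.setdefault(room, set()).add(ws)

    def remove(self, room: str, ws: FakeSocket) -> None:
        members = self.connections.get(room)
        if members is None:
            return
        members.discard(ws)
        if not members:
            del self.connections[room]

    async def broadcast(self, room: str, message: str) -> int:
        """Send to every member and return how many sends succeeded."""
        delivered = 0
        # Snapshot the members first, because remove() mutates the set.
        for ws in list(self.connections.get(room, ())):
            try:
                await ws.send_text(message)
                delivered += 1
            except Exception:
                self.remove(room, ws)
        return delivered


async def demo() -> tuple[int, int]:
    registry = RoomRegistry()
    healthy, dead = FakeSocket(), FakeSocket(alive=False)
    registry.add("lobby", healthy)
    registry.add("lobby", dead)
    delivered = await registry.broadcast("lobby", "hello")
    return delivered, len(registry.connections["lobby"])
```

Running asyncio.run(demo()) reports one successful delivery and one remaining member, since the dead socket is removed during the broadcast.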
Room-Based Chat
import json
from datetime import datetime, timezone


@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(ws: WebSocket, room_id: str):
    await manager.connect(room_id, ws)
    try:
        while True:
            raw = await ws.receive_text()
            message = json.loads(raw)
            broadcast_msg = json.dumps({
                "user": message.get("user", "anonymous"),
                "text": message["text"],
                "room": room_id,
                # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            await manager.broadcast(room_id, broadcast_msg)
    except WebSocketDisconnect:
        pass
    finally:
        # Runs on disconnect and on unexpected errors (for example malformed
        # JSON), so the socket never lingers in the room.
        manager.disconnect(room_id, ws)
        await manager.broadcast(
            room_id,
            json.dumps({"type": "system", "text": "User left"}),
        )
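The chat endpoint trusts whatever the client sends. One way to harden it is to validate the payload in a separate function before broadcasting. This is a sketch under assumptions: build_chat_message and MAX_TEXT_LENGTH are hypothetical names, and the 2000-character limit is an arbitrary example value.

```python
import json
from datetime import datetime, timezone
from typing import Optional

MAX_TEXT_LENGTH = 2000  # assumed limit; tune per application


def build_chat_message(raw: str, room_id: str) -> Optional[str]:
    """Return the JSON string to broadcast, or None if the payload is invalid."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Valid JSON can still be a list, number, or string.
    if not isinstance(payload, dict):
        return None
    text = payload.get("text")
    if not isinstance(text, str) or not text.strip():
        return None
    if len(text) > MAX_TEXT_LENGTH:
        return None
    user = payload.get("user")
    if not isinstance(user, str) or not user:
        user = "anonymous"
    return json.dumps({
        "user": user,
        "text": text,
        "room": room_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })
```

Inside the receive loop, the endpoint could call this function and skip the broadcast when it returns None, which keeps one malformed message from ending the connection.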
Authentication
Authenticate before accepting the connection:
from fastapi import Query, status


@app.websocket("/ws/secure")
async def secure_endpoint(
    ws: WebSocket,
    token: str = Query(...),
):
    # verify_jwt is an application-specific helper that is not defined in this
    # guide; it is assumed to return a user object, or None for a bad token.
    user = verify_jwt(token)
    if not user:
        # Closing before accept() rejects the handshake.
        await ws.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    await ws.accept()
    # Connection is now authenticated
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"Hello {user.name}: {data}")
    except WebSocketDisconnect:
        pass
Pass the token as a query parameter, because the browser WebSocket API does not let JavaScript set custom headers on the handshake. URLs tend to end up in access logs, so keep these tokens short-lived.
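The guide does not show verify_jwt. To make the verify-then-accept flow concrete, here is a simplified stand-in built on the standard library. It is not a JWT implementation: the token format, SECRET_KEY, issue_token, and verify_token are all assumptions made for illustration, and a real application would more likely use a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET_KEY = b"change-me"  # assumption: loaded from configuration in practice


def _sign(body: bytes) -> bytes:
    """Hex-encoded HMAC-SHA256 of the token body."""
    return hmac.new(SECRET_KEY, body, hashlib.sha256).hexdigest().encode()


def issue_token(username: str, ttl_seconds: int = 60) -> str:
    """Create a short-lived token of the form <base64 claims>.<signature>."""
    claims = {"sub": username, "exp": int(time.time()) + ttl_seconds}
    body = base64.urlsafe_b64encode(json.dumps(claims).encode())
    return f"{body.decode()}.{_sign(body).decode()}"


def verify_token(token: str) -> Optional[dict]:
    """Return the claims for a valid, unexpired token, otherwise None."""
    try:
        body_text, signature = token.split(".", 1)
        body = body_text.encode()
        # Constant-time comparison avoids leaking how many characters matched.
        if not hmac.compare_digest(signature.encode(), _sign(body)):
            return None
        claims = json.loads(base64.urlsafe_b64decode(body))
    except (ValueError, UnicodeError):
        return None
    if not isinstance(claims, dict) or claims.get("exp", 0) < time.time():
        return None
    return claims
```

The short default lifetime reflects the fact that a token in a query string is easier to leak than one in a header.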
Heartbeat and Reconnection
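Detect dead connections with application-level ping/pong messages. A send to a broken connection eventually fails, which surfaces the problem even when the client never closed cleanly: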
import asyncio
import json


async def heartbeat(ws: WebSocket, interval: int = 30):
    """Send periodic pings to detect dead connections."""
    try:
        while True:
            await asyncio.sleep(interval)
            await ws.send_json({"type": "ping"})
    except Exception:
        # A failed send means the connection is gone; stop pinging.
        pass


@app.websocket("/ws/reliable")
async def reliable_endpoint(ws: WebSocket):
    await ws.accept()
    ping_task = asyncio.create_task(heartbeat(ws))
    try:
        while True:
            data = await ws.receive_text()
            msg = json.loads(data)
            # Pong replies only confirm liveness; they carry no payload.
            if msg.get("type") == "pong":
                continue
            await ws.send_text(f"Received: {msg}")
    except WebSocketDisconnect:
        pass
    finally:
        # Always stop the background task when the connection ends.
        ping_task.cancel()
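The heartbeat above notices a dead peer only when a send fails. A possible extension is to record when the last pong arrived and close the connection once replies stop. The sketch below is one way to do that; PongTracker and heartbeat_with_deadline are hypothetical names, and the send and close operations are passed in as callables so the logic can run without a real socket.

```python
import asyncio
import time
from typing import Awaitable, Callable


class PongTracker:
    """Remembers when the peer last answered a ping."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self.last_pong = time.monotonic()

    def record_pong(self) -> None:
        self.last_pong = time.monotonic()

    def is_stale(self) -> bool:
        return time.monotonic() - self.last_pong > self.timeout


async def heartbeat_with_deadline(
    send_ping: Callable[[], Awaitable[None]],
    close: Callable[[], Awaitable[None]],
    tracker: PongTracker,
    interval: float,
) -> None:
    """Ping every `interval` seconds; close the connection once pongs stop."""
    while True:
        await asyncio.sleep(interval)
        if tracker.is_stale():
            await close()
            return
        await send_ping()


async def demo() -> bool:
    closed = False

    async def send_ping() -> None:
        pass  # a real endpoint would send {"type": "ping"} here

    async def close() -> None:
        nonlocal closed
        closed = True

    tracker = PongTracker(timeout=0.05)
    # The peer never replies, so the tracker goes stale and the loop closes.
    await heartbeat_with_deadline(send_ping, close, tracker, interval=0.02)
    return closed
```

In an endpoint, the receive loop would call tracker.record_pong() whenever a pong message arrives, with send_ping wrapping ws.send_json and close wrapping ws.close.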
Client-side, implement automatic reconnection with exponential backoff:
function connectWebSocket(url: string) {
  let retries = 0;

  function connect() {
    const ws = new WebSocket(url);
    ws.onopen = () => { retries = 0; };
    ws.onclose = () => {
      // 1 s, 2 s, 4 s, ... capped at 30 s.
      const delay = Math.min(1000 * 2 ** retries, 30000);
      retries++;
      setTimeout(connect, delay);
    };
    ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      if (msg.type === "ping") {
        ws.send(JSON.stringify({ type: "pong" }));
        return;
      }
      handleMessage(msg); // application-defined handler, not shown here
    };
  }

  connect();
}
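The delay schedule used by the client can be checked in isolation. The sketch below restates it in Python, in seconds rather than milliseconds. backoff_delay follows the same formula as the client code; backoff_delay_with_jitter is an optional variation, not something the client above does, that randomizes the delay so many clients do not reconnect at the same instant after a server restart. Both function names are hypothetical.

```python
import random


def backoff_delay(retries: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff in seconds: base * 2**retries, at most `cap`."""
    # Clamp the exponent so very large retry counts cannot overflow a float.
    exponent = min(retries, 32)
    return min(base * 2 ** exponent, cap)


def backoff_delay_with_jitter(
    retries: int, base: float = 1.0, cap: float = 30.0
) -> float:
    """Uniform random delay between 0 and the capped exponential backoff."""
    return random.uniform(0.0, backoff_delay(retries, base, cap))


schedule = [backoff_delay(n) for n in range(7)]
# schedule is [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

The cap is reached on the sixth attempt, because 2 ** 5 is 32 and exceeds the 30-second limit.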
Scaling with Redis Pub/Sub
A single server only knows about its own connections. For multiple servers, use Redis as a message bus:
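import redis.asyncio as aioredis

redis_client = aioredis.from_url("redis://localhost")


async def publish_to_room(room: str, message: str):
    await redis_client.publish(f"room:{room}", message)


async def subscribe_to_room(room: str, ws: WebSocket):
    # The context manager releases the pub/sub connection when the loop ends,
    # including when the task is cancelled after the client disconnects.
    async with redis_client.pubsub() as pubsub:
        await pubsub.subscribe(f"room:{room}")
        async for msg in pubsub.listen():
            # listen() also yields subscribe confirmations; forward only messages.
            if msg["type"] == "message":
                # Payloads arrive as bytes unless decode_responses=True is set.
                await ws.send_text(msg["data"].decode())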
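Every server subscribes to the Redis channels for the rooms its clients have joined. When a message arrives on any server, that server publishes it to the room's channel, and Redis delivers it to every subscribed server, which then forwards it to its own local connections.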
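The fan-out can be illustrated without a Redis instance. The sketch below replaces Redis with a hypothetical in-memory bus (InMemoryBus) and models each application process as a Server object whose clients are plain lists. It only shows the message flow: publish once, and each server relays to its own local connections.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Hypothetical stand-in for Redis pub/sub: each subscriber gets each message."""

    def __init__(self) -> None:
        self.subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers[channel].append(queue)
        return queue

    async def publish(self, channel: str, message: str) -> None:
        for queue in self.subscribers[channel]:
            await queue.put(message)


class Server:
    """One application process holding only its own local connections."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.local_inboxes: dict[str, list[list[str]]] = defaultdict(list)

    def connect_client(self, room: str) -> list[str]:
        inbox: list[str] = []  # stands in for one client's WebSocket
        self.local_inboxes[room].append(inbox)
        return inbox

    async def relay(self, room: str, queue: asyncio.Queue, count: int) -> None:
        """Forward `count` bus messages to this server's local clients."""
        for _ in range(count):
            message = await queue.get()
            for inbox in self.local_inboxes[room]:
                inbox.append(message)


async def demo() -> tuple[list[str], list[str]]:
    bus = InMemoryBus()
    server_a, server_b = Server("a"), Server("b")
    alice = server_a.connect_client("lobby")
    bob = server_b.connect_client("lobby")
    queue_a = bus.subscribe("room:lobby")
    queue_b = bus.subscribe("room:lobby")
    # Server A received this from Alice; it publishes instead of sending directly.
    await bus.publish("room:lobby", "hi from alice")
    await asyncio.gather(
        server_a.relay("lobby", queue_a, count=1),
        server_b.relay("lobby", queue_b, count=1),
    )
    return alice, bob
```

Both clients end up with the message even though they are attached to different servers. Note that this sketch uses one subscription per server and room, whereas subscribe_to_room above opens one per connection; which is preferable depends on how many clients share a room.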
Common Mistakes
Mistake 1: No connection cleanup. Dead connections accumulate and waste memory. Use heartbeats to detect and remove them.
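Mistake 2: Blocking the event loop. A synchronous call inside the WebSocket loop, such as a blocking database query, stalls every connection served by that event loop. Use an async driver, or move the call to a worker thread with asyncio.to_thread. asyncio.create_task helps only when the work itself is awaitable.
Mistake 3: No message size limits. Large messages can exhaust server memory. The limit is configured on the ASGI server rather than in the endpoint, for example with Uvicorn's --ws-max-size option (16 MiB by default). Application code can additionally reject oversized payloads after receiving them.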
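Mistake 2 is easy to reproduce. In the sketch below, blocking_query is a hypothetical stand-in for a synchronous database call, and ticker represents the other connections that need the event loop. Calling the blocking function directly freezes the ticker, while asyncio.to_thread (Python 3.9+) lets it keep running.

```python
import asyncio
import time


def blocking_query() -> str:
    """Stands in for a synchronous database call (hypothetical)."""
    time.sleep(0.2)
    return "rows"


async def ticker(ticks: list[float], stop: asyncio.Event) -> None:
    """Represents other connections that need the event loop to keep running."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(offload: bool) -> int:
    """Return how often the ticker ran while the query was in progress."""
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)  # let the ticker record its first tick
    if offload:
        await asyncio.to_thread(blocking_query)  # the loop stays free
    else:
        blocking_query()  # the loop is frozen for the whole call
    stop.set()
    await task
    return len(ticks)


blocked = asyncio.run(count_ticks(offload=False))
offloaded = asyncio.run(count_ticks(offload=True))
```

The blocked run records almost no ticks, while the offloaded run records many, which is the difference every other connected client would experience.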
Takeaways
WebSockets in FastAPI work well for real-time features. The connection manager pattern handles rooms and broadcasting. Authentication happens before accept(). For production, add heartbeats, client-side reconnection, and Redis pub/sub for multi-server scaling.