Real-Time Communication with FastAPI WebSockets
HTTP is request-response. The client asks, the server answers. For real-time features — chat, notifications, live dashboards — the server needs to push data to the client without being asked. WebSockets provide a persistent bidirectional connection.
Basic WebSocket Endpoint
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
print("Client disconnected")
This accepts a connection, echoes messages back, and handles disconnection cleanly.
Connection Manager
For multiple clients (chat rooms, notifications):
from typing import Dict, Set
class ConnectionManager:
def __init__(self):
self._connections: Dict[str, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, room: str):
await websocket.accept()
if room not in self._connections:
self._connections[room] = set()
self._connections[room].add(websocket)
def disconnect(self, websocket: WebSocket, room: str):
if room in self._connections:
self._connections[room].discard(websocket)
if not self._connections[room]:
del self._connections[room]
async def broadcast(self, room: str, message: dict):
if room not in self._connections:
return
dead_connections = set()
for connection in self._connections[room]:
try:
await connection.send_json(message)
except Exception:
dead_connections.add(connection)
for conn in dead_connections:
self._connections[room].discard(conn)
manager = ConnectionManager()
Chat Room Implementation
@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(
websocket: WebSocket,
room_id: str,
token: str = Query(None),
):
# Authenticate
user = await verify_ws_token(token)
if not user:
await websocket.close(code=4001, reason="Unauthorized")
return
await manager.connect(websocket, room_id)
try:
# Announce join
await manager.broadcast(room_id, {
"type": "system",
"message": f"{user.name} joined the room",
})
while True:
data = await websocket.receive_json()
if data.get("type") == "message":
await manager.broadcast(room_id, {
"type": "message",
"user": user.name,
"content": data["content"],
"timestamp": datetime.now(timezone.utc).isoformat(),
})
except WebSocketDisconnect:
manager.disconnect(websocket, room_id)
await manager.broadcast(room_id, {
"type": "system",
"message": f"{user.name} left the room",
})
Client-Side Connection
function useWebSocket(roomId: string) {
const [messages, setMessages] = useState<Message[]>([]);
const wsRef = useRef<WebSocket | null>(null);
useEffect(() => {
const token = getAuthToken();
const ws = new WebSocket(
`wss://api.example.com/ws/chat/${roomId}?token=${token}`
);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
setMessages(prev => [...prev, data]);
};
ws.onclose = (event) => {
if (event.code !== 1000) {
// Reconnect on unexpected closure
setTimeout(() => connectWs(), 3000);
}
};
wsRef.current = ws;
return () => ws.close();
}, [roomId]);
const sendMessage = (content: string) => {
wsRef.current?.send(JSON.stringify({
type: "message",
content,
}));
};
return { messages, sendMessage };
}
Heartbeat / Ping-Pong
WebSocket connections can silently die (network change, proxy timeout). Implement heartbeats:
import asyncio
@app.websocket("/ws/live/{room_id}")
async def live_endpoint(websocket: WebSocket, room_id: str):
await manager.connect(websocket, room_id)
async def heartbeat():
while True:
try:
await asyncio.sleep(30)
await websocket.send_json({"type": "ping"})
except Exception:
break
heartbeat_task = asyncio.create_task(heartbeat())
try:
while True:
data = await websocket.receive_json()
if data.get("type") == "pong":
continue # Heartbeat response
# Handle other messages...
except WebSocketDisconnect:
heartbeat_task.cancel()
manager.disconnect(websocket, room_id)
Scaling Beyond One Server
The ConnectionManager above stores connections in memory. If you have multiple server instances, a message broadcast on server A does not reach clients on server B.
Solutions:
- Redis Pub/Sub: Each server subscribes to a Redis channel. Broadcasts publish to Redis, and all servers receive and forward to their local clients.
- Supabase Realtime: Use Supabase's built-in Realtime feature which handles multi-server broadcasting.
Takeaways
WebSockets are the right tool for real-time features where the server needs to push data. The ConnectionManager pattern handles multiple rooms and clients. Implement heartbeats for connection reliability and use Redis Pub/Sub when scaling beyond a single server.