Real-Time Communication with FastAPI WebSockets

· 12 min read · Backend Development

Build real-time features with FastAPI WebSockets including chat rooms, connection management, heartbeats, and scaling patterns.

Real-Time Communication with FastAPI WebSockets

HTTP is request-response. The client asks, the server answers. For real-time features — chat, notifications, live dashboards — the server needs to push data to the client without being asked. WebSockets provide a persistent bidirectional connection.

Basic WebSocket Endpoint

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

This accepts a connection, echoes messages back, and handles disconnection cleanly.

Connection Manager

For multiple clients (chat rooms, notifications):

from typing import Dict, Set

class ConnectionManager:
    def __init__(self):
        self._connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room: str):
        await websocket.accept()
        if room not in self._connections:
            self._connections[room] = set()
        self._connections[room].add(websocket)

    def disconnect(self, websocket: WebSocket, room: str):
        if room in self._connections:
            self._connections[room].discard(websocket)
            if not self._connections[room]:
                del self._connections[room]

    async def broadcast(self, room: str, message: dict):
        if room not in self._connections:
            return
        dead_connections = set()
        for connection in self._connections[room]:
            try:
                await connection.send_json(message)
            except Exception:
                dead_connections.add(connection)
        for conn in dead_connections:
            self._connections[room].discard(conn)

manager = ConnectionManager()

Chat Room Implementation

@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(
    websocket: WebSocket,
    room_id: str,
    token: str = Query(None),
):
    # Authenticate
    user = await verify_ws_token(token)
    if not user:
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await manager.connect(websocket, room_id)

    try:
        # Announce join
        await manager.broadcast(room_id, {
            "type": "system",
            "message": f"{user.name} joined the room",
        })

        while True:
            data = await websocket.receive_json()

            if data.get("type") == "message":
                await manager.broadcast(room_id, {
                    "type": "message",
                    "user": user.name,
                    "content": data["content"],
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                })

    except WebSocketDisconnect:
        manager.disconnect(websocket, room_id)
        await manager.broadcast(room_id, {
            "type": "system",
            "message": f"{user.name} left the room",
        })

Client-Side Connection

function useWebSocket(roomId: string) {
  const [messages, setMessages] = useState<Message[]>([]);
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    const token = getAuthToken();
    const ws = new WebSocket(
      `wss://api.example.com/ws/chat/${roomId}?token=${token}`
    );

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setMessages(prev => [...prev, data]);
    };

    ws.onclose = (event) => {
      if (event.code !== 1000) {
        // Reconnect on unexpected closure
        setTimeout(() => connectWs(), 3000);
      }
    };

    wsRef.current = ws;
    return () => ws.close();
  }, [roomId]);

  const sendMessage = (content: string) => {
    wsRef.current?.send(JSON.stringify({
      type: "message",
      content,
    }));
  };

  return { messages, sendMessage };
}

Heartbeat / Ping-Pong

WebSocket connections can silently die (network change, proxy timeout). Implement heartbeats:

import asyncio

@app.websocket("/ws/live/{room_id}")
async def live_endpoint(websocket: WebSocket, room_id: str):
    await manager.connect(websocket, room_id)

    async def heartbeat():
        while True:
            try:
                await asyncio.sleep(30)
                await websocket.send_json({"type": "ping"})
            except Exception:
                break

    heartbeat_task = asyncio.create_task(heartbeat())

    try:
        while True:
            data = await websocket.receive_json()
            if data.get("type") == "pong":
                continue  # Heartbeat response
            # Handle other messages...
    except WebSocketDisconnect:
        heartbeat_task.cancel()
        manager.disconnect(websocket, room_id)

Scaling Beyond One Server

The ConnectionManager above stores connections in memory. If you have multiple server instances, a message broadcast on server A does not reach clients on server B.

Solutions:

  • Redis Pub/Sub: Each server subscribes to a Redis channel. Broadcasts publish to Redis, and all servers receive and forward to their local clients.
  • Supabase Realtime: Use Supabase's built-in Realtime feature which handles multi-server broadcasting.

Takeaways

WebSockets are the right tool for real-time features where the server needs to push data. The ConnectionManager pattern handles multiple rooms and clients. Implement heartbeats for connection reliability and use Redis Pub/Sub when scaling beyond a single server.