SocketProtocol

Protocol for real-time WebSocket communication.

Defines the interface for broadcasting messages and sending
targeted messages to specific users. Implementations may use
different backplanes (Redis, NATS, in-memory).

Example:
>>> import json
>>> class NatsSocket:
...     # self.nc is assumed to be a connected NATS client; payloads must be bytes
...     async def broadcast(self, topic: str, message: dict) -> None:
...         await self.nc.publish(f"ws.{topic}", json.dumps(message).encode())
...
...     async def send_to_user(self, user_id: str, message: dict) -> None:
...         await self.nc.publish(f"ws.user.{user_id}", json.dumps(message).encode())
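
The page does not show how SocketProtocol itself is declared. The following is a minimal sketch, assuming it is a `typing.Protocol` with exactly the two methods documented under Methods. `InMemorySocket`, its `sent` list, and the `notify` helper are hypothetical names introduced for illustration; the `ws.` subject prefixes are borrowed from the example above.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SocketProtocol(Protocol):
    """Assumed shape of the protocol, based on the two documented methods."""

    async def broadcast(self, topic: str, message: dict[str, Any]) -> None: ...

    async def send_to_user(self, user_id: str, message: dict[str, Any]) -> None: ...


class InMemorySocket:
    """Hypothetical in-memory implementation that records what was sent."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, dict[str, Any]]] = []

    async def broadcast(self, topic: str, message: dict[str, Any]) -> None:
        self.sent.append((f"ws.{topic}", message))

    async def send_to_user(self, user_id: str, message: dict[str, Any]) -> None:
        self.sent.append((f"ws.user.{user_id}", message))


async def notify(socket: SocketProtocol) -> None:
    # Callers depend only on the protocol, not on a concrete backplane.
    await socket.broadcast("doc.updated", {"id": 1})
    await socket.send_to_user("u42", {"text": "hi"})


socket = InMemorySocket()
asyncio.run(notify(socket))
```

Because protocols are matched structurally, `InMemorySocket` does not need to inherit from `SocketProtocol`; a recording implementation like this one can stand in for a Redis or NATS backplane in tests.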

Source: socket.py

Methods

broadcast

async def broadcast(self, topic: str, message: dict[str, Any]) -> None

Broadcast a message to all subscribers of a topic.

    Args:
        topic: The topic/channel to broadcast to (e.g., "doc.updated")
        message: The message payload to send
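
To illustrate the fan-out semantics, here is a rough in-memory sketch in which each subscriber owns an `asyncio.Queue`. `QueueBroadcaster`, `subscribe`, and the `"doc.deleted"` topic are hypothetical and not part of the documented interface; a real backplane may deliver messages quite differently.

```python
import asyncio
from collections import defaultdict
from typing import Any


class QueueBroadcaster:
    """Hypothetical in-memory fan-out: one asyncio.Queue per subscriber."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[topic].append(queue)
        return queue

    async def broadcast(self, topic: str, message: dict[str, Any]) -> None:
        # Every subscriber of this topic receives the message; other topics do not.
        for queue in self._subscribers.get(topic, []):
            await queue.put(message)


async def demo() -> tuple[list, list, int]:
    hub = QueueBroadcaster()
    a = hub.subscribe("doc.updated")
    b = hub.subscribe("doc.updated")
    other = hub.subscribe("doc.deleted")
    await hub.broadcast("doc.updated", {"doc_id": "d1"})
    return [a.get_nowait()], [b.get_nowait()], other.qsize()


received_a, received_b, other_pending = asyncio.run(demo())
```

Both subscribers of `"doc.updated"` receive the payload, while the queue subscribed to the other topic stays empty.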

send_to_user

async def send_to_user(self, user_id: str, message: dict[str, Any]) -> None

Send a message to a specific user.

    Args:
        user_id: The user ID to send to
        message: The message payload to send
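
One way targeted delivery could work is sketched below. It assumes a user may hold several open connections (for example, two browser tabs) and that a message for an unknown user is silently dropped; neither behavior is stated in the source. `FakeConnection`, `UserRegistry`, and `connect` are hypothetical names used only for this illustration.

```python
import asyncio
import json
from typing import Any


class FakeConnection:
    """Hypothetical stand-in for a WebSocket connection; records sent frames."""

    def __init__(self) -> None:
        self.frames: list[str] = []

    async def send_text(self, data: str) -> None:
        self.frames.append(data)


class UserRegistry:
    """Hypothetical map from user ID to that user's open connections."""

    def __init__(self) -> None:
        self._connections: dict[str, list[FakeConnection]] = {}

    def connect(self, user_id: str) -> FakeConnection:
        conn = FakeConnection()
        self._connections.setdefault(user_id, []).append(conn)
        return conn

    async def send_to_user(self, user_id: str, message: dict[str, Any]) -> None:
        payload = json.dumps(message)
        # Deliver to every connection of this user; unknown users are a no-op here.
        for conn in self._connections.get(user_id, []):
            await conn.send_text(payload)


async def demo() -> tuple[list[str], list[str], list[str]]:
    registry = UserRegistry()
    laptop = registry.connect("alice")
    phone = registry.connect("alice")
    bob = registry.connect("bob")
    await registry.send_to_user("alice", {"type": "notice"})
    await registry.send_to_user("nobody", {"type": "notice"})
    return laptop.frames, phone.frames, bob.frames


alice_laptop, alice_phone, bob_frames = asyncio.run(demo())
```

Only the two connections registered for `"alice"` receive the serialized message; the connection for `"bob"` receives nothing.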