SocketProtocol
Protocol for real-time WebSocket communication.
Defines the interface for broadcasting messages and sending
targeted messages to specific users. Implementations may use
different backplanes (Redis, NATS, in-memory).
Example:
>>> class NatsSocket:
... async def broadcast(self, topic: str, message: dict) -> None:
... await self.nc.publish(f"ws.{topic}", json.dumps(message))
...
... async def send_to_user(self, user_id: str, message: dict) -> None:
... await self.nc.publish(f"ws.user.{user_id}", json.dumps(message))
Source: socket.py
Methods
broadcast
async def broadcast(self, topic: str, message: dict[str, Any]) -> None
Broadcast a message to all subscribers of a topic.
Args:
topic: The topic/channel to broadcast to (e.g., "doc.updated")
message: The message payload to send
send_to_user
async def send_to_user(self, user_id: str, message: dict[str, Any]) -> None
Send a message to a specific user.
Args:
user_id: The user ID to send to
message: The message payload to send