EventBusProtocol
Protocol defining the contract for event bus implementations.
This is the primary port for pub/sub messaging in the hexagonal architecture.
Supports connection lifecycle, publishing, and subscribing with patterns.
Implementations include:
- RedisEventBus: Redis Pub/Sub for simple use cases
- NatsEventBus: NATS for high-performance messaging
- KafkaEventBus: Kafka for durable event streaming
Example usage:
bus: EventBusProtocol = container.get(EventBusProtocol)
await bus.connect()
async def handler(event: Event) -> None:
print(f"Received: {event.type}")
sub_id = await bus.subscribe("doc.created", handler)
await bus.publish("doc.created", event)
await bus.unsubscribe(sub_id)
await bus.disconnect()
Source: event_bus.py
Methods
connect
async def connect(self) -> None
Establish connection to the message broker.
Should be called before publishing or subscribing.
For Redis Pub/Sub this is optional, but required for NATS/Kafka.
disconnect
async def disconnect(self) -> None
Close connection to the message broker.
Should be called during application shutdown.
Cleans up all subscriptions.
is_connected
def is_connected(self) -> bool
Check if connected to the message broker.
Returns:
True if connected and ready to publish/subscribe
publish
async def publish(self, topic: str, event: Event) -> None
Publish an event to a topic.
Args:
topic: The topic/channel to publish to (e.g., "doc.created")
event: The event to publish
subscribe
async def subscribe(self,
topic: str,
handler: EventHandler,
) -> str
Subscribe to events on a specific topic.
Args:
topic: The exact topic to subscribe to
handler: Async function to call when event is received
Returns:
Subscription ID for later unsubscribing
subscribe_pattern
async def subscribe_pattern(self,
pattern: str,
handler: EventHandler,
) -> str
Subscribe to events matching a pattern.
Supports wildcards for flexible subscriptions.
Pattern syntax depends on implementation (e.g., "doc.*" for Redis).
Args:
pattern: Pattern to match topics (e.g., "doc.*", "user.>")
handler: Async function to call when matching event is received
Returns:
Subscription ID for later unsubscribing
unsubscribe
async def unsubscribe(self, subscription_id: str) -> None
Remove a subscription.
Args:
subscription_id: ID returned from subscribe/subscribe_pattern