
EventBusProtocol

Protocol defining the contract for event bus implementations.

This is the primary port for pub/sub messaging in the hexagonal architecture.
Supports connection lifecycle, publishing, and subscribing with patterns.

Implementations include:
- RedisEventBus: Redis Pub/Sub for simple use cases
- NatsEventBus: NATS for high-performance messaging
- KafkaEventBus: Kafka for durable event streaming
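To illustrate what "protocol" means here, the following is a rough sketch of how such a contract could be declared with `typing.Protocol`, using the method signatures documented on this page. The `Event` dataclass, the `EventHandler` alias, and the `NullEventBus` and `NotABus` classes are assumptions made for the example; the real definitions live in the source module and may differ.

```python
from dataclasses import dataclass
from typing import Awaitable, Callable, Protocol, runtime_checkable


@dataclass
class Event:
    """Hypothetical event shape; the real Event class is not shown on this page."""
    type: str


# Assumed alias: an async callable that receives one Event.
EventHandler = Callable[[Event], Awaitable[None]]


@runtime_checkable
class EventBusProtocol(Protocol):
    """Sketch of the port; signatures copied from the method list on this page."""

    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    def is_connected(self) -> bool: ...
    async def publish(self, topic: str, event: Event) -> None: ...
    async def subscribe(self, topic: str, handler: EventHandler) -> str: ...
    async def subscribe_pattern(self, pattern: str, handler: EventHandler) -> str: ...
    async def unsubscribe(self, subscription_id: str) -> None: ...


class NullEventBus:
    """Hypothetical adapter: satisfies the protocol without inheriting from it."""

    async def connect(self) -> None: pass
    async def disconnect(self) -> None: pass
    def is_connected(self) -> bool: return False
    async def publish(self, topic: str, event: Event) -> None: pass
    async def subscribe(self, topic: str, handler: EventHandler) -> str: return "sub-0"
    async def subscribe_pattern(self, pattern: str, handler: EventHandler) -> str: return "sub-1"
    async def unsubscribe(self, subscription_id: str) -> None: pass


class NotABus:
    """Only implements connect, so it does not satisfy the protocol."""

    async def connect(self) -> None: pass


# runtime_checkable isinstance checks only verify that the methods exist,
# not that their signatures match.
print(isinstance(NullEventBus(), EventBusProtocol))
print(isinstance(NotABus(), EventBusProtocol))
```

Because the check is structural, adapters do not need to import or subclass the protocol; a static type checker is still needed to verify the signatures themselves.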

Example usage:
bus: EventBusProtocol = container.get(EventBusProtocol)
await bus.connect()

async def handler(event: Event) -> None:
    print(f"Received: {event.type}")

sub_id = await bus.subscribe("doc.created", handler)
await bus.publish("doc.created", event)
await bus.unsubscribe(sub_id)

await bus.disconnect()
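The usage example depends on a container and a concrete broker. For tests or local experiments, a minimal in-memory stand-in can exercise the same call sequence. The sketch below is not one of the listed implementations: `InMemoryEventBus`, the `Event` dataclass, and the `EventHandler` alias are hypothetical, and pattern subscriptions are left out for brevity.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Tuple


@dataclass
class Event:
    """Hypothetical event shape; the real Event class is not shown on this page."""
    type: str
    payload: Dict[str, Any] = field(default_factory=dict)


EventHandler = Callable[[Event], Awaitable[None]]


class InMemoryEventBus:
    """Illustrative stand-in that delivers events within a single process."""

    def __init__(self) -> None:
        self._connected = False
        # subscription_id -> (topic, handler)
        self._subs: Dict[str, Tuple[str, EventHandler]] = {}

    async def connect(self) -> None:
        self._connected = True

    async def disconnect(self) -> None:
        # Mirrors the documented behaviour: disconnect drops all subscriptions.
        self._subs.clear()
        self._connected = False

    def is_connected(self) -> bool:
        return self._connected

    async def publish(self, topic: str, event: Event) -> None:
        # Iterate over a copy so handlers may unsubscribe during delivery.
        for sub_topic, handler in list(self._subs.values()):
            if sub_topic == topic:
                await handler(event)

    async def subscribe(self, topic: str, handler: EventHandler) -> str:
        sub_id = uuid.uuid4().hex
        self._subs[sub_id] = (topic, handler)
        return sub_id

    async def unsubscribe(self, subscription_id: str) -> None:
        # Unknown IDs are ignored; a real implementation might raise instead.
        self._subs.pop(subscription_id, None)


async def main() -> List[str]:
    received: List[str] = []
    bus = InMemoryEventBus()
    await bus.connect()

    async def handler(event: Event) -> None:
        received.append(event.type)

    sub_id = await bus.subscribe("doc.created", handler)
    await bus.publish("doc.created", Event(type="doc.created"))
    await bus.unsubscribe(sub_id)
    # No subscriber remains, so this second publish is not delivered.
    await bus.publish("doc.created", Event(type="doc.created"))
    await bus.disconnect()
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```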

Source: event_bus.py

Methods

connect

async def connect(self) -> None

Establish connection to the message broker.

Should be called before publishing or subscribing.
For Redis Pub/Sub this is optional, but it is required for NATS and Kafka.

disconnect

async def disconnect(self) -> None

Close connection to the message broker.

Should be called during application shutdown.
Cleans up all subscriptions.
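One way to make sure `disconnect` always runs at shutdown, even when the application fails, is to pair it with `connect` in an async context manager. This is a sketch under assumptions: the `connected` helper and the `RecordingBus` class are hypothetical and exist only to show the call order.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class RecordingBus:
    """Hypothetical bus that only records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def connect(self) -> None:
        self.calls.append("connect")

    async def disconnect(self) -> None:
        self.calls.append("disconnect")


@asynccontextmanager
async def connected(bus: RecordingBus) -> AsyncIterator[RecordingBus]:
    """Connect on entry and always disconnect on exit."""
    await bus.connect()
    try:
        yield bus
    finally:
        await bus.disconnect()


async def run() -> List[str]:
    bus = RecordingBus()
    try:
        async with connected(bus):
            raise RuntimeError("simulated failure during request handling")
    except RuntimeError:
        pass
    # disconnect was still called despite the exception.
    return bus.calls


if __name__ == "__main__":
    print(asyncio.run(run()))
```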

is_connected

def is_connected(self) -> bool

Check if connected to the message broker.

Returns:
    True if connected and ready to publish/subscribe
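Note that `is_connected` is a plain method while the rest of the interface is async, so it is called without `await`. A possible use is a small guard that connects lazily before the first publish. The `ensure_connected` helper and the `CountingBus` class below are hypothetical, written only for this illustration.

```python
import asyncio


class CountingBus:
    """Hypothetical bus that counts how often connect() is called."""

    def __init__(self) -> None:
        self.connect_calls = 0
        self._connected = False

    async def connect(self) -> None:
        self.connect_calls += 1
        self._connected = True

    def is_connected(self) -> bool:
        return self._connected


async def ensure_connected(bus: CountingBus) -> None:
    """Connect only if the bus reports that it is not connected yet."""
    if not bus.is_connected():  # synchronous call, no await
        await bus.connect()


async def demo() -> int:
    bus = CountingBus()
    await ensure_connected(bus)
    await ensure_connected(bus)  # second call is a no-op
    return bus.connect_calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This guard is not safe against two tasks racing to connect at the same time; an `asyncio.Lock` around the check would be needed for that.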

publish

async def publish(self, topic: str, event: Event) -> None

Publish an event to a topic.

Args:
    topic: The topic/channel to publish to (e.g., "doc.created")
    event: The event to publish

subscribe

async def subscribe(
    self,
    topic: str,
    handler: EventHandler,
) -> str

Subscribe to events on a specific topic.

Args:
    topic: The exact topic to subscribe to
    handler: Async function to call when an event is received

Returns:
    Subscription ID for later unsubscribing

subscribe_pattern

async def subscribe_pattern(
    self,
    pattern: str,
    handler: EventHandler,
) -> str

Subscribe to events matching a pattern.

Supports wildcards for flexible subscriptions.
Pattern syntax depends on the implementation (e.g., glob-style "doc.*" for Redis, subject wildcards such as "user.>" for NATS).

Args:
    pattern: Pattern to match topics (e.g., "doc.*", "user.>")
    handler: Async function to call when a matching event is received

Returns:
    Subscription ID for later unsubscribing
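Because the pattern syntax differs per broker, the same string can match different topics. The sketch below contrasts the two styles mentioned above. Both functions are hypothetical helpers and not part of this API: `glob_match` uses Python's `fnmatch` to approximate Redis glob patterns, and `nats_match` is a hand-written approximation of NATS subject wildcards, where `*` stands for exactly one dot-separated token and a trailing `>` stands for one or more tokens.

```python
from fnmatch import fnmatchcase


def glob_match(pattern: str, topic: str) -> bool:
    """Glob-style matching: '*' matches any run of characters, dots included."""
    return fnmatchcase(topic, pattern)


def nats_match(pattern: str, subject: str) -> bool:
    """Token-based matching: '*' is one token, a trailing '>' is one or more."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one
            # remaining subject token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', the token counts must agree.
    return len(p_tokens) == len(s_tokens)


if __name__ == "__main__":
    for topic in ("doc.created", "doc.created.v2"):
        print(topic, glob_match("doc.*", topic), nats_match("doc.*", topic))
```

The practical consequence is that "doc.*" covers nested topics such as "doc.created.v2" under glob matching but not under token-based matching, so patterns may need adjusting when switching implementations.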

unsubscribe

async def unsubscribe(self, subscription_id: str) -> None

Remove a subscription.

Args:
    subscription_id: ID returned from subscribe/subscribe_pattern