
ReadModelProtocol

Protocol defining the contract for CQRS read model implementations.

Read models are projections of domain events into denormalized,
query-optimized structures. They are eventually consistent with
the write model.

Key concepts:
- **Projector**: Transforms events into read model updates
- **Query**: Fast, optimized reads without business logic
- **Rebuild**: Reconstruct entire read model from event history

Implementations include:
- PostgresReadModel: Read replicas for SQL aggregations
- ClickHouseReadModel: Column-store for analytics
- ElasticReadModel: Full-text search and faceted queries

Example usage:

    class InvoiceAnalytics(ReadModelProtocol):
        async def project(self, event: Event) -> None:
            if event.type == "invoice.created":
                await self._update_monthly_totals(event.data)

        async def query(self, filters=None, **kwargs) -> list[dict]:
            return await self.clickhouse.execute(
                "SELECT month, SUM(total) FROM invoices GROUP BY month"
            )

        async def rebuild(self) -> None:
            await self.clickhouse.truncate("invoice_analytics")
            async for event in self.event_store.replay("invoice.*"):
                await self.project(event)

Source: read_model.py

Methods

project

async def project(self, event: Event) -> None

Process a domain event and update the read model.

This method is called by the event subscriber (projector)
whenever a relevant event occurs.

Args:
event: The domain event to process (e.g., doc.created)

Note:
Projections should be idempotent: processing the same event
twice should leave the read model in the same state as
processing it once.
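
A minimal sketch of one way to make a projection idempotent: remember the IDs of events that have already been applied and skip duplicates. The `Event` fields (`id`, `type`, `data`), the `MonthlyTotals` class, and the in-memory storage are assumptions made for illustration; they are not taken from read_model.py, and a real implementation would likely persist the processed IDs alongside the read model.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Event:
    # Hypothetical event shape; the real Event class may differ.
    id: str
    type: str
    data: dict[str, Any] = field(default_factory=dict)


class MonthlyTotals:
    """In-memory stand-in for a read model, used only for illustration."""

    def __init__(self) -> None:
        self.totals: dict[str, float] = {}
        self._seen: set[str] = set()  # IDs of events already applied

    async def project(self, event: Event) -> None:
        if event.id in self._seen:
            return  # duplicate delivery: skip so totals are not double-counted
        if event.type == "invoice.created":
            month = event.data["month"]
            self.totals[month] = self.totals.get(month, 0.0) + event.data["total"]
        self._seen.add(event.id)


async def demo() -> dict[str, float]:
    model = MonthlyTotals()
    event = Event(
        id="evt-1",
        type="invoice.created",
        data={"month": "2024-01", "total": 120.0},
    )
    await model.project(event)
    await model.project(event)  # the same event delivered a second time
    return model.totals
```

Calling `asyncio.run(demo())` applies the event once even though it is delivered twice, so the January total is counted a single time.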

query

async def query(
    self,
    filters: dict[str, Any] | None = None,
    order_by: list[str] | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict[str, Any]]

Query the read model with optional filtering and pagination.

Read model queries are optimized for fast retrieval and
aggregation. They may not reflect the latest writes
(eventual consistency).

Args:
filters: Optional key-value filters to apply
order_by: List of fields to sort by (prefix a field with - for descending order)
limit: Maximum number of results (default: 100)
offset: Number of records to skip for pagination

Returns:
List of dictionaries representing query results

Example:

    results = await read_model.query(
        filters={"status": "paid", "year": 2024},
        order_by=["-total", "customer"],
        limit=50,
    )
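
To make the argument semantics concrete, here is a rough in-memory sketch of how `filters`, `order_by`, `limit`, and `offset` could be interpreted. The helper name `apply_query` is hypothetical, and exact-equality filtering is an assumption; a real backend would translate these arguments into its own query language instead.

```python
from __future__ import annotations

from typing import Any


def apply_query(
    rows: list[dict[str, Any]],
    filters: dict[str, Any] | None = None,
    order_by: list[str] | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict[str, Any]]:
    # Keep rows whose fields equal every filter value (assumed equality match).
    if filters:
        rows = [r for r in rows if all(r.get(k) == v for k, v in filters.items())]
    else:
        rows = list(rows)  # copy so the caller's list is not reordered

    # Sort by the last key first; list.sort is stable, so earlier keys
    # in order_by end up taking precedence.
    for key in reversed(order_by or []):
        descending = key.startswith("-")
        name = key[1:] if descending else key
        rows.sort(key=lambda r, n=name: r[n], reverse=descending)

    # Pagination: skip `offset` rows, then return at most `limit` rows.
    return rows[offset : offset + limit]


invoices = [
    {"customer": "b", "total": 10, "status": "paid"},
    {"customer": "a", "total": 10, "status": "paid"},
    {"customer": "c", "total": 30, "status": "paid"},
    {"customer": "d", "total": 50, "status": "open"},
]
page = apply_query(
    invoices,
    filters={"status": "paid"},
    order_by=["-total", "customer"],
    limit=2,
)
```

Here `page` holds the paid invoices sorted by total descending, with ties broken by customer name ascending, truncated to the first two rows.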

rebuild

async def rebuild(self) -> None

Rebuild the entire read model from event history.

This method should:
1. Clear the current read model data
2. Replay all relevant events from the event store
3. Re-project each event to rebuild state

Use when:
- The projection logic has changed
- The read model has become corrupted
- Setting up a new read model from existing events

Note:
This can be a long-running operation for large event stores.
Consider running as a background job.
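
The three rebuild steps, and the suggestion to run them in the background, could be sketched as follows. `InMemoryEventStore`, `CountByType`, and the dictionary-shaped events are hypothetical stand-ins invented for this example, and `asyncio.create_task` is only one way to schedule the work; a production system might hand it to a job queue instead.

```python
import asyncio
from typing import Any, AsyncIterator


class InMemoryEventStore:
    """Hypothetical event store that replays a fixed list of events."""

    def __init__(self, events: list[dict[str, Any]]) -> None:
        self._events = events

    async def replay(self) -> AsyncIterator[dict[str, Any]]:
        for event in self._events:
            yield event


class CountByType:
    """Toy read model that counts events per type."""

    def __init__(self, event_store: InMemoryEventStore) -> None:
        self.event_store = event_store
        self.counts: dict[str, int] = {}

    async def project(self, event: dict[str, Any]) -> None:
        self.counts[event["type"]] = self.counts.get(event["type"], 0) + 1

    async def rebuild(self) -> None:
        self.counts.clear()  # 1. clear the current read model data
        async for event in self.event_store.replay():  # 2. replay events
            await self.project(event)  # 3. re-project each event


async def main() -> dict[str, int]:
    store = InMemoryEventStore(
        [
            {"type": "invoice.created"},
            {"type": "invoice.paid"},
            {"type": "invoice.created"},
        ]
    )
    model = CountByType(store)
    model.counts["stale"] = 99  # leftover data that the rebuild should discard

    # Schedule the rebuild as a background task so other coroutines can run.
    task = asyncio.create_task(model.rebuild())
    await task  # wait for completion before reading the result
    return model.counts
```

In this sketch, queries issued while the task is still running would see a partially rebuilt model, so callers that need complete data should wait for the task to finish.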