ReadModelProtocol
Protocol defining the contract for CQRS read model implementations.
Read models are projections of domain events into denormalized,
query-optimized structures. They are eventually consistent with
the write model.
Key concepts:
- **Projector**: Transforms events into read model updates
- **Query**: Fast, optimized reads without business logic
- **Rebuild**: Reconstruct entire read model from event history
Implementations include:
- PostgresReadModel: Read replicas for SQL aggregations
- ClickHouseReadModel: Column-store for analytics
- ElasticReadModel: Full-text search and faceted queries
Example usage:
class InvoiceAnalytics(ReadModelProtocol):
async def project(self, event: Event) -> None:
if event.type == "invoice.created":
await self._update_monthly_totals(event.data)
async def query(self, filters=None, **kwargs) -> list[dict]:
return await self.clickhouse.execute(
"SELECT month, SUM(total) FROM invoices GROUP BY month"
)
async def rebuild(self) -> None:
await self.clickhouse.truncate("invoice_analytics")
async for event in self.event_store.replay("invoice.*"):
await self.project(event)
Source: read_model.py
Methods
project
async def project(self, event: Event) -> None
Process a domain event and update the read model.
This method is called by the event subscriber (projector)
whenever a relevant event occurs.
Args:
event: The domain event to process (e.g., doc.created)
Note:
Projections should be idempotent - processing the same
event twice should produce the same result.
query
async def query(self,
filters: dict[str, Any] | None = None,
order_by: list[str] | None = None,
limit: int = 100,
offset: int = 0,
) -> list[dict[str, Any]]
Query the read model with optional filtering and pagination.
Read model queries are optimized for fast retrieval and
aggregation. They may not reflect the latest writes
(eventual consistency).
Args:
filters: Optional key-value filters to apply
order_by: List of fields to sort by (prefix with - for desc)
limit: Maximum number of results (default: 100)
offset: Number of records to skip for pagination
Returns:
List of dictionaries representing query results
Example:
results = await read_model.query(
filters={"status": "paid", "year": 2024},
order_by=["-total", "customer"],
limit=50
)
rebuild
async def rebuild(self) -> None
Rebuild the entire read model from event history.
This method should:
1. Clear the current read model data
2. Replay all relevant events from the event store
3. Re-project each event to rebuild state
Use when:
- Projection logic has changed
- Read model became corrupted
- Initial setup from existing events
Note:
This can be a long-running operation for large event stores.
Consider running as a background job.