Framework-M Protocol Reference
This document provides a comprehensive reference for all protocols (interfaces) in Framework-M Core. Protocols define the contracts that adapters must implement, enabling the MX pattern for database and service customization.
Table of Contents
- Repository Protocol
- Schema Mapper Protocol
- Bootstrap Protocol
- Base DocType Protocol
- Permission Protocol
- Event Bus Protocol
- Cache Protocol
Repository Protocol
Location: framework_m_core.interfaces.repository
The primary interface for data persistence operations. All repository adapters must implement this protocol.
Interface Definition
from typing import Protocol, TypeVar, Sequence
from uuid import UUID
from framework_m_core.interfaces.repository import (
FilterSpec,
OrderSpec,
PaginatedResult
)
T = TypeVar("T")
class RepositoryProtocol[T](Protocol):
"""Protocol for data repositories."""
async def get(self, id: UUID) -> T | None:
"""Retrieve entity by ID."""
...
async def save(self, entity: T, version: int | None = None) -> T:
"""Save entity (insert or update)."""
...
async def delete(self, id: UUID) -> None:
"""Delete entity by ID."""
...
async def exists(self, id: UUID) -> bool:
"""Check if entity exists."""
...
async def count(self, filters: list[FilterSpec] | None = None) -> int:
"""Count entities matching filters."""
...
async def list(
self,
filters: list[FilterSpec] | None = None,
order_by: list[OrderSpec] | None = None,
limit: int = 20,
offset: int = 0,
) -> PaginatedResult[T]:
"""List entities with pagination."""
...
async def bulk_save(self, entities: Sequence[T]) -> Sequence[T]:
"""Save multiple entities."""
...
Supporting Types
FilterSpec
from framework_m_core.interfaces.repository import FilterOperator
class FilterSpec(BaseModel):
"""Specification for filtering queries."""
field: str
operator: FilterOperator # EQ, NE, LT, LTE, GT, GTE, IN, NOT_IN, LIKE, IS_NULL
value: Any
Example:
filters = [
FilterSpec(field="status", operator=FilterOperator.EQ, value="active"),
FilterSpec(field="age", operator=FilterOperator.GTE, value=18)
]
OrderSpec
from framework_m_core.interfaces.repository import OrderDirection
class OrderSpec(BaseModel):
"""Specification for ordering results."""
field: str
direction: OrderDirection = OrderDirection.ASC # ASC or DESC
Example:
order_by = [
OrderSpec(field="created_at", direction=OrderDirection.DESC)
]
PaginatedResult
class PaginatedResult[T](BaseModel):
"""Container for paginated results."""
items: Sequence[T]
total: int
limit: int
offset: int
@property
def has_more(self) -> bool:
"""Check if more items exist."""
return self.offset + len(self.items) < self.total
Implementations
| Package | Class | Database |
|---|---|---|
framework-m-standard | GenericRepository | PostgreSQL/MySQL (SQLAlchemy) |
framework-mx-mongo | MongoGenericRepository | MongoDB (Motor) |
Usage Example
from framework_m_core.interfaces.repository import (
RepositoryProtocol,
FilterSpec,
FilterOperator
)
async def get_active_users(repo: RepositoryProtocol[User]) -> list[User]:
"""Get all active users - works with any repository implementation."""
filters = [
FilterSpec(field="status", operator=FilterOperator.EQ, value="active")
]
result = await repo.list(filters=filters, limit=100)
return list(result.items)
Schema Mapper Protocol
Location: framework_m_core.interfaces.schema_mapper
Defines the interface for mapping Pydantic models to storage schemas.
Interface Definition
from typing import Protocol, Any
from framework_m_core.interfaces.base_doctype import BaseDocTypeProtocol
class SchemaMapperProtocol(Protocol):
"""Protocol for schema mapping."""
def create_table(
self,
model: type[BaseDocTypeProtocol],
metadata: Any
) -> Any:
"""Create storage schema for a single DocType."""
...
def create_tables(
self,
model: type[BaseDocTypeProtocol],
metadata: Any
) -> list[Any]:
"""Create schemas for DocType with child tables."""
...
def detect_schema_changes(
self,
old_model: type[BaseDocTypeProtocol],
new_model: type[BaseDocTypeProtocol]
) -> dict[str, Any]:
"""Detect schema differences between versions."""
...
Return Types by Implementation
SQLAlchemy (framework-m-standard)
from sqlalchemy import Table, MetaData
# create_table returns:
Table("users", metadata,
Column("id", UUID, primary_key=True),
Column("name", String(255)),
...
)
# detect_schema_changes returns:
{
"added_fields": ["email"],
"removed_fields": ["username"],
"modified_fields": [
{"field": "age", "old_type": "INTEGER", "new_type": "BIGINT"}
]
}
MongoDB (framework-mx-mongo)
# create_table returns:
{
"collection": "users",
"validator": {
"$jsonSchema": {
"bsonType": "object",
"required": ["id", "name"],
"properties": {
"id": {"bsonType": "string"},
"name": {"bsonType": "string"}
}
}
},
"indexes": [
{"keys": [("id", 1)], "unique": True},
{"keys": [("email", 1)], "unique": True}
]
}
Implementations
| Package | Class | Database |
|---|---|---|
framework-m-standard | SchemaMapper | SQLAlchemy Tables |
framework-mx-mongo | MongoSchemaMapper | MongoDB Collections |
Bootstrap Protocol
Location: framework_m_core.interfaces.bootstrap
Defines the interface for application startup steps.
Interface Definition
from typing import Protocol, Any
class BootstrapProtocol(Protocol):
"""Protocol for bootstrap steps."""
name: str # Unique identifier
order: int # Execution order (lower runs first)
async def run(self, container: Any) -> None:
"""Execute the bootstrap step."""
...
Standard Order Values
| Order | Purpose | Example |
|---|---|---|
| 10 | Initialize database/storage engine | init_engine, init_mongo |
| 20 | Initialize registries | init_registries |
| 30 | Sync schemas | sync_schema |
| 40 | Bind adapters to DI container | init_adapters |
| 50+ | Application-specific initialization | Custom steps |
Implementations
| Package | Class | Order | Purpose |
|---|---|---|---|
framework-m-standard | InitEngine | 10 | Create SQLAlchemy engine |
framework-m-standard | InitRegistries | 20 | Initialize MetaRegistry, FieldRegistry |
framework-m-standard | SyncSchema | 30 | Create database tables |
framework-m-standard | InitAdapters | 40 | Bind adapters to DI |
framework-mx-mongo | MongoInit | 10 | Initialize MongoDB client |
Usage Example
from framework_m_core.interfaces.bootstrap import BootstrapProtocol
class InitCache(BootstrapProtocol):
"""Initialize Redis cache."""
name = "init_cache"
order = 15 # After database, before registries
async def run(self, container: Any) -> None:
import redis.asyncio as redis
cache = await redis.from_url("redis://localhost")
container.bind("cache", cache)
print("✓ Redis cache initialized")
Register via entry point:
[project.entry-points."framework_m.bootstrap"]
init_cache = "myapp.bootstrap:InitCache"
Base DocType Protocol
Location: framework_m_core.interfaces.base_doctype
Defines the minimal interface that all DocTypes must implement.
Interface Definition
from typing import Protocol
from datetime import datetime
class BaseDocTypeProtocol(Protocol):
"""Minimum fields required for all DocTypes."""
id: str
name: str
created_at: datetime
modified_at: datetime
Standard Implementation
The framework-m-standard package provides a concrete implementation:
from pydantic import BaseModel, Field
from datetime import datetime
from uuid import uuid4
class BaseDocType(BaseModel):
"""Standard BaseDocType with additional fields."""
# Required by protocol
id: str = Field(default_factory=lambda: str(uuid4()))
name: str
created_at: datetime = Field(default_factory=datetime.now)
modified_at: datetime = Field(default_factory=datetime.now)
# Additional standard fields
owner: str | None = None
_version: int = Field(default=1, alias="version")
class Config:
from_attributes = True
Usage Example
from framework_m_core.interfaces.base_doctype import BaseDocTypeProtocol
from pydantic import BaseModel
class User(BaseModel):
"""User DocType - implements BaseDocTypeProtocol."""
id: str
name: str
created_at: datetime
modified_at: datetime
# Additional fields
email: str
role: str = "user"
# Satisfies protocol
def process_doctype(doc: BaseDocTypeProtocol) -> None:
print(f"Processing {doc.name} (ID: {doc.id})")
Permission Protocol
Location: framework_m_core.interfaces.permission
Defines the interface for permission checking and authorization.
Interface Definition
from typing import Protocol, Any
from framework_m_core.interfaces.permission import (
PermissionRequest,
PermissionResponse
)
class PermissionProtocol(Protocol):
"""Protocol for permission checking."""
async def check(self, request: PermissionRequest) -> PermissionResponse:
"""Check if action is permitted."""
...
Supporting Types
from enum import StrEnum
class PermissionAction(StrEnum):
"""Standard permission actions."""
READ = "read"
WRITE = "write"
CREATE = "create"
DELETE = "delete"
SUBMIT = "submit"
CANCEL = "cancel"
AMEND = "amend"
class PermissionRequest(BaseModel):
"""Request to check permission."""
user: str # User ID
doctype: str # DocType name
action: PermissionAction | str
doc: Any | None = None # Optional document instance
class PermissionResponse(BaseModel):
"""Response from permission check."""
allowed: bool
reason: str | None = None
Implementations
| Package | Class | Backend |
|---|---|---|
framework-m-standard | CitadelPolicyAdapter | Citadel RBAC service |
Usage Example
from framework_m_core.interfaces.permission import (
PermissionProtocol,
PermissionRequest,
PermissionAction
)
async def can_delete_user(
permission: PermissionProtocol,
user_id: str,
target_doc: User
) -> bool:
"""Check if user can delete another user."""
request = PermissionRequest(
user=user_id,
doctype="User",
action=PermissionAction.DELETE,
doc=target_doc
)
response = await permission.check(request)
return response.allowed
Event Bus Protocol
Location: framework_m_core.interfaces.event_bus
Defines the interface for pub/sub messaging.
Interface Definition
from typing import Protocol, Callable, Any
class Event(BaseModel):
"""Event message."""
type: str
data: dict[str, Any]
metadata: dict[str, Any] = {}
class EventBusProtocol(Protocol):
"""Protocol for event bus."""
async def publish(self, event: Event) -> None:
"""Publish an event."""
...
async def subscribe(
self,
event_type: str,
handler: Callable[[Event], None]
) -> None:
"""Subscribe to events of a type."""
...
async def unsubscribe(
self,
event_type: str,
handler: Callable[[Event], None]
) -> None:
"""Unsubscribe from events."""
...
Implementations
| Package | Class | Backend |
|---|---|---|
framework-m-standard | InMemoryEventBus | In-memory (dev/test) |
framework-m-standard | NatsEventBusAdapter | NATS JetStream (production) |
Usage Example
from framework_m_core.interfaces.event_bus import (
EventBusProtocol,
Event
)
async def notify_user_created(
event_bus: EventBusProtocol,
user: User
) -> None:
"""Publish user creation event."""
event = Event(
type="user.created",
data={"user_id": user.id, "email": user.email},
metadata={"timestamp": datetime.now().isoformat()}
)
await event_bus.publish(event)
async def setup_handlers(event_bus: EventBusProtocol) -> None:
"""Subscribe to events."""
async def on_user_created(event: Event):
print(f"New user: {event.data['email']}")
await event_bus.subscribe("user.created", on_user_created)
Cache Protocol
Location: framework_m_core.interfaces.cache
Defines the interface for caching operations.
Interface Definition
from typing import Protocol, Any
class CacheProtocol(Protocol):
"""Protocol for cache operations."""
async def get(self, key: str) -> Any | None:
"""Get value from cache."""
...
async def set(
self,
key: str,
value: Any,
ttl: int | None = None
) -> None:
"""Set value in cache with optional TTL (seconds)."""
...
async def delete(self, key: str) -> None:
"""Delete value from cache."""
...
async def exists(self, key: str) -> bool:
"""Check if key exists in cache."""
...
async def clear(self) -> None:
"""Clear all cache entries."""
...
Implementations
| Package | Class | Backend |
|---|---|---|
framework-m-standard | RedisCache | Redis |
framework-m-standard | InMemoryCache | In-memory dict (dev/test) |
Usage Example
from framework_m_core.interfaces.cache import CacheProtocol
async def get_user_cached(
cache: CacheProtocol,
user_id: str,
repo: RepositoryProtocol[User]
) -> User | None:
"""Get user with caching."""
# Try cache first
cached = await cache.get(f"user:{user_id}")
if cached:
return User.model_validate_json(cached)
# Fetch from database
user = await repo.get(UUID(user_id))
if user:
# Cache for 5 minutes
await cache.set(
f"user:{user_id}",
user.model_dump_json(),
ttl=300
)
return user
Creating Custom Protocol Implementations
To create your own adapter, implement the protocol:
1. Choose a Protocol
from framework_m_core.interfaces.repository import RepositoryProtocol
2. Implement All Methods
class DynamoDBRepository(RepositoryProtocol[T]):
"""DynamoDB implementation of RepositoryProtocol."""
def __init__(self, model: type[T], table_name: str):
self._model = model
self._table = boto3.resource('dynamodb').Table(table_name)
async def get(self, id: UUID) -> T | None:
response = self._table.get_item(Key={'id': str(id)})
if 'Item' not in response:
return None
return self._model.model_validate(response['Item'])
# ... implement other methods
3. Register via Entry Point
[project.entry-points."framework_m.adapters.repository"]
default = "framework_mx_dynamodb.repository:DynamoDBRepository"
4. Verify Protocol Compliance
from framework_m_core.interfaces.repository import RepositoryProtocol
# Runtime check
assert isinstance(DynamoDBRepository(User, "users"), RepositoryProtocol)
Protocol Design Principles
All Framework-M protocols follow these principles:
- Database Agnostic - No storage-specific types in signatures
- Async First - All I/O operations are async
- Type Safe - Full type hints with generics
- Runtime Checkable - Use
@runtime_checkabledecorator - Minimal Surface - Only essential methods required
- Composable - Protocols can be combined
Example: Type Safety
# ✅ Good: Generic, database-agnostic
async def get_entity(repo: RepositoryProtocol[T], id: UUID) -> T | None:
return await repo.get(id)
# ❌ Bad: SQLAlchemy-specific
from sqlalchemy.orm import Session
def get_entity(session: Session, id: int) -> User:
return session.query(User).get(id)
See Also
Summary
Framework-M provides 7 core protocols:
| Protocol | Purpose | Entry Point Group |
|---|---|---|
RepositoryProtocol | Data persistence | framework_m.adapters.repository |
SchemaMapperProtocol | Schema mapping | framework_m.adapters.schema_mapper |
BootstrapProtocol | Startup steps | framework_m.bootstrap |
BaseDocTypeProtocol | Minimum DocType interface | N/A (structural) |
PermissionProtocol | Authorization | framework_m.adapters.permission |
EventBusProtocol | Pub/sub messaging | framework_m.adapters.event_bus |
CacheProtocol | Caching | framework_m.adapters.cache |
All protocols are:
- ✅ Database-agnostic
- ✅ Async-first
- ✅ Type-safe
- ✅ Runtime-checkable
- ✅ Extensible via entry points
Build your own MX package by implementing these protocols!