Framework-M Protocol Reference

This document provides a comprehensive reference for all protocols (interfaces) in Framework-M Core. Protocols define the contracts that adapters must implement, enabling the MX pattern for database and service customization.

Table of Contents

  1. Repository Protocol
  2. Schema Mapper Protocol
  3. Bootstrap Protocol
  4. Base DocType Protocol
  5. Permission Protocol
  6. Event Bus Protocol
  7. Cache Protocol

Repository Protocol

Location: framework_m_core.interfaces.repository

The primary interface for data persistence operations. All repository adapters must implement this protocol.

Interface Definition

from typing import Protocol, TypeVar, Sequence
from uuid import UUID
from framework_m_core.interfaces.repository import (
    FilterSpec,
    OrderSpec,
    PaginatedResult
)

T = TypeVar("T")

class RepositoryProtocol[T](Protocol):
    """Protocol for data repositories."""

    async def get(self, id: UUID) -> T | None:
        """Retrieve entity by ID."""
        ...

    async def save(self, entity: T, version: int | None = None) -> T:
        """Save entity (insert or update)."""
        ...

    async def delete(self, id: UUID) -> None:
        """Delete entity by ID."""
        ...

    async def exists(self, id: UUID) -> bool:
        """Check if entity exists."""
        ...

    async def count(self, filters: list[FilterSpec] | None = None) -> int:
        """Count entities matching filters."""
        ...

    async def list(
        self,
        filters: list[FilterSpec] | None = None,
        order_by: list[OrderSpec] | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> PaginatedResult[T]:
        """List entities with pagination."""
        ...

    async def bulk_save(self, entities: Sequence[T]) -> Sequence[T]:
        """Save multiple entities."""
        ...

Supporting Types

FilterSpec

from typing import Any

from pydantic import BaseModel
from framework_m_core.interfaces.repository import FilterOperator

class FilterSpec(BaseModel):
    """Specification for filtering queries."""
    field: str
    operator: FilterOperator  # EQ, NE, LT, LTE, GT, GTE, IN, NOT_IN, LIKE, IS_NULL
    value: Any

Example:

filters = [
    FilterSpec(field="status", operator=FilterOperator.EQ, value="active"),
    FilterSpec(field="age", operator=FilterOperator.GTE, value=18)
]
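To make the filter semantics concrete, here is a minimal sketch of how an adapter might evaluate filter specs against in-memory rows. `Op` and `Spec` are hypothetical stand-ins for `FilterOperator` and `FilterSpec` so the block runs without Framework-M installed, and combining filters with AND is an assumption; the source does not state how multiple filters are combined.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class Op(Enum):
    """Hypothetical stand-in for a subset of FilterOperator."""
    EQ = "eq"
    GTE = "gte"
    IN = "in"
    IS_NULL = "is_null"


@dataclass
class Spec:
    """Hypothetical stand-in for FilterSpec."""
    field: str
    operator: Op
    value: Any = None


def matches(row: dict[str, Any], spec: Spec) -> bool:
    """Return True if a single row satisfies a single filter."""
    actual = row.get(spec.field)
    if spec.operator is Op.EQ:
        return actual == spec.value
    if spec.operator is Op.GTE:
        return actual is not None and actual >= spec.value
    if spec.operator is Op.IN:
        return actual in spec.value
    if spec.operator is Op.IS_NULL:
        return actual is None
    raise ValueError(f"Unsupported operator: {spec.operator}")


def apply_filters(rows: list[dict[str, Any]], specs: list[Spec]) -> list[dict[str, Any]]:
    """Keep rows that satisfy every filter (AND semantics, an assumption)."""
    return [row for row in rows if all(matches(row, s) for s in specs)]


rows = [
    {"name": "a", "status": "active", "age": 30},
    {"name": "b", "status": "active", "age": 16},
    {"name": "c", "status": "inactive", "age": 40},
]
adults = apply_filters(
    rows,
    [Spec("status", Op.EQ, "active"), Spec("age", Op.GTE, 18)],
)
```

A real adapter would translate each spec into its backend's query language instead of filtering in Python.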

OrderSpec

from pydantic import BaseModel
from framework_m_core.interfaces.repository import OrderDirection

class OrderSpec(BaseModel):
    """Specification for ordering results."""
    field: str
    direction: OrderDirection = OrderDirection.ASC  # ASC or DESC

Example:

order_by = [
    OrderSpec(field="created_at", direction=OrderDirection.DESC)
]

PaginatedResult

from typing import Sequence

from pydantic import BaseModel

class PaginatedResult[T](BaseModel):
    """Container for paginated results."""
    items: Sequence[T]
    total: int
    limit: int
    offset: int

    @property
    def has_more(self) -> bool:
        """Check if more items exist."""
        return self.offset + len(self.items) < self.total
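The `has_more` property is what drives a paging loop. The following is a minimal sketch of collecting every page; `Page` and `fetch_page` are hypothetical stand-ins for `PaginatedResult` and `repo.list`, used so the block runs without Framework-M installed.

```python
import asyncio
from dataclasses import dataclass
from typing import Sequence


@dataclass
class Page:
    """Hypothetical stand-in for PaginatedResult."""
    items: Sequence[int]
    total: int
    limit: int
    offset: int

    @property
    def has_more(self) -> bool:
        return self.offset + len(self.items) < self.total


DATA = list(range(45))  # pretend storage holding 45 rows


async def fetch_page(limit: int, offset: int) -> Page:
    """Hypothetical stand-in for repo.list(limit=..., offset=...)."""
    return Page(
        items=DATA[offset:offset + limit],
        total=len(DATA),
        limit=limit,
        offset=offset,
    )


async def fetch_all(limit: int = 20) -> list[int]:
    """Advance the offset page by page until has_more is False."""
    collected: list[int] = []
    offset = 0
    while True:
        page = await fetch_page(limit=limit, offset=offset)
        collected.extend(page.items)
        if not page.has_more:
            return collected
        offset += len(page.items)


all_items = asyncio.run(fetch_all())
```

Advancing by `len(page.items)` rather than by `limit` keeps the loop correct even if a backend returns a short page.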

Implementations

| Package | Class | Database |
|---|---|---|
| framework-m-standard | GenericRepository | PostgreSQL/MySQL (SQLAlchemy) |
| framework-mx-mongo | MongoGenericRepository | MongoDB (Motor) |

Usage Example

from framework_m_core.interfaces.repository import (
    RepositoryProtocol,
    FilterSpec,
    FilterOperator
)

async def get_active_users(repo: RepositoryProtocol[User]) -> list[User]:
    """Get all active users - works with any repository implementation."""
    filters = [
        FilterSpec(field="status", operator=FilterOperator.EQ, value="active")
    ]
    result = await repo.list(filters=filters, limit=100)
    return list(result.items)

Schema Mapper Protocol

Location: framework_m_core.interfaces.schema_mapper

Defines the interface for mapping Pydantic models to storage schemas.

Interface Definition

from typing import Protocol, Any
from framework_m_core.interfaces.base_doctype import BaseDocTypeProtocol

class SchemaMapperProtocol(Protocol):
    """Protocol for schema mapping."""

    def create_table(
        self,
        model: type[BaseDocTypeProtocol],
        metadata: Any
    ) -> Any:
        """Create storage schema for a single DocType."""
        ...

    def create_tables(
        self,
        model: type[BaseDocTypeProtocol],
        metadata: Any
    ) -> list[Any]:
        """Create schemas for DocType with child tables."""
        ...

    def detect_schema_changes(
        self,
        old_model: type[BaseDocTypeProtocol],
        new_model: type[BaseDocTypeProtocol]
    ) -> dict[str, Any]:
        """Detect schema differences between versions."""
        ...

Return Types by Implementation

SQLAlchemy (framework-m-standard)

from sqlalchemy import Column, MetaData, String, Table, UUID

# create_table returns:
Table("users", metadata,
    Column("id", UUID, primary_key=True),
    Column("name", String(255)),
    ...
)

# detect_schema_changes returns:
{
    "added_fields": ["email"],
    "removed_fields": ["username"],
    "modified_fields": [
        {"field": "age", "old_type": "INTEGER", "new_type": "BIGINT"}
    ]
}
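To illustrate the shape of the `detect_schema_changes` result, here is a minimal sketch that builds the same dictionary from two plain field-to-type maps. `diff_fields` is a hypothetical helper, not the mapper's actual implementation, and reducing each model to a `{field name: type name}` map is an assumption made for the example.

```python
from typing import Any


def diff_fields(old: dict[str, str], new: dict[str, str]) -> dict[str, Any]:
    """Compare two {field name: type name} maps and report the differences."""
    added = sorted(new.keys() - old.keys())
    removed = sorted(old.keys() - new.keys())
    modified = [
        {"field": name, "old_type": old[name], "new_type": new[name]}
        for name in sorted(old.keys() & new.keys())
        if old[name] != new[name]
    ]
    return {
        "added_fields": added,
        "removed_fields": removed,
        "modified_fields": modified,
    }


changes = diff_fields(
    {"id": "UUID", "username": "VARCHAR", "age": "INTEGER"},
    {"id": "UUID", "email": "VARCHAR", "age": "BIGINT"},
)
```

Note that a rename shows up as one removed field plus one added field; telling a rename apart from a drop-and-add needs extra information that a plain diff does not have.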

MongoDB (framework-mx-mongo)

# create_table returns:
{
    "collection": "users",
    "validator": {
        "$jsonSchema": {
            "bsonType": "object",
            "required": ["id", "name"],
            "properties": {
                "id": {"bsonType": "string"},
                "name": {"bsonType": "string"}
            }
        }
    },
    "indexes": [
        {"keys": [("id", 1)], "unique": True},
        {"keys": [("email", 1)], "unique": True}
    ]
}

Implementations

| Package | Class | Database |
|---|---|---|
| framework-m-standard | SchemaMapper | SQLAlchemy Tables |
| framework-mx-mongo | MongoSchemaMapper | MongoDB Collections |

Bootstrap Protocol

Location: framework_m_core.interfaces.bootstrap

Defines the interface for application startup steps.

Interface Definition

from typing import Protocol, Any

class BootstrapProtocol(Protocol):
    """Protocol for bootstrap steps."""

    name: str   # Unique identifier
    order: int  # Execution order (lower runs first)

    async def run(self, container: Any) -> None:
        """Execute the bootstrap step."""
        ...

Standard Order Values

| Order | Purpose | Example |
|---|---|---|
| 10 | Initialize database/storage engine | init_engine, init_mongo |
| 20 | Initialize registries | init_registries |
| 30 | Sync schemas | sync_schema |
| 40 | Bind adapters to DI container | init_adapters |
| 50+ | Application-specific initialization | Custom steps |
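The ordering rule (lower `order` runs first) can be sketched as a simple sort followed by sequential awaits. `Step` and `run_bootstrap` are hypothetical names for illustration; how Framework-M's own runner is implemented is not described here, and breaking ties by `name` is an assumption.

```python
import asyncio
from typing import Any


class Step:
    """Hypothetical bootstrap step that records when it runs."""

    def __init__(self, name: str, order: int, log: list[str]) -> None:
        self.name = name
        self.order = order
        self._log = log

    async def run(self, container: Any) -> None:
        self._log.append(self.name)


async def run_bootstrap(steps: list[Step], container: Any) -> None:
    """Run steps one at a time, lowest order first (ties broken by name)."""
    for step in sorted(steps, key=lambda s: (s.order, s.name)):
        await step.run(container)


log: list[str] = []
steps = [
    Step("init_adapters", 40, log),
    Step("init_engine", 10, log),
    Step("init_cache", 15, log),
    Step("sync_schema", 30, log),
    Step("init_registries", 20, log),
]
asyncio.run(run_bootstrap(steps, container={}))
```

Running the steps sequentially rather than concurrently matters because later steps usually depend on what earlier ones bound into the container.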

Implementations

| Package | Class | Order | Purpose |
|---|---|---|---|
| framework-m-standard | InitEngine | 10 | Create SQLAlchemy engine |
| framework-m-standard | InitRegistries | 20 | Initialize MetaRegistry, FieldRegistry |
| framework-m-standard | SyncSchema | 30 | Create database tables |
| framework-m-standard | InitAdapters | 40 | Bind adapters to DI |
| framework-mx-mongo | MongoInit | 10 | Initialize MongoDB client |

Usage Example

from typing import Any

from framework_m_core.interfaces.bootstrap import BootstrapProtocol

class InitCache(BootstrapProtocol):
    """Initialize Redis cache."""

    name = "init_cache"
    order = 15  # After database, before registries

    async def run(self, container: Any) -> None:
        import redis.asyncio as redis

        cache = await redis.from_url("redis://localhost")
        container.bind("cache", cache)
        print("✓ Redis cache initialized")

Register via entry point:

[project.entry-points."framework_m.bootstrap"]
init_cache = "myapp.bootstrap:InitCache"

Base DocType Protocol

Location: framework_m_core.interfaces.base_doctype

Defines the minimal interface that all DocTypes must implement.

Interface Definition

from typing import Protocol
from datetime import datetime

class BaseDocTypeProtocol(Protocol):
    """Minimum fields required for all DocTypes."""

    id: str
    name: str
    created_at: datetime
    modified_at: datetime

Standard Implementation

The framework-m-standard package provides a concrete implementation:

from pydantic import BaseModel, Field
from datetime import datetime
from uuid import uuid4

class BaseDocType(BaseModel):
    """Standard BaseDocType with additional fields."""

    # Required by protocol
    id: str = Field(default_factory=lambda: str(uuid4()))
    name: str
    created_at: datetime = Field(default_factory=datetime.now)
    modified_at: datetime = Field(default_factory=datetime.now)

    # Additional standard fields
    owner: str | None = None
    _version: int = Field(default=1, alias="version")

    class Config:
        from_attributes = True

Usage Example

from datetime import datetime

from framework_m_core.interfaces.base_doctype import BaseDocTypeProtocol
from pydantic import BaseModel

class User(BaseModel):
    """User DocType - implements BaseDocTypeProtocol."""

    id: str
    name: str
    created_at: datetime
    modified_at: datetime

    # Additional fields
    email: str
    role: str = "user"

# Accepts any object with the four protocol fields, such as User
def process_doctype(doc: BaseDocTypeProtocol) -> None:
    print(f"Processing {doc.name} (ID: {doc.id})")
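Because the protocol is structural, a class does not need to inherit from anything to satisfy it. The sketch below demonstrates this with a local `DocLike` protocol that mirrors the four `BaseDocTypeProtocol` fields; `DocLike`, `Invoice`, and `Untracked` are hypothetical names, and plain dataclasses are used so the block runs without Pydantic or Framework-M.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Protocol, runtime_checkable


@runtime_checkable
class DocLike(Protocol):
    """Local stand-in mirroring the four BaseDocTypeProtocol fields."""
    id: str
    name: str
    created_at: datetime
    modified_at: datetime


def _now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class Invoice:
    """Has all four fields, so it satisfies DocLike without inheriting."""
    id: str
    name: str
    created_at: datetime = field(default_factory=_now)
    modified_at: datetime = field(default_factory=_now)


@dataclass
class Untracked:
    """Lacks the timestamp fields, so it does not satisfy DocLike."""
    id: str
    name: str


def describe(doc: DocLike) -> str:
    return f"{doc.name} (ID: {doc.id})"


invoice = Invoice(id="inv-1", name="INV-0001")
summary = describe(invoice)
```

A static type checker accepts `describe(invoice)` and rejects `describe(Untracked(...))`; the runtime `isinstance` check only confirms that the attributes are present.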

Permission Protocol

Location: framework_m_core.interfaces.permission

Defines the interface for permission checking and authorization.

Interface Definition

from typing import Protocol, Any
from framework_m_core.interfaces.permission import (
    PermissionRequest,
    PermissionResponse
)

class PermissionProtocol(Protocol):
    """Protocol for permission checking."""

    async def check(self, request: PermissionRequest) -> PermissionResponse:
        """Check if action is permitted."""
        ...

Supporting Types

from enum import StrEnum
from typing import Any

from pydantic import BaseModel

class PermissionAction(StrEnum):
    """Standard permission actions."""
    READ = "read"
    WRITE = "write"
    CREATE = "create"
    DELETE = "delete"
    SUBMIT = "submit"
    CANCEL = "cancel"
    AMEND = "amend"

class PermissionRequest(BaseModel):
    """Request to check permission."""
    user: str     # User ID
    doctype: str  # DocType name
    action: PermissionAction | str
    doc: Any | None = None  # Optional document instance

class PermissionResponse(BaseModel):
    """Response from permission check."""
    allowed: bool
    reason: str | None = None

Implementations

| Package | Class | Backend |
|---|---|---|
| framework-m-standard | CitadelPolicyAdapter | Citadel RBAC service |

Usage Example

from framework_m_core.interfaces.permission import (
    PermissionProtocol,
    PermissionRequest,
    PermissionAction
)

async def can_delete_user(
    permission: PermissionProtocol,
    user_id: str,
    target_doc: User
) -> bool:
    """Check if user can delete another user."""

    request = PermissionRequest(
        user=user_id,
        doctype="User",
        action=PermissionAction.DELETE,
        doc=target_doc
    )

    response = await permission.check(request)
    return response.allowed

Event Bus Protocol

Location: framework_m_core.interfaces.event_bus

Defines the interface for pub/sub messaging.

Interface Definition

from typing import Protocol, Callable, Any

from pydantic import BaseModel

class Event(BaseModel):
    """Event message."""
    type: str
    data: dict[str, Any]
    metadata: dict[str, Any] = {}

class EventBusProtocol(Protocol):
    """Protocol for event bus."""

    async def publish(self, event: Event) -> None:
        """Publish an event."""
        ...

    async def subscribe(
        self,
        event_type: str,
        handler: Callable[[Event], None]
    ) -> None:
        """Subscribe to events of a type."""
        ...

    async def unsubscribe(
        self,
        event_type: str,
        handler: Callable[[Event], None]
    ) -> None:
        """Unsubscribe from events."""
        ...

Implementations

| Package | Class | Backend |
|---|---|---|
| framework-m-standard | InMemoryEventBus | In-memory (dev/test) |
| framework-m-standard | NatsEventBusAdapter | NATS JetStream (production) |

Usage Example

from datetime import datetime

from framework_m_core.interfaces.event_bus import (
    EventBusProtocol,
    Event
)

async def notify_user_created(
    event_bus: EventBusProtocol,
    user: User
) -> None:
    """Publish user creation event."""

    event = Event(
        type="user.created",
        data={"user_id": user.id, "email": user.email},
        metadata={"timestamp": datetime.now().isoformat()}
    )

    await event_bus.publish(event)

async def setup_handlers(event_bus: EventBusProtocol) -> None:
    """Subscribe to events."""

    async def on_user_created(event: Event) -> None:
        print(f"New user: {event.data['email']}")

    await event_bus.subscribe("user.created", on_user_created)
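To show the publish/subscribe flow end to end, here is a minimal in-memory bus sketch. `TinyEventBus` and `Message` are hypothetical names and this is not the package's `InMemoryEventBus`. Accepting both plain and `async` handlers is an assumption, made because the protocol annotates handlers as returning `None` while the usage example passes a coroutine function.

```python
import asyncio
import inspect
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Message:
    """Hypothetical stand-in for Event."""
    type: str
    data: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)


Handler = Callable[[Message], Any]


class TinyEventBus:
    """Deliver each event to the handlers registered for its type."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    async def publish(self, event: Message) -> None:
        # Iterate over a copy so handlers may unsubscribe during delivery.
        for handler in list(self._handlers[event.type]):
            result = handler(event)
            if inspect.isawaitable(result):
                await result

    async def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def unsubscribe(self, event_type: str, handler: Handler) -> None:
        if handler in self._handlers[event_type]:
            self._handlers[event_type].remove(handler)


async def demo() -> list[str]:
    bus = TinyEventBus()
    seen: list[str] = []

    async def on_created(event: Message) -> None:
        seen.append(event.data["email"])

    await bus.subscribe("user.created", on_created)
    await bus.publish(Message("user.created", {"email": "a@example.com"}))
    await bus.unsubscribe("user.created", on_created)
    await bus.publish(Message("user.created", {"email": "b@example.com"}))
    return seen


seen = asyncio.run(demo())
```

Delivery here is synchronous and in-process: `publish` returns only after every handler has finished, which a broker-backed adapter would not guarantee.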

Cache Protocol

Location: framework_m_core.interfaces.cache

Defines the interface for caching operations.

Interface Definition

from typing import Protocol, Any

class CacheProtocol(Protocol):
    """Protocol for cache operations."""

    async def get(self, key: str) -> Any | None:
        """Get value from cache."""
        ...

    async def set(
        self,
        key: str,
        value: Any,
        ttl: int | None = None
    ) -> None:
        """Set value in cache with optional TTL (seconds)."""
        ...

    async def delete(self, key: str) -> None:
        """Delete value from cache."""
        ...

    async def exists(self, key: str) -> bool:
        """Check if key exists in cache."""
        ...

    async def clear(self) -> None:
        """Clear all cache entries."""
        ...

Implementations

| Package | Class | Backend |
|---|---|---|
| framework-m-standard | RedisCache | Redis |
| framework-m-standard | InMemoryCache | In-memory dict (dev/test) |

Usage Example

from uuid import UUID

from framework_m_core.interfaces.cache import CacheProtocol
from framework_m_core.interfaces.repository import RepositoryProtocol

async def get_user_cached(
    cache: CacheProtocol,
    user_id: str,
    repo: RepositoryProtocol[User]
) -> User | None:
    """Get user with caching."""

    # Try cache first
    cached = await cache.get(f"user:{user_id}")
    if cached is not None:
        return User.model_validate_json(cached)

    # Fetch from database
    user = await repo.get(UUID(user_id))
    if user:
        # Cache for 5 minutes
        await cache.set(
            f"user:{user_id}",
            user.model_dump_json(),
            ttl=300
        )

    return user

Creating Custom Protocol Implementations

To create your own adapter, implement the protocol:

1. Choose a Protocol

from framework_m_core.interfaces.repository import RepositoryProtocol

2. Implement All Methods

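import asyncio
from typing import TypeVar
from uuid import UUID

import boto3
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)

class DynamoDBRepository(RepositoryProtocol[T]):
    """DynamoDB implementation of RepositoryProtocol."""

    def __init__(self, model: type[T], table_name: str):
        self._model = model
        self._table = boto3.resource('dynamodb').Table(table_name)

    async def get(self, id: UUID) -> T | None:
        # boto3 is synchronous, so run the call in a worker thread
        # to avoid blocking the event loop.
        response = await asyncio.to_thread(
            self._table.get_item, Key={'id': str(id)}
        )
        if 'Item' not in response:
            return None
        return self._model.model_validate(response['Item'])

    # ... implement other methods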

3. Register via Entry Point

[project.entry-points."framework_m.adapters.repository"]
default = "framework_mx_dynamodb.repository:DynamoDBRepository"

4. Verify Protocol Compliance

from framework_m_core.interfaces.repository import RepositoryProtocol

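# Runtime check: requires the protocol to be decorated with @runtime_checkable,
# and only verifies that the methods exist, not that their signatures match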
assert isinstance(DynamoDBRepository(User, "users"), RepositoryProtocol)

Protocol Design Principles

All Framework-M protocols follow these principles:

  1. Database Agnostic - No storage-specific types in signatures
  2. Async First - All I/O operations are async
  3. Type Safe - Full type hints with generics
  4. Runtime Checkable - Use @runtime_checkable decorator
  5. Minimal Surface - Only essential methods required
  6. Composable - Protocols can be combined

Example: Type Safety

# ✅ Good: Generic, database-agnostic
async def get_entity(repo: RepositoryProtocol[T], id: UUID) -> T | None:
    return await repo.get(id)

# ❌ Bad: SQLAlchemy-specific
from sqlalchemy.orm import Session
def get_entity(session: Session, id: int) -> User:
    return session.query(User).get(id)

Summary

Framework-M provides 7 core protocols:

| Protocol | Purpose | Entry Point Group |
|---|---|---|
| RepositoryProtocol | Data persistence | framework_m.adapters.repository |
| SchemaMapperProtocol | Schema mapping | framework_m.adapters.schema_mapper |
| BootstrapProtocol | Startup steps | framework_m.bootstrap |
| BaseDocTypeProtocol | Minimum DocType interface | N/A (structural) |
| PermissionProtocol | Authorization | framework_m.adapters.permission |
| EventBusProtocol | Pub/sub messaging | framework_m.adapters.event_bus |
| CacheProtocol | Caching | framework_m.adapters.cache |

All protocols are:

  • ✅ Database-agnostic
  • ✅ Async-first
  • ✅ Type-safe
  • ✅ Runtime-checkable
  • ✅ Extensible via entry points

Build your own MX package by implementing these protocols!