Framework M Fundamentals
Core concepts for understanding and extending Framework M.
0. Philosophy: Minimal Core, Maximum Flexibility
Only async Python and DI wiring are irreplaceable. Everything else can be replaced, extended, or composed.
The Irreplaceable Core
| Layer | What | Replaceable? |
|---|---|---|
| Language | Python 3.12+ async | ❌ No |
| DI Container | Container + wiring | ❌ No |
| Protocols | Interface definitions | ❌ No (contracts) |
| Entrypoints | Plugin discovery | ❌ No (mechanism) |
Blessed Defaults (Deeply Integrated)
These are tightly coupled to the framework but have escape hatches:
| Component | Default | Escape Hatch |
|---|---|---|
| HTTP Framework | Litestar | Routes, middleware are Litestar-native (no abstraction) |
| Database Engine | SQLAlchemy | Virtual DocTypes with custom RepositoryProtocol implementations |
While SQLAlchemy manages connection pooling, transaction boundaries (via UnitOfWork), and metadata schemas, developers do not write or interact with SQLAlchemy ORM Model classes. Instead, the GenericRepository acts as the mapping layer to hydrate data directly to and from Pydantic-based BaseDocType classes.
Virtual DocTypes: For non-SQL data sources (APIs, NoSQL, files), implement RepositoryProtocol and register via RepositoryFactory.register_override():
# Custom repository for any data source
class MongoRepository(RepositoryProtocol[MyDoc]):
async def get(self, id: UUID) -> MyDoc | None: ...
async def save(self, entity: MyDoc) -> MyDoc: ...
# Register the override
RepositoryFactory().register_override("MyDoc", MongoRepository)
Everything Else is Replaceable
Components with Protocols can be swapped by implementing the interface and registering via entrypoints:
| Component | Default | Can Swap To | Protocol |
|---|---|---|---|
| Database | PostgreSQL | MySQL, SQLite, Oracle | (SQLAlchemy drivers) |
| Event Bus | NATS | Redis, Kafka, RabbitMQ | EventBusProtocol |
| Job Queue | Taskiq | Celery, ARQ | JobQueueProtocol |
| Cache | Redis | Memcached, Memory | CacheProtocol |
| Storage | S3 | Local, Memory | StorageProtocol |
| Auth | Local User | Keycloak, Auth0, custom | IdentityProtocol |
| Frontend | Refine.dev | Next.js, Vue, Flutter | (API-driven) |
Open-Closed Principle
秉
秉
Closed for modification: Protocols are stable contracts. Open for extension: Add new adapters via entrypoints.
Zero-Cliff Future
This design ensures:
- No vendor lock-in — swap any component
- No rewrite cliffs — extend, don't replace the whole framework
- Progressive enhancement — start simple, add capabilities as needed
- Test isolation — swap real adapters for mocks
1. Dependency Injection as Universal Composer
DI is not just for HTTP requests — it's the universal composition mechanism across:
| Context | How DI Works |
|---|---|
| Litestar (HTTP) | Request-scoped via lifespan |
| CLI commands | App-scoped via entrypoints |
| Background jobs | Job-scoped via Taskiq |
| Tests | Overrides via fixtures |
| Future (REPL, Notebooks) | Same container pattern |
1.1 The Container Pattern
# container.py - Central composition root
from framework_m_core.container import Container
def create_container(settings: Settings) -> Container:
"""Build the DI container with all dependencies."""
container = Container()
# Register adapters (implementations)
container.register(DatabaseAdapter, SQLAlchemyAdapter(settings.DATABASE_URL))
container.register(EventBus, NATSEventBus(settings.NATS_URL))
container.register(JobQueue, TaskiqQueue(settings.NATS_URL))
container.register(Cache, RedisCache(settings.REDIS_URL))
# Register repositories
container.register(ItemRepository, GenericRepository(Item, container.get(DatabaseAdapter)))
# Register services
container.register(ItemService, ItemService(container.get(ItemRepository)))
return container
1.2 Entrypoint Registration
Framework M discovers adapters via Python entrypoints (not hardcoded imports):
# pyproject.toml
[project.entry-points."framework_m.adapters.eventbus"]
nats = "framework_m.adapters.eventbus.nats:NATSEventBus"
redis = "framework_m.adapters.eventbus.redis:RedisEventBus"
[project.entry-points."framework_m.adapters.cache"]
redis = "framework_m.adapters.cache.redis:RedisCache"
memory = "framework_m.adapters.cache.memory:MemoryCache"
[project.entry-points."framework_m.adapters.storage"]
s3 = "framework_m.adapters.storage.s3:S3Storage"
local = "framework_m.adapters.storage.local:LocalStorage"
1.3 Configuration-Driven Selection
# settings.py
class Settings(BaseSettings):
# Select adapter by name (from entrypoints)
EVENT_BUS_ADAPTER: str = "nats"
CACHE_ADAPTER: str = "redis"
STORAGE_ADAPTER: str = "s3"
# bootstrap.py
def load_adapter(group: str, name: str):
"""Load adapter class from entrypoints."""
from importlib.metadata import entry_points
eps = entry_points(group=group)
return eps[name].load()
# Usage
EventBusClass = load_adapter("framework_m.adapters.eventbus", settings.EVENT_BUS_ADAPTER)
container.register(EventBus, EventBusClass(settings))
2. Bootstrap Flow
2.1 Litestar (HTTP Server)
# app.py
from litestar import Litestar
from litestar.di import Provide
def create_app() -> Litestar:
settings = Settings()
container = create_container(settings)
return Litestar(
route_handlers=[...],
dependencies={
"settings": Provide(lambda: settings),
"container": Provide(lambda: container),
"item_service": Provide(lambda: container.get(ItemService)),
},
lifespan=[container_lifespan],
)
async def container_lifespan(app: Litestar):
"""Startup/shutdown lifecycle."""
container = app.state.container
await container.startup() # Connect to DBs, queues
yield
await container.shutdown() # Cleanup
2.2 CLI Commands
# cli/commands.py
import cyclopts
from container import create_container
app = cyclopts.App()
@app.command
def import_items(file: str):
"""CLI uses same DI container."""
settings = Settings()
container = create_container(settings)
# Get service from container
item_service = container.get(ItemService)
# Use it
items = parse_csv(file)
for item in items:
item_service.create(item)
2.3 Background Jobs (Taskiq)
# jobs/tasks.py
from framework_m_core.di import inject, Provide
from framework_m_core.container import Container
@broker.task
@inject
async def process_order(
order_id: str,
order_service: OrderService = Provide[Container.order_service],
):
"""Job gets dependencies from same container."""
await order_service.process(order_id)
3. Repository + Controller + Service Composition
3.1 Layer Responsibilities
秉
秉
3.2 Composition Example
# 1. DocType (schema only)
class Invoice(BaseDocType):
customer: str
total: Decimal
items: list[InvoiceItem]
# 2. Controller (business logic + hooks)
class InvoiceController(BaseController[Invoice]):
async def validate(self, context=None):
if self.doc.total < 0:
raise ValueError("Total cannot be negative")
async def after_save(self, context=None):
await self.update_customer_balance()
# 3. Repository (CRUD + hooks invocation)
invoice_repo = GenericRepository(
model=Invoice,
table=invoice_table,
controller_class=InvoiceController, # Wired here
)
# 4. Service (orchestration + transactions)
class InvoiceService:
def __init__(self, repo: GenericRepository[Invoice], uow_factory: UoWFactory):
self.repo = repo
self.uow_factory = uow_factory
async def create_invoice(self, data: InvoiceCreate) -> Invoice:
async with self.uow_factory() as uow:
invoice = Invoice(**data.model_dump())
await self.repo.save(uow.session, invoice) # Calls controller hooks
await uow.commit()
return invoice
4. Adding Custom Adapters
4.1 Implement the Protocol
# my_app/adapters/custom_cache.py
from framework_m_core.interfaces import CacheProtocol
class MemcachedCache(CacheProtocol):
def __init__(self, url: str):
self.client = pymemcache.Client(url)
async def get(self, key: str) -> Any:
return self.client.get(key)
async def set(self, key: str, value: Any, ttl: int = 300):
self.client.set(key, value, expire=ttl)
4.2 Register via Entrypoint
# my_app/pyproject.toml
[project.entry-points."framework_m.adapters.cache"]
memcached = "my_app.adapters.custom_cache:MemcachedCache"
4.3 Select in Config
# .env
CACHE_ADAPTER=memcached
MEMCACHED_URL=memcached://localhost:11211
5. Testing with DI
5.1 Override Dependencies
# tests/conftest.py
import pytest
from framework_m_core.container import Container
@pytest.fixture
def test_container():
"""Container with test adapters."""
container = Container()
container.register(Cache, MemoryCache()) # In-memory for tests
container.register(EventBus, MockEventBus()) # Mock for tests
return container
@pytest.fixture
def item_service(test_container):
return test_container.get(ItemService)
5.2 Mock Specific Dependencies
@pytest.mark.asyncio
async def test_create_invoice(item_service, mocker):
# Mock specific method
mocker.patch.object(item_service.repo, "save", return_value=mock_invoice)
result = await item_service.create(data)
assert result.id == mock_invoice.id
Summary
| Concept | Key Point |
|---|---|
| DI Container | Universal composer for all contexts |
| Entrypoints | Pluggable adapters without code changes |
| Bootstrap | Same container for HTTP, CLI, Jobs |
| Composition | Service → Repository → Controller → DocType |
| Testing | Override adapters via container |