# Phase 02: DocType Engine & Database

**Objective:** Implement the metadata engine that dynamically creates database tables from Pydantic models and provides generic CRUD operations.

> [!IMPORTANT]
> **Database Agnostic:** All database implementations MUST work with both SQLite and PostgreSQL. Avoid database-specific features (e.g., PostgreSQL's `ARRAY`). Use portable SQL types that work across databases.
## 1. Schema Mapper (Pydantic → SQLAlchemy)

### 1.1 Field Registry & Type Mapping

- Create `src/framework_m/adapters/db/field_registry.py`
- Implement `FieldRegistry` class (singleton):
  - Start with standard types: str, int, float, bool, datetime, date, decimal, uuid, json
  - Allow registration of custom types: `register_type(pydantic_type, sqlalchemy_type)`
- Define the type mapping via the registry:
  - `str` → `String`
  - `int` → `Integer`
  - `float` → `Float`
  - `bool` → `Boolean`
  - `datetime` → `DateTime`
  - `date` → `Date`
  - `Decimal` → `Numeric`
  - `UUID` → `UUID`
  - `list[str]` → `JSON` (portable, works on all databases)
  - `dict` → `JSON`
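The registry above can be sketched as a small singleton. This is a simplified, dependency-free illustration: SQLAlchemy types are represented by name strings rather than real `sqlalchemy.types` objects, and the `resolve` method is an assumed name.

```python
import datetime
import decimal
import uuid


class FieldRegistry:
    """Singleton mapping Python/Pydantic field types to SQLAlchemy column types."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Portable defaults; list/dict collapse to JSON on all databases.
            cls._instance._mapping = {
                str: "String",
                int: "Integer",
                float: "Float",
                bool: "Boolean",
                datetime.datetime: "DateTime",
                datetime.date: "Date",
                decimal.Decimal: "Numeric",
                uuid.UUID: "UUID",
                list: "JSON",
                dict: "JSON",
            }
        return cls._instance

    def register_type(self, pydantic_type: type, sqlalchemy_type: str) -> None:
        """Allow plugins to add custom types (e.g. GeoLocation)."""
        self._mapping[pydantic_type] = sqlalchemy_type

    def resolve(self, pydantic_type) -> str:
        # list[str] has __origin__ == list, so generics fall back to
        # their container type.
        origin = getattr(pydantic_type, "__origin__", None)
        return self._mapping[origin or pydantic_type]
```

Because the class is a singleton, a plugin registering `GeoLocation` once makes it visible to every later `FieldRegistry()` lookup.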
### 1.2 Schema Mapper Implementation

**TDD Focus:** Write `tests/adapters/db/test_schema_mapper.py` first!

> [!IMPORTANT]
> **Primary Key Design Decision:** Use `id` (UUID) as the primary key and `name` as a unique index.
> - **Rationale:** Allows document renames without cascading foreign key updates
> - **Frappe issue:** Using `name` as the PK causes expensive updates on rename
> - **Standard practice:** All major databases use an auto-generated `id` as the PK
> - **Flexibility:** Framework users can rename documents cheaply
- Create `tests/adapters/db/test_schema_mapper.py`
  - Test mapping simple fields (`str`, `int`)
  - Test mapping relationships
- Create `SchemaMapper` class
  - Use `FieldRegistry` to look up SQLAlchemy types
  - Implement `create_table(model: Type[BaseDocType]) -> Table`:
    - Extract the table name from the model class name (lowercase)
    - Iterate over `model.model_fields`
    - Map each field to a SQLAlchemy column
    - Handle `Optional` types (`nullable=True`)
    - Set the `id` field as primary key (UUID, auto-generated)
    - Set the `name` field as a unique index (for human-readable lookup)
    - OCC: Add `_version` (Integer, default=0) if `Meta.concurrency == "optimistic"`
    - Return a SQLAlchemy `Table` object
- Handle special field types:
  - Relationships (ForeignKey): detect by the `*_id` field name pattern
  - Enums: map to the SQLAlchemy `String` type (database agnostic)
  - Nested Pydantic models (`list[BaseModel]`): store as JSON
  - Child DocTypes (`list[DocType]`): map to a relational table with a foreign key
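The core of `create_table` can be sketched without SQLAlchemy: derive the table name from the class name, detect `Optional[...]` as nullable, and mark `id` as primary key and `name` as unique. Columns are plain dicts here, and the `Todo` model is illustrative; the real implementation would emit `sqlalchemy.Column` objects from `model.model_fields`.

```python
import typing
import uuid


def create_table(model: type) -> dict:
    """Simplified schema mapping: class annotations -> column descriptors."""
    columns = {}
    for field, annotation in typing.get_type_hints(model).items():
        # Optional[X] is Union[X, None] -> nullable column of type X.
        nullable = False
        if typing.get_origin(annotation) is typing.Union:
            args = [a for a in typing.get_args(annotation) if a is not type(None)]
            annotation, nullable = args[0], True
        columns[field] = {
            "type": annotation.__name__,
            "nullable": nullable,
            "primary_key": field == "id",   # UUID surrogate key
            "unique": field == "name",      # human-readable lookup index
        }
    return {"name": model.__name__.lower(), "columns": columns}


class Todo:  # illustrative DocType-like model
    id: uuid.UUID
    name: str
    description: typing.Optional[str]
```

`create_table(Todo)` yields a table named `todo` with `id` as PK, `name` unique, and `description` nullable.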
### 1.3 Table Registry

- Create `TableRegistry` class to store created tables
- Add methods:
  - `register_table(doctype_name: str, table: Table)`
  - `get_table(doctype_name: str) -> Table`
  - `table_exists(doctype_name: str) -> bool`
## 2. Database Connection Setup

### 2.1 Connection Factory & Virtual DocTypes

- Create `src/framework_m/adapters/db/connection.py`
- Implement `ConnectionFactory` (supports multiple bindings):
  - Load `db_binds` from config (e.g., `legacy`, `timescale`)
  - Create generic async engines (supports PostgreSQL, and SQLite for testing)
  - Maintain a map of `engine_name -> AsyncEngine`

### 2.2 Session Factory (SQL Support)

- Add a session factory:
  - `async def get_session(bind: str = "default") -> AsyncSession`
  - Support `VirtualDocType` (SQL bind) by accepting `bind_key` from the DocType `Meta`
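The multiple-bindings idea can be sketched as a lazy engine map. In this simplified version the engine constructor is injected (in the real adapter it would be SQLAlchemy's `create_async_engine`), which keeps the sketch dependency-free; `get_session` would then pick a sessionmaker per engine.

```python
class ConnectionFactory:
    """Maintains a map of bind name -> engine, built lazily from config."""

    def __init__(self, db_binds: dict, engine_factory):
        # engine_factory stands in for sqlalchemy.ext.asyncio.create_async_engine.
        self._urls = dict(db_binds)
        self._engine_factory = engine_factory
        self._engines: dict = {}

    def get_engine(self, bind: str = "default"):
        if bind not in self._urls:
            raise KeyError(f"unknown database bind: {bind!r}")
        if bind not in self._engines:  # create once, then reuse
            self._engines[bind] = self._engine_factory(self._urls[bind])
        return self._engines[bind]
```

A DocType whose `Meta` carries `bind_key = "legacy"` would have its sessions opened against `get_engine("legacy")` instead of the default bind.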
### 2.3 Non-SQL Virtual DocTypes (Custom Repositories)

- Create the concept of a `RepositoryOverride`:
  - Allow a DocType to define `repository_class` in `Meta`
  - `RepositoryFactory` instantiates this instead of `GenericRepository`
- Create `tests/adapters/db/test_repository_factory.py`:
  - Implement a mock `FileRepository` that reads from a JSON file
  - Register it for a `VirtualDoc`
  - Verify `VirtualDoc.get()` calls `FileRepository.get()`
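The override lookup reduces to a small dispatch: check `Meta.repository_class` and fall back to the generic SQL repository. The `FileRepository`, `VirtualDoc`, and `Todo` names below are illustrative stand-ins, not the framework's actual classes.

```python
class GenericRepository:
    """Default SQL-backed repository (stub)."""

    def __init__(self, model: type):
        self.model = model


class FileRepository:
    """Hypothetical custom repository, e.g. backed by a JSON file."""

    def __init__(self, model: type):
        self.model = model


def create_repository(doctype: type):
    # Use Meta.repository_class when the DocType declares one.
    meta = getattr(doctype, "Meta", None)
    repo_cls = getattr(meta, "repository_class", None) or GenericRepository
    return repo_cls(doctype)


class VirtualDoc:
    class Meta:
        repository_class = FileRepository


class Todo:  # no override -> generic SQL repository
    pass
```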
## 3. Generic Repository Implementation

- Create `tests/adapters/db/test_generic_repository.py`
  - Define tests for CRUD operations (mock `AsyncSession`)
- Create `src/framework_m/adapters/db/generic_repository.py`
- Implement `GenericRepository[T]` class
- Constructor dependencies (the session is NOT stored here):
  - `model: Type[T]`
  - `table: Table`
  - `controller_class: Type[BaseController] | None`
  - `event_bus: EventBusProtocol | None` (`InMemoryEventBus` for dev)

> [!NOTE]
> All CRUD methods accept `session: AsyncSession` as the FIRST argument. The caller (Service/Controller) owns the session via `UnitOfWork`.
### 3.1 CRUD Operations

- Implement `async def get(session: AsyncSession, id: UUID) -> Optional[T]`:
  - Build a SELECT query
  - Filter `deleted_at IS NULL` (unless a specific flag overrides this)
  - Execute with the provided session (not a stored session)
  - Convert the row to a Pydantic model
  - Return `None` if not found
- Implement `async def get_by_name(session: AsyncSession, name: str) -> Optional[T]`:
  - Helper for looking up by human-readable name
- Implement `async def save(session: AsyncSession, entity: T, version: int | None = None) -> T`:
  - Check whether the entity exists (by `id`)
  - If new:
    - Generate `id` (UUIDv7)
    - Note: `created_by`/`owner` must be set by the Controller/Service before calling `save`
    - Call controller `validate()`
    - Call controller `before_create()`
    - Call controller `before_save()`
    - Execute INSERT with the provided session
    - Call controller `after_save()`
    - Call controller `after_create()`
    - Emit event: `event_bus.publish(f"{doctype}.create", payload)`
  - If existing:
    - Note: `modified_by` must be set by the Controller/Service before calling `save`
    - Call controller `validate()`
    - Call controller `before_save()`
    - OCC: if `optimistic`:
      - `UPDATE table SET ..., _version = _version + 1 WHERE id = :id AND _version = :old_ver`
      - If `rowcount == 0`, raise `VersionConflictError`
    - Else: execute a standard UPDATE
    - Call controller `after_save()`
    - Emit event: `event_bus.publish(f"{doctype}.update", payload)`
  - Return the saved entity (the caller calls `uow.commit()` when ready)
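The optimistic-concurrency UPDATE can be demonstrated end to end with stdlib `sqlite3` (the real repository issues the same statement through an `AsyncSession`; the `todo` table and `occ_update` helper are illustrative).

```python
import sqlite3


class VersionConflictError(Exception):
    """Raised when the OCC UPDATE matches zero rows (stale version)."""


def occ_update(conn, doc_id: str, new_title: str, expected_version: int) -> None:
    cur = conn.execute(
        "UPDATE todo SET title = ?, _version = _version + 1 "
        "WHERE id = ? AND _version = ?",
        (new_title, doc_id, expected_version),
    )
    if cur.rowcount == 0:
        # Another writer bumped _version since we read the document.
        raise VersionConflictError(f"stale version {expected_version} for {doc_id}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE todo (id TEXT PRIMARY KEY, title TEXT, _version INTEGER)")
conn.execute("INSERT INTO todo VALUES ('t1', 'old title', 0)")
occ_update(conn, "t1", "new title", expected_version=0)  # succeeds, _version -> 1
```

A second `occ_update(conn, "t1", ..., expected_version=0)` now matches no rows and raises `VersionConflictError`, which is exactly the conflict signal the service layer surfaces to the user.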
- Implement `async def delete(session: AsyncSession, id: UUID, hard: bool = False) -> None`:
  - Load the entity
  - Call controller `before_delete()`
  - If `hard`: execute DELETE
  - Else (soft delete): update `deleted_at = now()`
  - Call controller `after_delete()`
  - Emit event: `event_bus.publish(f"{doctype}.delete", payload)`
  - Return (the caller calls `uow.commit()` when ready)
- Implement `async def list(session: AsyncSession, filters, limit, offset, order_by) -> Sequence[T]`:
  - Build a SELECT query with filters
  - Default filter: `deleted_at IS NULL`
  - Apply pagination (`limit`, `offset`)
  - Apply sorting (`order_by`)
  - Execute the query
  - Convert rows to Pydantic models
  - Return the list
### 3.2 Lifecycle Hook Integration

- Create a helper method `_call_hook(hook_name: str)`:
  - Check whether a controller exists
  - Check whether the hook method exists on the controller
  - Call the hook method if present
  - Handle exceptions and roll back on error
## 4. Migration System

### 4.1 Alembic Integration

- Initialize Alembic in the project: `alembic init alembic`
  - Implemented via `MigrationManager.init()`, which creates the alembic directory structure
- Configure `alembic.ini`:
  - Set the database URL from the environment
  - Configure the migration file location
- Create `alembic/env.py`:
  - Import `MetaRegistry`
  - Import all registered DocTypes
  - Set `target_metadata` from the SchemaMapper tables
  - Auto-generated with async support via `MigrationManager.init()`
### 4.2 Auto-Migration Detection

- Create `src/framework_m/adapters/db/migration.py`
- Implement `detect_schema_changes()`:
  - Compare registered DocTypes with the database schema
  - Detect new tables
  - Detect new columns
  - Detect type changes
  - Return the list of changes
- Implement `auto_migrate()`:
  - Call `detect_schema_changes()`
  - Generate an Alembic migration if changes are detected
  - Apply the migration automatically (dev mode only)
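The detection step can be sketched in miniature: compare the columns each registered DocType declares against what the live database reports. This toy version inspects SQLite via `PRAGMA table_info` and skips type-change detection; the real implementation would lean on Alembic's autogenerate comparison instead. Table and column names are illustrative.

```python
import sqlite3


def detect_schema_changes(conn, registered: dict) -> list:
    """Return human-readable descriptions of missing tables/columns."""
    changes = []
    existing_tables = {
        row[0]
        for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    }
    for table, wanted_cols in registered.items():
        if table not in existing_tables:
            changes.append(f"new table: {table}")
            continue
        # PRAGMA table_info rows: (cid, name, type, notnull, default, pk)
        actual_cols = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
        for col in sorted(wanted_cols - actual_cols):
            changes.append(f"new column: {table}.{col}")
    return changes


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE todo (id TEXT, title TEXT)")
registered = {"todo": {"id", "title", "priority"}, "note": {"id"}}
changes = detect_schema_changes(conn, registered)
```

`auto_migrate()` would then feed a non-empty change list into Alembic's revision generation and, in dev mode, apply the result immediately.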
### 4.3 CLI Commands

- Add migration commands to the CLI:
  - `m migrate` – run pending migrations
  - `m migrate:create <name>` – create a new migration
  - `m migrate:rollback` – roll back the last migration
  - `m migrate:status` – show migration status
  - Also added: `m migrate:history`, `m migrate:init`
## 5. Repository Factory

- Create `src/framework_m/adapters/db/repository_factory.py`
- Implement `RepositoryFactory` class:
  - `create_generic_repository(doctype_name: str) -> GenericRepository`
  - Look up the DocType from `MetaRegistry`
  - Look up the Table from `TableRegistry`
  - Look up the Controller from `MetaRegistry`
  - Create and return a `GenericRepository` instance
  - Support an `event_bus` parameter for domain events
  - Support custom repository overrides for Virtual DocTypes
## 6. Engine, Session Factory & Unit of Work

> [!IMPORTANT]
> **Anti-Pattern to Avoid:** Do NOT tie the transaction lifecycle to the HTTP request lifecycle. Repositories receive sessions; they do NOT create or own them.

### 6.1 Engine & Session Factory Setup

- Create `src/framework_m/adapters/db/connection.py` (`ConnectionFactory`)
- Implement `create_engine(url: str) -> AsyncEngine`
  - Pool configuration (`pool_size`, `max_overflow`, timeout, recycle, pre-ping)
  - Environment variable expansion (`${VAR}` syntax)
- Implement `SessionFactory` (returns `AsyncSession` context managers)
  - Support multiple binds (for Virtual DocTypes with SQL binds)
  - Configuration from `ConnectionFactory`
  - Auto commit/rollback in the context manager
### 6.2 Unit of Work (UnitOfWork)

- Create `tests/core/test_unit_of_work.py`
  - Test: UoW provides a session
  - Test: `commit()` persists changes
  - Test: an exception causes rollback (no explicit rollback call needed)
  - Test: the session is closed after `__aexit__`
- Create `src/framework_m/core/unit_of_work.py`
- Implement the `UnitOfWork` context manager:
  - `__init__(session_factory: Callable[[], AsyncSession])`
  - `session` property with a safety check
  - `async __aenter__()` creates the session
  - `async commit()` calls `session.commit()`
  - `async rollback()` for explicit rollback
  - `async __aexit__()` with auto-rollback on exception
- Register `UnitOfWorkFactory` in the DI container
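The context-manager shape described above can be sketched as follows; this is a minimal version that works with any session object exposing `commit()`, `rollback()`, and `close()` coroutines, not the framework's final implementation.

```python
from typing import Callable


class UnitOfWork:
    """Async context manager that owns the session lifecycle."""

    def __init__(self, session_factory: Callable):
        self._session_factory = session_factory
        self._session = None

    @property
    def session(self):
        # Safety check: the UoW must be entered before the session is used.
        if self._session is None:
            raise RuntimeError("UnitOfWork must be entered before use")
        return self._session

    async def __aenter__(self):
        self._session = self._session_factory()
        return self

    async def commit(self):
        await self.session.commit()

    async def rollback(self):
        await self.session.rollback()

    async def __aexit__(self, exc_type, exc, tb):
        try:
            if exc_type is not None:
                await self.session.rollback()  # auto-rollback on exception
        finally:
            await self.session.close()
            self._session = None
```

Repositories receive `uow.session` as their first argument and never commit themselves; the service decides when `await uow.commit()` happens.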
### 6.3 Multi-Source Coordination (Outbox Pattern)

- Create `src/framework_m/core/domain/outbox.py`
- Define the `OutboxEntry` model:
  - `id: UUID`
  - `target: str` (e.g., `"mongodb.audit_log"`, `"api.payment_gateway"`)
  - `payload: dict`
  - `status: str` (pending, processed, failed)
  - `created_at: datetime`
  - `processed_at: datetime | None`
  - `error_message: str | None`
  - `retry_count: int`
- Create `OutboxRepository` (SQL-backed) in `adapters/db/outbox_repository.py`:
  - `add(session, entry)` – add an entry in the same transaction
  - `get_pending(session, limit)` – get pending entries
  - `mark_processed(session, id)` – mark as processed
  - `mark_failed(session, id, error)` – mark as failed
- Document: services write to the Outbox in the same SQL transaction
- (Phase 04) A background worker processes Outbox entries
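The essential guarantee of the pattern is that the business row and the outbox entry commit atomically. A stdlib `sqlite3` demonstration (table and column names are illustrative; the real `OutboxRepository` issues these inserts through the shared `AsyncSession`):

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE todo (id TEXT PRIMARY KEY, title TEXT)")
conn.execute(
    "CREATE TABLE outbox (id TEXT PRIMARY KEY, target TEXT, payload TEXT, "
    "status TEXT DEFAULT 'pending', retry_count INTEGER DEFAULT 0)"
)

todo_id = str(uuid.uuid4())
with conn:  # ONE transaction: both inserts commit, or both roll back
    conn.execute("INSERT INTO todo VALUES (?, ?)", (todo_id, "write docs"))
    conn.execute(
        "INSERT INTO outbox (id, target, payload) VALUES (?, ?, ?)",
        (
            str(uuid.uuid4()),
            "mongodb.audit_log",
            json.dumps({"doctype": "todo", "id": todo_id, "event": "create"}),
        ),
    )

pending = conn.execute("SELECT target, status FROM outbox").fetchall()
```

If the second insert fails, the `with conn:` block rolls both statements back, so the Phase 04 worker can never observe an outbox entry without its corresponding document (or vice versa).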
### 6.4 Startup Sequence (Schema Sync)

- Create `src/framework_m/adapters/db/__init__.py`
- Implement the startup sequence (called once at app boot, NOT per request):
  - Initialize the database engine (NOT a session)
  - Discover all DocTypes via `MetaRegistry`
  - Create/sync tables via `SchemaMapper`
  - Register tables in `TableRegistry`
  - Run auto-migration (if enabled)
## 7. Testing

### 7.1 Unit Tests

- Test `SchemaMapper`:
  - Test type mapping for all supported types (`test_schema_mapper.py`)
  - Test primary key creation (`test_table_has_id_column_as_primary_key`)
  - Test nullable fields (`TestSchemaMapperNullableFields`)
  - Test enum mapping (`TestSchemaMapperEnums`)
- Test `GenericRepository`:
  - Test CRUD operations with mock data (`test_generic_repository.py`)
  - Test lifecycle hook calls (`TestControllerHooks`)
  - Test transaction rollback on error (`TestTransactionRollback`)
7.2 Integration Tests
-
Setup testcontainers for PostgreSQL (skips if Docker unavailable)
-
Create test DocType (
IntegrationTestDoc/PostgresTestDoc) -
Test full flow (SQLite & Postgres - Postgres skipped without Docker):
- Register DocType (
test_register_doctype) - Create table (
test_table_created) - Insert document (
test_insert_document) - Query document (
test_query_document) - Update document (
test_update_document) - Delete document (
test_delete_document)
- Register DocType (
-
Test migration:
- Add field to DocType
- Run auto-migration
- Verify column added to table
## 8. Error Handling

- Create custom exceptions (`core/exceptions.py`):
  - `DocTypeNotFoundError`
  - `ValidationError`
  - `PermissionDeniedError`
  - `DuplicateNameError`
  - `RepositoryError`, `EntityNotFoundError`, `DatabaseError`, `IntegrityError`
- Add error handling in the repository (`generic_repository.py`):
  - Catch SQLAlchemy errors (`IntegrityError`, `OperationalError`, `SQLAlchemyError`)
  - Convert them to domain exceptions (`DuplicateNameError`, `DatabaseError`, etc.)
  - Log errors with context (using `logger.error` with an `extra` dict)
## 9. Performance Optimizations

- Add query result caching (Phase 04):
  - Cache `get()` results by ID
  - Invalidate the cache on save/delete
  - Use Redis for a distributed cache
- Add bulk operations (`generic_repository.py`):
  - `async def bulk_save(entities)` – separates new and existing entities
  - `async def _bulk_insert(entities)` – true SQLAlchemy bulk insert
  - Uses `insert().values([...])` for performance
- Add query optimization (add indexes after the API layer exists):
  - Add indexes for common queries (schema_mapper: `owner`, `creation`, `Meta.indexes`)
  - Add a parent index for child tables (efficient joins)
  - Use `selectinload` for relationships (`load_children_for_parents` method)
  - Add query logging in dev mode (`logger.debug` for GET/LIST)
## Validation Checklist

Before moving to Phase 03, verify:

- Tables can be created from Pydantic models dynamically
- CRUD operations work with lifecycle hooks
- Migrations are generated and applied correctly
- All integration tests pass with real PostgreSQL (skipped if Docker is unavailable)
- No direct SQLAlchemy imports in the domain layer
- The repository implements `RepositoryProtocol` correctly
Anti-Patterns to Avoid
❌ Don't: Use raw SQL queries in business logic ✅ Do: Use repository methods and let SQLAlchemy handle queries
❌ Don't: Hardcode supported field types
✅ Do: Use FieldRegistry to allow plugins to add types (e.g. GeoLocation)
❌ Don't: Store metadata in database like Frappe ✅ Do: Generate tables from code-first Pydantic models
❌ Don't: Use synchronous database calls ✅ Do: Use async/await throughout
❌ Don't: Hardcode table names or column names ✅ Do: Derive from Pydantic model metadata