
Phase 02: DocType Engine & Database

Objective: Implement the metadata engine that dynamically creates database tables from Pydantic models and provides generic CRUD operations.

[!IMPORTANT] Database Agnostic: All database implementations MUST work with both SQLite and PostgreSQL. Avoid database-specific features (e.g., PostgreSQL's ARRAY). Use portable SQL types that work across databases.


1. Schema Mapper (Pydantic → SQLAlchemy)

1.1 Field Registry & Type Mapping

  • Create src/framework_m/adapters/db/field_registry.py
  • Implement FieldRegistry class (Singleton):
    • Start with standard types: str, int, float, bool, datetime, date, decimal, uuid, json
    • Allow registration of custom types: register_type(pydantic_type, sqlalchemy_type)
  • Define type mapping using Registry:
    • str → String
    • int → Integer
    • float → Float
    • bool → Boolean
    • datetime → DateTime
    • date → Date
    • Decimal → Numeric
    • UUID → UUID
    • list[str] → JSON (portable, works on all databases)
    • dict → JSON
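
The registry described above can be sketched as follows. This is a minimal, dependency-free illustration: the column types are plain strings standing in for the SQLAlchemy type classes, and the method name get_column_type is an assumption (only register_type appears in the plan).

```python
import datetime as dt
import decimal
import uuid
from typing import Any, get_origin


class FieldRegistry:
    """Process-wide registry mapping Python types to column type names."""

    _instance = None

    def __new__(cls):
        # Singleton: every FieldRegistry() call returns the same object.
        if cls._instance is None:
            instance = super().__new__(cls)
            # Strings stand in for SQLAlchemy types (String, Integer, ...).
            instance._types = {
                str: "String",
                int: "Integer",
                float: "Float",
                bool: "Boolean",
                dt.datetime: "DateTime",
                dt.date: "Date",
                decimal.Decimal: "Numeric",
                uuid.UUID: "UUID",
                list: "JSON",
                dict: "JSON",
            }
            cls._instance = instance
        return cls._instance

    def register_type(self, python_type: Any, column_type: str) -> None:
        """Let plugins add or override a mapping."""
        self._types[python_type] = column_type

    def get_column_type(self, python_type: Any) -> str:
        # Parameterised generics such as list[str] resolve via their origin (list).
        key = get_origin(python_type) or python_type
        try:
            return self._types[key]
        except KeyError:
            raise TypeError(f"No column type registered for {python_type!r}") from None


registry = FieldRegistry()
registry.register_type(complex, "JSON")  # a custom type added at runtime
```

Because the class is a singleton, a type registered by one plugin is visible to every later FieldRegistry() call in the same process.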

1.2 Schema Mapper Implementation

TDD Focus: Write tests/adapters/db/test_schema_mapper.py first!

[!IMPORTANT] Primary Key Design Decision: Use id (UUID) as primary key, name as unique index.

  • Rationale: Allows document renames without cascading foreign key updates
  • Frappe Issue: Using name as PK causes expensive updates on rename
  • Standard Practice: Auto-generated surrogate ids are the common primary-key choice in relational schema design
  • Flexibility: Framework users can rename documents cheaply
  • Create tests/adapters/db/test_schema_mapper.py

    • Test mapping simple fields (str, int)
    • Test mapping relationships
  • Create SchemaMapper class

  • Use FieldRegistry to look up SQLAlchemy types

  • Implement create_table(model: Type[BaseDocType]) -> Table:

    • Extract table name from model class name (lowercase)
    • Iterate over model.model_fields
    • Map each field to SQLAlchemy column
    • Handle Optional types (nullable=True)
    • Set id field as primary key (UUID, auto-generated)
    • Set name field as unique index (for human-readable lookup)
    • OCC (optimistic concurrency control): Add _version (Integer, default=0) if Meta.concurrency="optimistic"
    • Return SQLAlchemy Table object
  • Handle special field types:

    • Relationships (ForeignKey) - detect by field name pattern *_id
    • Enums - map to SQLAlchemy String type (database agnostic)
    • Nested Pydantic models (list[BaseModel]) - store as JSON
    • Child DocTypes (list[DocType]) - map to Relational Table with Foreign Key
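
The column-derivation rules above can be sketched without any dependencies. In this illustration a plain annotated class stands in for a Pydantic model (so get_type_hints replaces model.model_fields), and ColumnSpec, describe_columns, and table_name are hypothetical names; a real SchemaMapper would return a SQLAlchemy Table.

```python
import types
import uuid
from dataclasses import dataclass
from typing import Optional, Union, get_args, get_origin, get_type_hints

# typing.Union covers Optional[X]; types.UnionType covers "X | None" on 3.10+.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


@dataclass(frozen=True)
class ColumnSpec:
    name: str
    python_type: type
    nullable: bool = False
    primary_key: bool = False
    unique: bool = False
    foreign_key: bool = False


def unwrap_optional(annotation):
    """Return (inner_type, True) for Optional[X], else (annotation, False)."""
    if get_origin(annotation) in _UNION_ORIGINS:
        non_none = [a for a in get_args(annotation) if a is not type(None)]
        if len(non_none) == 1:
            return non_none[0], True
    return annotation, False


def table_name(model: type) -> str:
    return model.__name__.lower()


def describe_columns(model: type, optimistic: bool = False) -> list:
    columns = []
    for field_name, annotation in get_type_hints(model).items():
        inner, nullable = unwrap_optional(annotation)
        columns.append(ColumnSpec(
            name=field_name,
            python_type=inner,
            nullable=nullable,
            primary_key=field_name == "id",          # UUID primary key
            unique=field_name == "name",             # human-readable lookup
            foreign_key=field_name.endswith("_id"),  # *_id naming pattern
        ))
    if optimistic:
        # Version counter used for optimistic concurrency control.
        columns.append(ColumnSpec(name="_version", python_type=int))
    return columns


class Invoice:  # stand-in for a DocType
    id: uuid.UUID
    name: str
    customer_id: uuid.UUID
    notes: Optional[str] = None


columns = {c.name: c for c in describe_columns(Invoice, optimistic=True)}
```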

1.3 Table Registry

  • Create TableRegistry class to store created tables
  • Add methods:
    • register_table(doctype_name: str, table: Table)
    • get_table(doctype_name: str) -> Table
    • table_exists(doctype_name: str) -> bool

2. Database Connection Setup

2.1 Connection Factory & Virtual DocTypes

  • Create src/framework_m/adapters/db/connection.py
  • Implement ConnectionFactory (supports Multiple Bindings):
    • Load db_binds from config (e.g., legacy, timescale)
    • Create generic async engine (supports Postgres, SQLite for testing)
    • Maintain map of engine_name -> AsyncEngine

2.2 Session Factory (SQL Support)

  • Add session factory:
    • async def get_session(bind: str = "default") -> AsyncSession
    • Support VirtualDocType (SQL Bind) by accepting bind_key from DocType Meta

2.3 Non-SQL Virtual DocTypes (Custom Repositories)

  • Create concept of RepositoryOverride:
    • Allow DocType to define repository_class in Meta
    • RepositoryFactory instantiates this instead of GenericRepository
  • Create tests/adapters/db/test_repository_factory.py:
    • Implement a Mock FileRepository that reads from JSON file
    • Register it for a VirtualDoc
    • Verify VirtualDoc.get() calls FileRepository.get()
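
A minimal sketch of the override lookup, assuming Meta is a nested class on the DocType. FileRepository, the source_path attribute, and the create method name are hypothetical, and the plain classes stand in for the real DocType and repository types.

```python
import asyncio
import json
import tempfile
from pathlib import Path


class GenericRepository:
    """Stand-in for the SQL-backed default repository."""

    def __init__(self, model):
        self.model = model


class FileRepository:
    """Hypothetical read-only repository backed by a JSON file."""

    def __init__(self, model):
        self.model = model

    async def get(self, doc_id):
        records = json.loads(Path(self.model.Meta.source_path).read_text())
        row = records.get(doc_id)
        return self.model(**row) if row is not None else None


class RepositoryFactory:
    def create(self, model):
        # Prefer Meta.repository_class when the DocType declares one.
        meta = getattr(model, "Meta", None)
        repo_class = getattr(meta, "repository_class", None) or GenericRepository
        return repo_class(model)


class VirtualDoc:
    class Meta:
        repository_class = FileRepository
        source_path = ""  # set by demo() below

    def __init__(self, id, title):
        self.id, self.title = id, title


class PlainDoc:
    """A DocType without an override falls back to GenericRepository."""


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "docs.json"
        path.write_text(json.dumps({"doc-1": {"id": "doc-1", "title": "Hello"}}))
        VirtualDoc.Meta.source_path = str(path)
        repo = RepositoryFactory().create(VirtualDoc)
        return asyncio.run(repo.get("doc-1"))
```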

3. Generic Repository Implementation

  • Create tests/adapters/db/test_generic_repository.py
  • Define tests for CRUD operations (mock AsyncSession)
  • Create src/framework_m/adapters/db/generic_repository.py
  • Implement GenericRepository[T] class
  • Constructor dependencies (session is NOT stored here):
    • model: Type[T]
    • table: Table
    • controller_class: Type[BaseController] | None
    • event_bus: EventBusProtocol | None (InMemoryEventBus for dev)

[!NOTE] All CRUD methods accept session: AsyncSession as the FIRST argument. The caller (Service/Controller) owns the session via UnitOfWork.

3.1 CRUD Operations

  • Implement async def get(session: AsyncSession, id: UUID) -> Optional[T]:

    • Build SELECT query
    • Filter deleted_at IS NULL (unless specific flag overrides)
    • Execute with provided session (not stored session)
    • Convert row to Pydantic model
    • Return None if not found
  • Implement async def get_by_name(session: AsyncSession, name: str) -> Optional[T]:

    • Helper for looking up by human-readable name
  • Implement async def save(session: AsyncSession, entity: T, version: int | None = None) -> T:

    • Check if entity exists (by id)
    • If new:
      • Generate id (UUIDv7)
      • Note: created_by / owner must be set by Controller/Service before calling save
      • Call controller validate()
      • Call controller before_create()
      • Call controller before_save()
      • Execute INSERT with provided session
      • Call controller after_save()
      • Call controller after_create()
      • Emit Event: event_bus.publish(f"{doctype}.create", payload)
    • If existing:
      • Note: modified_by must be set by Controller/Service before calling save
      • Call controller validate()
      • Call controller before_save()
      • OCC: If optimistic:
        • UPDATE table SET ..., _version=_version+1 WHERE id=:id AND _version=:old_ver
        • If rowcount == 0, raise VersionConflictError
      • Else: Execute standard UPDATE
      • Call controller after_save()
      • Emit Event: event_bus.publish(f"{doctype}.update", payload)
    • Return saved entity (caller calls uow.commit() when ready)
  • Implement async def delete(session: AsyncSession, id: UUID, hard: bool = False) -> None:

    • Load entity
    • Call controller before_delete()
    • If hard:
      • Execute DELETE
    • Else (Soft Delete):
      • Update deleted_at = now()
    • Call controller after_delete()
    • Emit Event: event_bus.publish(f"{doctype}.delete", payload)
    • Return (caller calls uow.commit() when ready)
  • Implement async def list(session: AsyncSession, filters, limit, offset, order_by) -> Sequence[T]:

    • Build SELECT query with filters
    • Default filter: deleted_at IS NULL
    • Apply pagination (limit, offset)
    • Apply sorting (order_by)
    • Execute query
    • Convert rows to Pydantic models
    • Return list
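
The optimistic update and the soft-delete filter described above can be illustrated with the standard-library sqlite3 module. This is a synchronous sketch under stated assumptions: the note table and the function names are hypothetical, and the real repository would issue the equivalent statements through the AsyncSession it receives.

```python
import sqlite3
import uuid


class VersionConflictError(Exception):
    """Raised when the row changed since the caller last read it."""


def update_title(conn, doc_id, title, expected_version):
    cursor = conn.execute(
        "UPDATE note SET title = ?, _version = _version + 1 "
        "WHERE id = ? AND _version = ?",
        (title, doc_id, expected_version),
    )
    # No matching row: the version moved on (or the id does not exist).
    if cursor.rowcount == 0:
        raise VersionConflictError(
            f"note {doc_id} is no longer at version {expected_version}"
        )


def get_note(conn, doc_id, include_deleted=False):
    sql = "SELECT id, title, _version FROM note WHERE id = ?"
    if not include_deleted:
        sql += " AND deleted_at IS NULL"  # default: hide soft-deleted rows
    return conn.execute(sql, (doc_id,)).fetchone()


def soft_delete(conn, doc_id):
    conn.execute(
        "UPDATE note SET deleted_at = CURRENT_TIMESTAMP WHERE id = ?", (doc_id,)
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE note (id TEXT PRIMARY KEY, title TEXT, "
    "_version INTEGER NOT NULL DEFAULT 0, deleted_at TEXT)"
)
note_id = str(uuid.uuid4())
conn.execute("INSERT INTO note (id, title) VALUES (?, ?)", (note_id, "draft"))

update_title(conn, note_id, "first edit", expected_version=0)  # version 0 -> 1

try:
    # A second writer still holding version 0 is rejected.
    update_title(conn, note_id, "stale edit", expected_version=0)
    stale_write_rejected = False
except VersionConflictError:
    stale_write_rejected = True
```

The version check and the increment happen in one UPDATE statement, so no separate read is needed to detect a concurrent write.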

3.2 Lifecycle Hook Integration

  • Create helper method _call_hook(hook_name: str):
    • Check if controller exists
    • Check if hook method exists on controller
    • Call hook method if present
    • Handle exceptions and rollback on error
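
One possible shape for the hook helper, written as a free function for brevity. The entity and session parameters are assumptions added for illustration (the plan only lists hook_name), and FakeSession stands in for an AsyncSession.

```python
import asyncio
import inspect


async def call_hook(controller, hook_name, entity, session):
    """Run controller.<hook_name>(entity) if present; return True if it ran."""
    if controller is None:
        return False
    hook = getattr(controller, hook_name, None)
    if not callable(hook):
        return False
    try:
        result = hook(entity)
        if inspect.isawaitable(result):  # support sync and async hooks
            await result
    except Exception:
        # Undo pending work, then let the caller see the original error.
        await session.rollback()
        raise
    return True


class FakeSession:
    def __init__(self):
        self.rolled_back = False

    async def rollback(self):
        self.rolled_back = True


class NoteController:
    def __init__(self):
        self.calls = []

    def before_save(self, entity):
        self.calls.append(("before_save", entity))

    async def validate(self, entity):
        if not entity.get("title"):
            raise ValueError("title is required")


async def demo():
    controller, session = NoteController(), FakeSession()
    ran = await call_hook(controller, "before_save", {"title": "ok"}, session)
    # The controller defines no after_delete hook, so it is skipped.
    skipped = await call_hook(controller, "after_delete", {"title": "ok"}, session)
    try:
        await call_hook(controller, "validate", {}, session)
    except ValueError:
        pass
    return ran, skipped, controller.calls, session.rolled_back
```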

4. Migration System

4.1 Alembic Integration

  • Initialize Alembic in project:

    alembic init alembic

    Implemented via MigrationManager.init(), which creates the Alembic directory structure

  • Configure alembic.ini:

    • Set database URL from environment
    • Configure migration file location
  • Create alembic/env.py:

    • Import MetaRegistry
    • Import all registered DocTypes
    • Set target_metadata from SchemaMapper tables

    Auto-generated with async support via MigrationManager.init()

4.2 Auto-Migration Detection

  • Create src/framework_m/adapters/db/migration.py

  • Implement detect_schema_changes():

    • Compare registered DocTypes with database schema
    • Detect new tables
    • Detect new columns
    • Detect type changes
    • Return list of changes
  • Implement auto_migrate():

    • Call detect_schema_changes()
    • Generate Alembic migration if changes detected
    • Apply migration automatically (dev mode only)
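
The comparison step can be sketched against SQLite using PRAGMA table_info. This is an illustration only: the expected mapping and the tuple format of the result are assumptions, and a real implementation would more likely build on Alembic's own schema comparison so that it also works on PostgreSQL.

```python
import sqlite3


def detect_schema_changes(conn, expected):
    """Compare {table: {column: declared_type}} with the live SQLite schema."""
    changes = []
    for table, columns in expected.items():
        # Table names come from trusted DocType metadata, not user input.
        rows = conn.execute(f"PRAGMA table_info('{table}')").fetchall()
        if not rows:  # PRAGMA returns no rows for a missing table
            changes.append(("new_table", table, None))
            continue
        # Each row is (cid, name, type, notnull, dflt_value, pk).
        actual = {row[1]: row[2].upper() for row in rows}
        for column, declared in columns.items():
            if column not in actual:
                changes.append(("new_column", table, column))
            elif actual[column] != declared.upper():
                changes.append(("type_change", table, column))
    return changes


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE note (id TEXT PRIMARY KEY, title TEXT, score TEXT)")

expected = {
    "note": {"id": "TEXT", "title": "TEXT", "score": "INTEGER", "priority": "INTEGER"},
    "tag": {"id": "TEXT"},
}
changes = detect_schema_changes(conn, expected)
```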

4.3 CLI Commands

  • Add migration commands to CLI:
    • m migrate - run pending migrations
    • m migrate:create <name> - create new migration
    • m migrate:rollback - rollback last migration
    • m migrate:status - show migration status

    Also added: m migrate:history, m migrate:init

5. Repository Factory

  • Create src/framework_m/adapters/db/repository_factory.py
  • Implement RepositoryFactory class:
    • create_generic_repository(doctype_name: str) -> GenericRepository
    • Look up DocType from MetaRegistry
    • Look up Table from TableRegistry
    • Look up Controller from MetaRegistry
    • Create and return GenericRepository instance
    • Support event_bus parameter for domain events
    • Support custom repository overrides for Virtual DocTypes

6. Engine, Session Factory & Unit of Work

[!IMPORTANT] Anti-Pattern to Avoid: Do NOT tie transaction lifecycle to HTTP request lifecycle. Repositories receive sessions; they do NOT create or own them.

6.1 Engine & Session Factory Setup

  • Create src/framework_m/adapters/db/connection.py (ConnectionFactory)
  • Implement create_engine(url: str) -> AsyncEngine
    • Pool configuration (pool_size, max_overflow, timeout, recycle, pre_ping)
    • Environment variable expansion (${VAR} syntax)
  • Implement SessionFactory (returns AsyncSession context managers)
    • Support multiple binds (for Virtual DocTypes with SQL binds)
    • Configuration from ConnectionFactory
    • Auto commit/rollback in context manager
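
The ${VAR} expansion can be done with a small regular expression. In this sketch, expand_env is a hypothetical helper name, and failing on an unset variable is a design assumption rather than something the plan specifies.

```python
import os
import re

_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env=None):
    """Replace each ${VAR} in value with its value from env (default: os.environ)."""
    env = os.environ if env is None else env

    def replace(match):
        name = match.group(1)
        if name not in env:
            # Fail at startup instead of connecting with a half-filled URL.
            raise KeyError(f"Environment variable {name} is not set")
        return env[name]

    return _VAR_PATTERN.sub(replace, value)


url = expand_env(
    "postgresql+asyncpg://${DB_USER}:${DB_PASSWORD}@db:5432/app",
    env={"DB_USER": "m", "DB_PASSWORD": "secret"},
)
```

Passing env explicitly keeps the helper easy to test; production code would call it with the default and read from the process environment.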

6.2 Unit of Work (UnitOfWork)

  • Create tests/core/test_unit_of_work.py
    • Test: UoW provides session
    • Test: commit() persists changes
    • Test: Exception causes rollback (no explicit rollback call needed)
    • Test: Session is closed after __aexit__
  • Create src/framework_m/core/unit_of_work.py
  • Implement UnitOfWork context manager:
    • __init__(session_factory: Callable[[], AsyncSession])
    • session property with safety check
    • async __aenter__() creates session
    • async commit() calls session.commit()
    • async rollback() for explicit rollback
    • async __aexit__() with auto-rollback on exception
  • Register UnitOfWorkFactory in DI container
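
A minimal sketch of the context manager listed above. FakeSession is a stand-in that records calls; with SQLAlchemy the factory would return an AsyncSession, which exposes the same commit, rollback, and close coroutines.

```python
import asyncio


class UnitOfWork:
    def __init__(self, session_factory):
        self._session_factory = session_factory
        self._session = None

    @property
    def session(self):
        # Safety check: the session only exists inside "async with".
        if self._session is None:
            raise RuntimeError("UnitOfWork must be entered with 'async with' first")
        return self._session

    async def __aenter__(self):
        self._session = self._session_factory()
        return self

    async def commit(self):
        await self.session.commit()

    async def rollback(self):
        await self.session.rollback()

    async def __aexit__(self, exc_type, exc, tb):
        try:
            if exc_type is not None:
                await self._session.rollback()  # auto-rollback on exception
        finally:
            await self._session.close()
            self._session = None
        return False  # never swallow the caller's exception


class FakeSession:
    def __init__(self):
        self.events = []

    async def commit(self):
        self.events.append("commit")

    async def rollback(self):
        self.events.append("rollback")

    async def close(self):
        self.events.append("close")


async def demo():
    ok, failed = FakeSession(), FakeSession()
    async with UnitOfWork(lambda: ok) as uow:
        await uow.commit()
    try:
        async with UnitOfWork(lambda: failed):
            raise ValueError("boom")
    except ValueError:
        pass
    return ok.events, failed.events
```

Nothing is committed unless the caller invokes commit() explicitly, which keeps the transaction boundary in the service layer.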

6.3 Multi-Source Coordination (Outbox Pattern)

  • Create src/framework_m/core/domain/outbox.py
  • Define OutboxEntry model:
    • id: UUID
    • target: str (e.g., "mongodb.audit_log", "api.payment_gateway")
    • payload: dict
    • status: str (pending, processed, failed)
    • created_at: datetime
    • processed_at: datetime | None
    • error_message: str | None
    • retry_count: int
  • Create OutboxRepository (SQL-backed) in adapters/db/outbox_repository.py
    • add(session, entry) - add entry in same transaction
    • get_pending(session, limit) - get pending entries
    • mark_processed(session, id) - mark as processed
    • mark_failed(session, id, error) - mark as failed
  • Document: Services write to Outbox in the same SQL transaction
  • (Phase 04) Background worker processes Outbox entries
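
The key property of the outbox is that the business write and the outbox entry commit or roll back together. The sketch below shows this with sqlite3; the invoice table, the trimmed OutboxEntry fields, and the free functions add and get_pending are simplifications of the model and repository listed above.

```python
import json
import sqlite3
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class OutboxEntry:
    target: str
    payload: dict
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: str = "pending"
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    retry_count: int = 0


def add(conn, entry):
    conn.execute(
        "INSERT INTO outbox (id, target, payload, status, created_at, retry_count) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (entry.id, entry.target, json.dumps(entry.payload),
         entry.status, entry.created_at, entry.retry_count),
    )


def get_pending(conn, limit):
    return conn.execute(
        "SELECT id, target FROM outbox WHERE status = 'pending' "
        "ORDER BY created_at LIMIT ?",
        (limit,),
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE invoice (id TEXT PRIMARY KEY, total REAL)")
conn.execute(
    "CREATE TABLE outbox (id TEXT PRIMARY KEY, target TEXT, payload TEXT, "
    "status TEXT, created_at TEXT, retry_count INTEGER)"
)

# "with conn" commits on success and rolls back on exception.
with conn:
    conn.execute("INSERT INTO invoice VALUES (?, ?)", ("inv-1", 120.0))
    add(conn, OutboxEntry(target="mongodb.audit_log", payload={"invoice": "inv-1"}))

try:
    with conn:
        conn.execute("INSERT INTO invoice VALUES (?, ?)", ("inv-2", 80.0))
        add(conn, OutboxEntry(target="api.payment_gateway", payload={"invoice": "inv-2"}))
        raise RuntimeError("business rule failed")
except RuntimeError:
    pass  # both the invoice and its outbox entry were rolled back

pending = get_pending(conn, limit=10)
```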

6.4 Startup Sequence (Schema Sync)

  • Create src/framework_m/adapters/db/__init__.py
  • Implement startup sequence (called once at app boot, NOT per-request):
    • Initialize database engine (NOT session)
    • Discover all DocTypes via MetaRegistry
    • Create/sync tables via SchemaMapper
    • Register tables in TableRegistry
    • Run auto-migration (if enabled)

7. Testing

7.1 Unit Tests

  • Test SchemaMapper:

    • Test type mapping for all supported types (test_schema_mapper.py)
    • Test primary key creation (test_table_has_id_column_as_primary_key)
    • Test nullable fields (TestSchemaMapperNullableFields)
    • Test enum mapping (TestSchemaMapperEnums)
  • Test GenericRepository:

    • Test CRUD operations with mock data (test_generic_repository.py)
    • Test lifecycle hook calls (TestControllerHooks)
    • Test transaction rollback on error (TestTransactionRollback)

7.2 Integration Tests

  • Setup testcontainers for PostgreSQL (skips if Docker unavailable)

  • Create test DocType (IntegrationTestDoc / PostgresTestDoc)

  • Test full flow (SQLite & Postgres - Postgres skipped without Docker):

    • Register DocType (test_register_doctype)
    • Create table (test_table_created)
    • Insert document (test_insert_document)
    • Query document (test_query_document)
    • Update document (test_update_document)
    • Delete document (test_delete_document)
  • Test migration:

    • Add field to DocType
    • Run auto-migration
    • Verify column added to table

8. Error Handling

  • Create custom exceptions (core/exceptions.py):

    • DocTypeNotFoundError
    • ValidationError
    • PermissionDeniedError
    • DuplicateNameError
    • RepositoryError, EntityNotFoundError, DatabaseError, IntegrityError
  • Add error handling in repository (generic_repository.py):

    • Catch SQLAlchemy errors (IntegrityError, OperationalError, SQLAlchemyError)
    • Convert to domain exceptions (DuplicateNameError, DatabaseError, etc.)
    • Log errors with context (using logger.error with extra dict)
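
The catch-convert-log pattern can be illustrated with sqlite3 exceptions standing in for SQLAlchemy's. The exception hierarchy shown (DuplicateNameError and DatabaseError deriving from RepositoryError) and the insert_note function are assumptions made for the sketch.

```python
import logging
import sqlite3

logger = logging.getLogger("framework_m.repository")


class RepositoryError(Exception):
    """Base class for repository-level failures."""


class DuplicateNameError(RepositoryError):
    pass


class DatabaseError(RepositoryError):
    pass


def insert_note(conn, name):
    try:
        conn.execute("INSERT INTO note (name) VALUES (?)", (name,))
    except sqlite3.IntegrityError as exc:
        # "name" is reserved on LogRecord, so the extra key is doc_name.
        logger.error("Duplicate name", extra={"doctype": "note", "doc_name": name})
        raise DuplicateNameError(f"note {name!r} already exists") from exc
    except sqlite3.Error as exc:
        logger.error("Database failure", extra={"doctype": "note", "doc_name": name})
        raise DatabaseError(str(exc)) from exc


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE note (name TEXT UNIQUE)")
insert_note(conn, "todo")

try:
    insert_note(conn, "todo")  # violates the UNIQUE constraint
    outcome = "inserted"
except DuplicateNameError as exc:
    # The driver error stays attached as __cause__ for debugging.
    outcome = type(exc.__cause__).__name__
```

Catching the narrower IntegrityError before the general driver error matters, because the first matching except clause wins.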

9. Performance Optimizations

  • Add query result caching (phase-04):

    • Cache get() results by ID
    • Invalidate cache on save/delete
    • Use Redis for distributed cache
  • Add bulk operations (generic_repository.py):

    • async def bulk_save(entities) - separates new/existing
    • async def _bulk_insert(entities) - true SQLAlchemy bulk insert
    • Uses insert().values([...]) for performance
  • Add query optimization (add indexes after the API layer exists):

    • Add indexes for common queries (schema_mapper: owner, creation, Meta.indexes)
    • Add parent index for child tables (efficient joins)
    • Use select-in loading for relationships (SQLAlchemy's selectinload; see the load_children_for_parents method)
    • Add query logging in dev mode (logger.debug for GET/LIST)
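
The "separates new/existing" step of bulk_save can be sketched as one lookup followed by two batched statements. This sqlite3 version is synchronous and uses a hypothetical note table with dict entities; the real method would be async and use SQLAlchemy's insert().values([...]).

```python
import sqlite3


def bulk_save(conn, entities):
    """Insert unseen ids and update known ones; return (inserted, updated)."""
    if not entities:
        return 0, 0
    ids = [e["id"] for e in entities]
    placeholders = ", ".join("?" for _ in ids)
    # One round trip to learn which ids already exist.
    existing = {
        row[0]
        for row in conn.execute(
            f"SELECT id FROM note WHERE id IN ({placeholders})", ids
        )
    }
    new = [e for e in entities if e["id"] not in existing]
    old = [e for e in entities if e["id"] in existing]
    conn.executemany("INSERT INTO note (id, title) VALUES (:id, :title)", new)
    conn.executemany("UPDATE note SET title = :title WHERE id = :id", old)
    return len(new), len(old)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE note (id TEXT PRIMARY KEY, title TEXT)")
conn.execute("INSERT INTO note VALUES ('a', 'old')")

counts = bulk_save(conn, [
    {"id": "a", "title": "updated"},
    {"id": "b", "title": "new b"},
    {"id": "c", "title": "new c"},
])
rows = conn.execute("SELECT id, title FROM note ORDER BY id").fetchall()
```

For very large batches the id list would need chunking, since databases cap the number of bound parameters per statement.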

Validation Checklist

Before moving to Phase 03, verify:

  • Can create tables from Pydantic models dynamically
  • CRUD operations work with lifecycle hooks
  • Migrations are generated and applied correctly
  • All integration tests pass with real PostgreSQL (skips if Docker unavailable)
  • No direct SQLAlchemy imports in domain layer
  • Repository implements RepositoryProtocol correctly

Anti-Patterns to Avoid

Don't: Use raw SQL queries in business logic
✅ Do: Use repository methods and let SQLAlchemy handle queries

Don't: Hardcode supported field types
✅ Do: Use FieldRegistry to allow plugins to add types (e.g. GeoLocation)

Don't: Store metadata in database like Frappe
✅ Do: Generate tables from code-first Pydantic models

Don't: Use synchronous database calls
✅ Do: Use async/await throughout

Don't: Hardcode table names or column names
✅ Do: Derive from Pydantic model metadata