Skip to main content

Migration Guide: Frappe → Framework M

A practical guide for Frappe developers transitioning to Framework M. This covers common patterns and their Framework M equivalents.

Tone: Both frameworks are excellent. Frappe's simplicity enabled rapid development. Framework M builds on those foundations with explicit patterns for scale.

[!IMPORTANT] Documentation Status: This guide shows planned patterns and API designs. Examples will be revisited in Phase 09B with real, working code including actual container initialization, DI wiring, and tested implementations.


Quick Reference

Frappe PatternFramework M Pattern
frappe.db.get_value()repo.get() via DI
frappe.session.userAuthContext via DI
doc.before_save()Controller.before_save()
frappe.get_doc()repo.get(session, id)
Implicit transactionExplicit UnitOfWork
Global stateDependency Injection

1. Database Access

Frappe (Global State)

# Anywhere in code
value = frappe.db.get_value("Item", item_code, "stock_qty")
frappe.db.set_value("Item", item_code, "stock_qty", new_qty)

# Get document
doc = frappe.get_doc("Item", item_code)
doc.stock_qty = 100
doc.save()

Framework M (Explicit Session)

# Repository injected, session explicit
class ItemService:
def __init__(self, repo: GenericRepository[Item], uow_factory: UoWFactory):
self.repo = repo
self.uow_factory = uow_factory

async def update_stock(self, item_code: str, new_qty: int):
async with self.uow_factory() as uow:
item = await self.repo.get_by_name(uow.session, item_code)
item.stock_qty = new_qty
await self.repo.save(uow.session, item)
await uow.commit()

Why? Explicit session enables:

  • Multiple independent transactions per request
  • Easy testing (mock the session)
  • Clear transaction boundaries

2. Getting Current User

Frappe

current_user = frappe.session.user
if frappe.session.user == "Administrator":
# admin logic

Framework M

# Injected via Litestar DI
from litestar import Controller, get
from framework_m.core.interfaces import AuthContext

class ItemController(Controller):
@get("/items")
async def list_items(self, auth: AuthContext) -> list[Item]:
# auth.user_id, auth.roles available
if "Administrator" in auth.roles:
# admin logic
return await self.service.list_for_user(auth.user_id)

3. Document Lifecycle Hooks

Frappe (on DocType)

class Item(Document):
def before_save(self):
self.validate_qty()

def after_insert(self):
self.create_stock_entry()

Framework M (on Controller)

# doctype.py - schema only
class Item(BaseDocType):
item_code: str
stock_qty: int

# controller.py - business logic
class ItemController(BaseController[Item]):
async def validate(self, context=None):
if self.doc.stock_qty < 0:
raise ValueError("Stock cannot be negative")

async def after_insert(self, context=None):
await self.create_stock_entry()

Hook Order (Insert):

validate → before_insert → before_save → [DB INSERT] → after_save → after_insert

4. Child Tables

Frappe (Loop in Template)

{% for item in doc.items %}
<tr>
<td>{{ item.item_code }}</td>
<td>{{ item.qty }}</td>
</tr>
{% endfor %}
# Backend
for item in doc.items:
item.amount = item.qty * item.rate

Framework M + Refine.dev

// Frontend - Ant Design Table
<Table
dataSource={invoice.items}
columns={[
{ title: "Item", dataIndex: "item_code" },
{ title: "Qty", dataIndex: "qty" },
{ title: "Amount", dataIndex: "amount" },
]}
/>
# Backend - Pydantic child table
class InvoiceItem(BaseDocType):
class Meta:
is_child_table = True

item_code: str
qty: Decimal
rate: Decimal
amount: Decimal = Decimal("0")

class Invoice(BaseDocType):
items: list[InvoiceItem] = Field(default_factory=list)

# Controller - calculate amounts
class InvoiceController(BaseController[Invoice]):
async def before_save(self, context=None):
for item in self.doc.items:
item.amount = item.qty * item.rate
self.doc.total = sum(i.amount for i in self.doc.items)

5. Avoiding N+1 Queries

Frappe (Common N+1)

# N+1 problem: 1 query for invoices + N queries for customers
invoices = frappe.get_all("Invoice", fields=["name", "customer"])
for inv in invoices:
customer = frappe.get_doc("Customer", inv.customer) # N queries!
inv.customer_name = customer.customer_name

Framework M (Eager Loading)

# Single query with JOIN
async def list_invoices_with_customers(self, session: AsyncSession):
stmt = (
select(Invoice)
.options(joinedload(Invoice.customer)) # Eager load
.limit(100)
)
result = await session.execute(stmt)
return result.scalars().all()

Or use repository with includes:

# Repository pattern with includes
invoices = await repo.list(
session,
filters={"status": "Draft"},
includes=["customer", "items"], # Eager load relations
)

6. Transactions

Frappe (Implicit)

# Transaction per request, auto-commit on success
def create_order():
order = frappe.get_doc({"doctype": "Order", ...})
order.insert() # Auto-commits at request end

items = create_items(order.name) # Same transaction

Framework M (Explicit UoW)

async def create_order(self, data: OrderCreate):
async with self.uow_factory() as uow:
# All operations in same transaction
order = Order(**data.model_dump())
await self.order_repo.save(uow.session, order)

for item_data in data.items:
item = OrderItem(order_id=order.id, **item_data)
await self.item_repo.save(uow.session, item)

await uow.commit() # Explicit commit
# Auto-rollback if exception before commit

7. Background Jobs

Frappe

frappe.enqueue(
"myapp.tasks.send_email",
queue="short",
email=email,
subject=subject,
)

Framework M (JobQueueProtocol)

Framework M uses a JobQueueProtocol interface, allowing you to plug in any background job system (Celery, Arq, BullMQ). The default adapter uses Taskiq.

from framework_m.core.interfaces import JobQueueProtocol

# Injected dependency
class EmailService:
def __init__(self, queue: JobQueueProtocol):
self.queue = queue

async def send_welcome(self, email: str):
# Enqueue task (implementation agnostic)
await self.queue.enqueue(
"send_email_task",
email=email,
subject="Welcome"
)

# Defining the task (adapter specific - e.g., Taskiq)
from framework_m.adapters.jobs import task

@task
async def send_email_task(email: str, subject: str):
await smtp.send(email, subject)

8. Events / Hooks

Frappe (doc_events)

# hooks.py
doc_events = {
"Invoice": {
"on_submit": "myapp.invoice.on_submit",
}
}

Framework M (Event Bus)

from framework_m.core.interfaces import EventBusProtocol, Event

# Publish event
event = Event(type="invoice.submitted", data={"id": str(invoice.id)})
await event_bus.publish("invoice.submitted", event)

# Subscribe (in another service)
async def handle_invoice_submitted(event: Event) -> None:
await update_ledger(event.data["id"])

# Register handler on startup
await event_bus.subscribe("invoice.submitted", handle_invoice_submitted)

9. Permissions

Frappe

if frappe.has_permission("Invoice", "write", doc.name):
# allowed

Framework M

# Via PermissionProtocol (injected)
from framework_m.core.interfaces import PermissionProtocol, PolicyEvaluateRequest, PermissionAction

class InvoiceService:
def __init__(self, permissions: PermissionProtocol):
self.permissions = permissions

async def update(self, auth: AuthContext, invoice_id: str, data: dict):
result = await self.permissions.evaluate(PolicyEvaluateRequest(
principal=auth.user_id,
action=PermissionAction.WRITE,
resource="Invoice",
resource_id=invoice_id,
principal_attributes={"roles": auth.roles},
))
if not result.authorized:
raise PermissionDenied(result.reason)

Or declarative in Meta:

class Invoice(BaseDocType):
class Meta:
permissions = {
"read": ["Employee", "Manager"],
"write": ["Manager"],
"submit": ["Manager"],
}

10. API Endpoints

Frappe (whitelisted)

@frappe.whitelist()
def get_stock_balance(item_code):
return frappe.db.get_value("Item", item_code, "stock_qty")

Framework M (Litestar routes)

from litestar import Controller, get

class StockController(Controller):
path = "/api/stock"

@get("/{item_code:str}/balance")
async def get_balance(self, item_code: str, repo: ItemRepository) -> dict:
item = await repo.get_by_code(item_code)
return {"balance": item.stock_qty}

Or use RPC-style for Frappe compatibility:

# In my_app/api.py
from framework_m.core.decorators import rpc

@rpc() # Registers as "my_app.api.get_stock_balance"
async def get_stock_balance(item_code: str, repo: ItemRepository) -> dict:
item = await repo.get_by_code(item_code)
return {"balance": item.stock_qty}

# Exposed as: POST /api/v1/rpc/fn/my_app.api.get_stock_balance
# Body: {"item_code": "ITEM-001"}

Two RPC patterns available:

  • Controller methods: POST /api/v1/rpc/{DocType}/{method} - uses @whitelist on controller
  • Standalone functions: POST /api/v1/rpc/fn/{dotted.path} - uses @rpc decorator

11. Queries and Aggregations

Frappe

# Get all with filters
items = frappe.get_all("Item", filters={"item_group": "Electronics"}, fields=["name", "price"])

# Query Builder
from frappe.query_builder import DocType
Item = DocType("Item")
query = frappe.qb.from_(Item).select(Item.name, Item.price).where(Item.stock_qty > 0)

# Raw SQL
frappe.db.sql("SELECT name, price FROM tabItem WHERE stock_qty > %s", (0,), as_dict=True)

Framework M

from framework_m.core.interfaces import FilterSpec, FilterOperator

# Repository list with filters
items = await repo.list(
session,
filters=[FilterSpec(field="item_group", operator=FilterOperator.EQ, value="Electronics")],
limit=100,
)

# SQLAlchemy Core (for complex queries)
from sqlalchemy import select, func
stmt = select(Item.name, Item.price).where(Item.stock_qty > 0)
result = await session.execute(stmt)

# Aggregations
stmt = select(func.sum(Item.stock_qty)).where(Item.item_group == "Electronics")
total = (await session.execute(stmt)).scalar()

12. Print Formats

Frappe

# Jinja template in DocType
frappe.get_print("Invoice", invoice_name, print_format="Standard")

Framework M

# PrintProtocol for document rendering
from framework_m.core.interfaces import PrintProtocol, PrintFormat

class InvoiceService:
def __init__(self, printer: PrintProtocol, repo: RepositoryProtocol[Invoice]):
self.printer = printer
self.repo = repo

async def generate_pdf(self, invoice_id: str) -> bytes:
invoice = await self.repo.get(UUID(invoice_id))
return await self.printer.render(
doc=invoice,
template="standard",
format=PrintFormat.PDF,
)

async def list_templates(self) -> list[str]:
return await self.printer.get_templates("Invoice")

# Adapters: JinjaPrintAdapter (HTML), GotenbergAdapter (PDF via Gotenberg)

13. Workflow & Document States

[!NOTE] Planned Feature (Phase 08): Framework M will provide a pluggable WorkflowProtocol with internal and Temporal adapters. Workflows are defined as DocTypes, not hardcoded in controllers.

Frappe

# Workflow defined in UI, transitions via API
frappe.get_doc("Invoice", name).set_workflow_action("Approve")

Framework M (WorkflowProtocol)

# Workflow defined as DocType
# Workflow: "InvoiceApproval"
# - States: Draft, Pending Approval, Approved, Rejected
# - Transitions: Submit (Draft → Pending), Approve (Pending → Approved), etc.
# - allowed_roles per transition

# DocType references the workflow
class Invoice(BaseDocType):
class Meta:
workflow = "InvoiceApproval"

# Transitions via injected WorkflowProtocol
class InvoiceService:
def __init__(self, workflow: WorkflowProtocol):
self.workflow = workflow

async def approve(self, invoice_id: str, auth: AuthContext):
# Checks permissions, validates transition, updates state, emits event
await self.workflow.transition(invoice_id, "Approve", auth)

# Available adapters:
# - InternalWorkflowAdapter: Simple state machine, stores state in DB
# - TemporalWorkflowAdapter: Distributed, long-running workflows via Temporal.io

Why Protocol-based?

  • Swap implementations without changing business logic
  • Internal adapter for simple use cases
  • Temporal for complex, distributed workflows
  • Workflow definitions are data, not code

14. Naming Series

[!NOTE] Planned Feature (Phase 08): Declarative naming patterns via Meta.name_pattern. Uses optimistic generation with retry on conflict—no row-level locking.

Frappe

# In DocType, autoname = "naming_series:"
doc.naming_series = "INV-.YYYY.-"

Framework M (Declarative Naming)

class Invoice(BaseDocType):
class Meta:
# Declarative pattern - framework handles generation
name_pattern = "INV-.YYYY.-.####" # → INV-2026-0001

# Pattern tokens:
# .YYYY → Year (2026)
# .MM → Month (01-12)
# .DD → Day (01-31)
# .#### → Sequential number (optimistic, retry on conflict)
# {field} → Field value (e.g., {customer_code})

# For high-volume DocTypes, use database-native sequences:
class HighVolumeInvoice(BaseDocType):
class Meta:
name_pattern = "sequence:invoice_seq" # Uses DB sequence

# Framework detects database and uses appropriate mechanism:
# PostgreSQL: CREATE SEQUENCE + nextval()
# MySQL/MariaDB: AUTO_INCREMENT helper table
# SQLite: sqlite_sequence table
# Oracle: CREATE SEQUENCE
# MSSQL: CREATE SEQUENCE (2012+) or IDENTITY

Why this approach?

  • id (UUID) is the primary key—zero contention on insert
  • name is optional human-readable identifier
  • No SELECT ... FOR UPDATE lock on counters (Frappe anti-pattern)
  • Optimistic generation: insert → generate name → retry if conflict

15. Fixtures & Data Import

Frappe

// fixtures in hooks.py
fixtures = ["Custom Field", {"dt": "Role", "filters": [["name", "in", ["Manager"]]]}]

Framework M

# CLI command for fixtures
@app.command
async def load_fixtures(path: str = "fixtures/"):
"""Load JSON/YAML fixtures into database."""
async with uow_factory() as uow:
for file in Path(path).glob("*.json"):
data = json.loads(file.read_text())
for record in data:
await repo.save(uow.session, Model(**record))
await uow.commit()

16. Scheduled Jobs

Frappe

# hooks.py
scheduler_events = {
"daily": ["myapp.tasks.daily_cleanup"],
"cron": {"0 9 * * *": ["myapp.tasks.morning_report"]},
}

Framework M (JobQueueProtocol Scheduling)

Schedules are managed via the JobQueueProtocol or adapter-specific decorators.

# Programmatic scheduling via Protocol
await queue.schedule("morning_report", cron="0 9 * * *")

# Or Decorator-based (Taskiq Adapter)
from taskiq import cron

@broker.task
@cron("0 9 * * *")
async def morning_report():
await report_service.generate_daily()

@broker.task
@cron("0 0 * * *")
async def daily_cleanup():
await cleanup_service.run()

17. Cache

Frappe

# Get/set cache with TTL
frappe.cache().set_value("key", value, expires_in_sec=3600)
value = frappe.cache().get_value("key")

# Delete pattern
frappe.cache().delete_keys("user:*")

Framework M

from framework_m.core.interfaces import CacheProtocol

class ItemService:
def __init__(self, cache: CacheProtocol, repo: RepositoryProtocol[Item]):
self.cache = cache
self.repo = repo

async def get_item(self, item_id: str) -> Item:
# Try cache first
cached = await self.cache.get(f"item:{item_id}")
if cached:
return Item.model_validate(cached)

# Fetch from DB and cache
item = await self.repo.get(UUID(item_id))
await self.cache.set(f"item:{item_id}", item.model_dump(), ttl=3600)
return item

async def invalidate_item(self, item_id: str):
await self.cache.delete(f"item:{item_id}")

async def invalidate_all_items(self):
deleted = await self.cache.delete_pattern("item:*")
return deleted

# Adapters: RedisCacheAdapter, MemoryCacheAdapter (dev)

18. Audit Trail / Version History

Frappe

# Auto-tracks changes in Version doctype
doc.get_doc_before_save() # Compare changes

# Add comment
doc.add_comment("Updated stock levels")

# frappe.log_error for exceptions
frappe.log_error("Something went wrong")

Framework M

from framework_m.core.interfaces import AuditLogProtocol, AuditEntry

class InvoiceService:
def __init__(self, audit: AuditLogProtocol, repo: RepositoryProtocol[Invoice]):
self.audit = audit
self.repo = repo

async def update_invoice(self, invoice_id: str, data: dict, auth: AuthContext):
old = await self.repo.get(UUID(invoice_id))
# ... update logic ...
new = await self.repo.save(updated_invoice)

# Log the change with diff
await self.audit.log(
user_id=auth.user_id,
action="update",
doctype="Invoice",
document_id=invoice_id,
changes={"status": {"old": old.status, "new": new.status}},
metadata={"ip": auth.ip_address},
)

async def get_history(self, invoice_id: str) -> list[AuditEntry]:
return await self.audit.query(
filters={"doctype": "Invoice", "document_id": invoice_id},
limit=50,
)

# Adapters:
# - DatabaseAuditAdapter: Writes to ActivityLog table (Indie)
# - FileAuditAdapter: JSONL file for Splunk/Filebeat
# - ElasticAuditAdapter: Elasticsearch (Enterprise)

[!NOTE] Planned Feature (Phase 08): The SearchProtocol interface is defined, but the Meilisearch/Elasticsearch adapters are planned for Phase 08.

Frappe

# Link field search
frappe.get_list("Customer", filters={"name": ["like", "%search%"]})

# Global search
frappe.search_link("Customer", "john")

Framework M

from framework_m.core.interfaces import SearchProtocol, SearchResult

class CustomerService:
def __init__(self, search: SearchProtocol, repo: RepositoryProtocol[Customer]):
self.search = search
self.repo = repo

async def search_customers(self, query: str) -> SearchResult:
return await self.search.search(
doctype="Customer",
query=query,
limit=20,
)

async def index_customer(self, customer: Customer):
# Called after save to keep search index updated
await self.search.index(
doctype="Customer",
doc_id=str(customer.id),
doc=customer.model_dump(),
)

# Adapters: MeilisearchAdapter, ElasticsearchAdapter, WhooshAdapter (dev)

20. Real-time Notifications

[!NOTE] Planned Feature (Phase 08): The NotificationProtocol interface is defined, but WebSocket and Email adapters are planned for Phase 08.

Frappe

# Push to browser
frappe.publish_realtime("invoice_updated", {"id": doc.name}, user=user)

# Create notification
frappe.get_doc({
"doctype": "Notification Log",
"for_user": user,
"subject": "Invoice approved",
}).insert()

Framework M

from framework_m.core.interfaces import NotificationProtocol

class InvoiceService:
def __init__(self, notifications: NotificationProtocol):
self.notifications = notifications

async def approve_invoice(self, invoice_id: str, auth: AuthContext):
# ... approval logic ...

# In-app notification (stored in DB, shown in UI)
await self.notifications.send_in_app(
user_id=invoice.owner,
title="Invoice Approved",
body=f"Invoice {invoice.name} has been approved",
link=f"/app/Invoice/{invoice_id}",
)

# Or email notification
await self.notifications.send_email(
to=[invoice.owner_email],
subject="Invoice Approved",
body=f"Your invoice {invoice.name} has been approved.",
template="invoice_approved",
context={"invoice": invoice.model_dump()},
)

# WebSocket endpoint: WS /api/v1/notifications/stream
# Real-time push to connected clients via SocketProtocol

21. Virtual DocTypes (External Data Sources)

Frappe

# Virtual DocType - data from external API, not database
class ExternalProduct(Document):
def db_insert(self):
# Custom insert logic
api.create_product(self.as_dict())

def load_from_db(self):
data = api.get_product(self.name)
self.update(data)

Framework M

# Implement RepositoryProtocol for any data source
class ExternalProductRepository(RepositoryProtocol[Product]):
def __init__(self, api_client: APIClient):
self.api = api_client

async def get(self, session, id: UUID) -> Product | None:
data = await self.api.get_product(str(id))
return Product(**data) if data else None

async def save(self, session, entity: Product) -> Product:
await self.api.upsert_product(entity.model_dump())
return entity

# Register override in container
container.repository_factory.register_override("Product", ExternalProductRepository)

22. Custom Fields & Dynamic Meta

Frappe

# Get doctype meta with custom fields
meta = frappe.get_meta("Invoice")
for field in meta.fields:
print(field.fieldname, field.fieldtype)

# Check if field exists
if meta.has_field("custom_discount"):
pass

Framework M

# Meta endpoint returns full schema including custom fields
# GET /api/v1/meta/Invoice

# In code - MetaRegistry
from framework_m.core.registry import MetaRegistry

registry = MetaRegistry.get_instance()
meta = registry.get_doctype("Invoice")

for field in meta.fields:
print(field.name, field.type)

# Custom fields via DocType overrides (Phase 08)
class ExtendedInvoice(Invoice):
custom_discount: Decimal = Field(default=Decimal("0"))

registry.register_override("Invoice", ExtendedInvoice)

23. Currency & Number Formatting

Frappe

frappe.format_value(1234567.89, {"fieldtype": "Currency"})  # ₹12,34,567.89
frappe.format_value(0.15, {"fieldtype": "Percent"}) # 15%

Framework M

# Formatting via Python's babel library (locale-aware)
from babel.numbers import format_currency, format_percent

class InvoiceService:
def format_total(self, amount: Decimal, locale: str = "en_IN") -> str:
return format_currency(amount, "INR", locale=locale)
# → ₹12,34,567.89

def format_rate(self, rate: Decimal, locale: str = "en_IN") -> str:
return format_percent(rate, locale=locale)
# → 15%

# Frontend: use Intl API
# new Intl.NumberFormat("en-IN", { style: "currency", currency: "INR" }).format(1234567.89)

[!NOTE] I18nProtocol covers text translation only. Number/currency formatting uses Python's babel library directly or a separate FormatterProtocol (planned).


24. File Attachments

Frappe

# Attach file to document
file_doc = frappe.get_doc({
"doctype": "File",
"file_url": "/files/invoice.pdf",
"attached_to_doctype": "Invoice",
"attached_to_name": "INV-001",
})
file_doc.insert()

# Get attachments
files = frappe.get_all("File", filters={"attached_to_name": "INV-001"})

Framework M

# StorageProtocol for file operations
from framework_m.core.interfaces import StorageProtocol

class InvoiceService:
def __init__(self, storage: StorageProtocol, repo: RepositoryProtocol[Attachment]):
self.storage = storage
self.repo = repo

async def attach_file(self, invoice_id: str, file: UploadFile) -> str:
# Upload to S3/local/etc.
content = await file.read()
path = await self.storage.save_file(
path=f"invoices/{invoice_id}/{file.filename}",
content=content,
content_type=file.content_type,
)

# Store attachment reference
attachment = Attachment(
doctype="Invoice",
doc_id=invoice_id,
file_url=path,
filename=file.filename,
)
await self.repo.save(attachment)
return path

25. Email

Frappe

frappe.sendmail(
recipients=["user@example.com"],
subject="Invoice Ready",
template="invoice_notification",
args={"invoice": invoice.as_dict()},
)

Framework M

# EmailSenderProtocol for sending emails
from framework_m.core.interfaces import EmailSenderProtocol, EmailMessage
from jinja2 import Environment

class InvoiceService:
def __init__(self, email_sender: EmailSenderProtocol, jinja: Environment):
self.email_sender = email_sender
self.jinja = jinja

async def send_notification(self, invoice: Invoice):
template = self.jinja.get_template("invoice_notification.html")
html = template.render(invoice=invoice.model_dump())

message = EmailMessage(
to=["user@example.com"],
subject="Invoice Ready",
html_body=html,
)
result = await self.email_sender.send(message)
return result.success

# Adapters: SMTPEmailSender, LogEmailSender (dev), etc.

Frappe

# Get all linked documents
linked = frappe.get_all("Dynamic Link", filters={
"link_doctype": "Customer",
"link_name": "CUST-001",
})

# Dynamic Link field
class Contact(Document):
links = [] # Child table with link_doctype, link_name

Framework M

# Link fields with eager loading
class Contact(BaseDocType):
# Explicit links
customer: Link["Customer"] | None = None
supplier: Link["Supplier"] | None = None

# Or polymorphic links via child table
links: list[DynamicLink] = Field(default_factory=list)

class DynamicLink(BaseDocType):
class Meta:
is_child_table = True

link_doctype: str
link_id: UUID

# Query linked documents
async def get_customer_contacts(self, customer_id: str):
return await self.repo.list(
session,
filters=[FilterSpec("customer", FilterOperator.EQ, customer_id)],
includes=["customer"], # Eager load
)

27. Reports (Query Reports / Script Reports)

Frappe

# Script Report
def execute(filters):
columns = [{"fieldname": "name", "label": "Name", "fieldtype": "Data"}]
data = frappe.db.sql("""SELECT name FROM tabItem WHERE ...""", as_dict=True)
return columns, data

Framework M

# Report as a Service with typed response
class ItemReportService:
def __init__(self, repo: GenericRepository[Item]):
self.repo = repo

async def execute(self, filters: ItemReportFilters) -> ReportResult:
items = await self.repo.list(
session,
filters=filters.to_filter_specs(),
)
return ReportResult(
columns=[
Column(name="name", label="Name", type="string"),
Column(name="stock_qty", label="Stock", type="number"),
],
data=[item.model_dump() for item in items],
)

# Endpoint
@get("/api/reports/item-stock")
async def item_stock_report(
filters: ItemReportFilters,
report_service: ItemReportService,
) -> ReportResult:
return await report_service.execute(filters)

28. Single DocTypes (Settings)

Frappe

# Single DocType - only one record exists
settings = frappe.get_single("System Settings")
settings.enable_feature = True
settings.save()

Framework M

# Single DocType via Meta flag
class SystemSettings(BaseDocType):
class Meta:
is_single = True # Only one record

enable_feature: bool = False
default_currency: str = "INR"

# Service for settings
class SettingsService:
def __init__(self, repo: GenericRepository[SystemSettings]):
self.repo = repo
self._cache: SystemSettings | None = None

async def get(self, session) -> SystemSettings:
if self._cache is None:
self._cache = await self.repo.get_single(session)
return self._cache

async def update(self, session, data: dict) -> SystemSettings:
settings = await self.get(session)
for key, value in data.items():
setattr(settings, key, value)
await self.repo.save(session, settings)
self._cache = None # Invalidate cache
return settings

29. Multi-tenancy

Frappe

# Multi-site via bench
frappe.local.site # Current site
# Each site = separate database

# Multi-company within site
filters = {"company": frappe.defaults.get_user_default("Company")}

Framework M

# TenantContext injected via middleware
class ItemService:
async def list_items(self, tenant: TenantContext, session):
# Automatic tenant filtering via RLS or explicit filter
return await self.repo.list(
session,
filters=[FilterSpec("tenant_id", FilterOperator.EQ, tenant.id)],
)

# Tenancy modes (configured via adapter):
# - ImplicitTenantAdapter: Single tenant (indie deployments)
# - HeaderTenantAdapter: Multi-tenant via X-Tenant-ID header
# - DatabasePerTenantAdapter: Separate DB per tenant

# Frontend: tenant from JWT or API response
# Backend: automatic RLS filtering when enabled

See Phase 06 checklist for TenantProtocol details.

Part 2: Frontend Migration (Frappe UI → Refine)

Quick Reference

Frappe UIRefine + React
frappe.ui.form.FormuseForm() + RJSF
frappe.ListviewuseTable() + Table component
frappe.call()useCustom() or fetch()
cur_frmReact state / props
frappe.route_optionsURL params / React Router
frappe.msgprint()Toast / notification component
frappe.confirm()Modal / dialog component
frappe.ui.DialoguseModal()

30. Forms

Frappe

frappe.ui.form.on("Invoice", {
customer: function(frm) {
// Called when customer field changes
frappe.call({
method: "myapp.api.get_customer_address",
args: { customer: frm.doc.customer },
callback: (r) => frm.set_value("address", r.message)
});
},
refresh: function(frm) {
frm.add_custom_button("Send Email", () => { ... });
}
});

Refine + RJSF

import { useForm } from "@refinedev/react-hook-form";
import Form from "@rjsf/core";

export const InvoiceEdit = () => {
const { refineCore: { formLoading, onFinish }, watch, setValue } = useForm();
const customer = watch("customer");

// React to field changes
useEffect(() => {
if (customer) {
fetch(`/api/customer/${customer}/address`)
.then(r => r.json())
.then(data => setValue("address", data.address));
}
}, [customer]);

return (
<Form
schema={schema} // From GET /api/meta/Invoice
formData={formData}
onSubmit={onFinish}
/>
);
};

31. List Views

Frappe

frappe.listview_settings["Invoice"] = {
add_fields: ["status", "grand_total"],
get_indicator: (doc) => [doc.status, doc.status === "Paid" ? "green" : "orange"],
onload: (listview) => {
listview.page.add_action_item("Bulk Approve", () => { ... });
}
};

Refine + TanStack Table

import { useTable } from "@refinedev/react-table";
import { Table, Tag, Button } from "antd"; // or shadcn

export const InvoiceList = () => {
const { tableProps } = useTable({
resource: "Invoice",
columns: [
{ header: "Name", accessorKey: "name" },
{ header: "Status", accessorKey: "status",
cell: ({ value }) => (
<Tag color={value === "Paid" ? "green" : "orange"}>{value}</Tag>
)
},
{ header: "Total", accessorKey: "grand_total" },
],
});

return (
<>
<Button onClick={handleBulkApprove}>Bulk Approve</Button>
<Table {...tableProps} />
</>
);
};

32. RPC Calls

Frappe

frappe.call({
method: "myapp.api.get_stock_balance",
args: { item_code: "ITEM-001" },
callback: (r) => console.log(r.message)
});

Refine

import { useCustom } from "@refinedev/core";

const { data, isLoading } = useCustom({
url: "/api/method/get_stock_balance",
method: "post",
config: { payload: { item_code: "ITEM-001" } },
});

// Or with plain fetch
const response = await fetch("/api/method/get_stock_balance", {
method: "POST",
body: JSON.stringify({ item_code: "ITEM-001" }),
});

33. Dialogs and Modals

Frappe

frappe.prompt([
{ fieldname: "reason", label: "Reason", fieldtype: "Small Text" }
], (values) => {
frappe.call({ method: "cancel_invoice", args: { ...values } });
}, "Cancel Invoice");

Refine + shadcn

import { Dialog, DialogContent, DialogTrigger } from "@/components/ui/dialog";
import { useForm } from "react-hook-form";

export const CancelDialog = ({ invoiceId }) => {
const { register, handleSubmit } = useForm();

const onSubmit = async (data) => {
await fetch(`/api/invoice/${invoiceId}/cancel`, {
method: "POST",
body: JSON.stringify(data),
});
};

return (
<Dialog>
<DialogTrigger>Cancel Invoice</DialogTrigger>
<DialogContent>
<form onSubmit={handleSubmit(onSubmit)}>
<label>Reason</label>
<textarea {...register("reason")} />
<button type="submit">Confirm</button>
</form>
</DialogContent>
</Dialog>
);
};

34. Real-time Updates

Frappe

frappe.realtime.on("invoice_updated", (data) => {
cur_frm.reload_doc();
});

Refine (LiveProvider)

import { useSubscription } from "@refinedev/core";

// Subscribe to resource updates
useSubscription({
channel: "Invoice",
onLiveEvent: (event) => {
if (event.type === "updated") {
queryClient.invalidateQueries(["Invoice", event.payload.id]);
}
},
});

Minimal Patterns: Rapid Development in M

[!WARNING] Use Reasonably: These patterns bypass some standard architectural layers (Services, structured UoW) for speed. Ideal for prototypes, scripts, and simple CRUD. For complex business logic, prefer the full Service/Controller/Repository layers.

Philosophy: M's DI system works with function parameters, not just classes. You don't need service classes for simple cases, use DI directly.

The Spectrum of Complexity

Frappe-Like (Quick)                              Enterprise (Structured)
│ │
▼ ▼
Minimal DocType ─► RPC Function ─► Simple Service ─► Full Service Layer
(data only) (with DI) (single class) (repos, UoW, events)

Start minimal. Add structure when complexity demands it.


Pattern 1: Minimal DocType (No Controller)

Controllers are optional. For simple data storage, this is all you need:

# feedback.py - entire file
from framework_m import DocType, Field

class Feedback(DocType):
"""User feedback - just storage, no business logic."""
subject: str
message: str
rating: int = Field(ge=1, le=5, description="1-5 stars")
email: str | None = None

That's it. You get:

  • ✅ Database table created automatically
  • ✅ REST API: POST /api/v1/Feedback, GET /api/v1/Feedback, etc.
  • ✅ Validation (Pydantic enforces rating is 1-5)
  • ✅ JSON Schema for frontend forms

Frappe equivalent lines: ~15 (with naming, mandatory setup) M lines: 8


Pattern 2: RPC Function with DI (Frappe-style APIs)

For custom endpoints, use @rpc with injected dependencies directly in function params:

# stock_api.py
from decimal import Decimal
from dependency_injector.wiring import inject, Provide
from framework_m import rpc
from framework_m.core.container import Container
from framework_m.adapters.db import GenericRepository

@rpc("stock.get_balance")
@inject
async def get_stock_balance(
item_code: str,
warehouse: str,
repo: GenericRepository = Provide[Container.stock_ledger_repo],
) -> dict:
"""Get stock balance for item at warehouse."""
entries = await repo.list(
filters=[
("item_code", "=", item_code),
("warehouse", "=", warehouse),
]
)
total = sum(e.qty for e in entries.items)
return {"item_code": item_code, "warehouse": warehouse, "qty": total}

Called as: POST /api/v1/rpc/fn/stock.get_balance

{"item_code": "ITEM-001", "warehouse": "Main"}

No class needed. The @inject decorator handles DI in the function.


Pattern 3: Quick Validation (Controller Light)

Need just validation? Add a minimal controller:

# feedback.py
from framework_m import DocType, Field, BaseController

class Feedback(DocType):
subject: str
message: str
rating: int = Field(ge=1, le=5)
email: str | None = None

class FeedbackController(BaseController[Feedback]):
"""Only validate - no other hooks needed."""

async def validate(self, context=None) -> None:
if len(self.doc.message) < 10:
raise ValueError("Message must be at least 10 characters")

Only implement what you need. Empty hooks are inherited from BaseController.


Pattern 4: Direct Database Access (The "Hack")

For quick scripts or migrations, you can use the session directly:

from sqlalchemy import text
from framework_m import rpc
from dependency_injector.wiring import inject, Provide
from framework_m.core.container import Container
from framework_m.core.unit_of_work import UnitOfWork

@rpc("reports.sales_summary")
@inject
async def sales_summary(
year: int,
uow: UnitOfWork = Provide[Container.unit_of_work],
) -> list[dict]:
"""Raw SQL for complex reporting - this is the 'hack' path."""
async with uow:
result = await uow.session.execute(text("""
SELECT
customer,
SUM(grand_total) as total,
COUNT(*) as count
FROM invoice
WHERE EXTRACT(YEAR FROM posting_date) = :year
AND docstatus = 1
GROUP BY customer
ORDER BY total DESC
LIMIT 10
"""), {"year": year})
return [dict(row._mapping) for row in result]

[!WARNING] Raw SQL bypasses permissions, validation, and audit. Use for read-only reports or one-off scripts. For production business logic, use Repositories.


Pattern 5: Multiple RPCs, One File

Group related functions without classes:

# inventory_api.py
from framework_m import rpc
from dependency_injector.wiring import inject, Provide

@rpc("inventory.transfer")
@inject
async def transfer_stock(
item: str,
from_wh: str,
to_wh: str,
qty: float,
stock_svc = Provide[Container.stock_service],
) -> dict:
result = await stock_svc.transfer(item, from_wh, to_wh, qty)
return {"success": True, "entry_id": str(result.id)}

@rpc("inventory.adjust")
@inject
async def adjust_stock(
item: str,
warehouse: str,
qty: float,
reason: str,
stock_svc = Provide[Container.stock_service],
) -> dict:
result = await stock_svc.adjust(item, warehouse, qty, reason)
return {"success": True, "entry_id": str(result.id)}

@rpc("inventory.get_levels")
@inject
async def get_stock_levels(
warehouse: str | None = None,
repo = Provide[Container.item_repo],
) -> list[dict]:
filters = [("warehouse", "=", warehouse)] if warehouse else []
items = await repo.list(filters=filters, limit=1000)
return [{"item": i.item_code, "qty": i.qty} for i in items.items]

Three endpoints, no classes, full DI support.


Pattern 6: Background Job (Minimal)

# jobs/cleanup.py
from framework_m.adapters.jobs import task
from dependency_injector.wiring import inject, Provide

@task
@inject
async def cleanup_old_logs(
days: int = 30,
repo = Provide[Container.log_repo],
uow = Provide[Container.unit_of_work],
):
"""Delete logs older than N days."""
async with uow:
deleted = await repo.delete_where(
filters=[("created", "<", f"now() - interval '{days} days'")]
)
await uow.commit()
return {"deleted": deleted}

# Enqueue from anywhere:
# await cleanup_old_logs.kiq(days=7)

When to Add Structure

SituationPattern
Simple data storageMinimal DocType (no controller)
Custom API endpoint@rpc function with DI
Simple validationController with only validate()
Complex business logicService class
Multi-step transactionsService + UnitOfWork
External integrationsFull Protocol/Adapter

Comparison: Lines of Code

TaskFrappeM (Minimal)M (Full)
Simple DocType1588
DocType + validation201515
Custom API endpoint81225
Stock balance query51020
Background job101015

M is 30-50% more verbose for minimal cases, but the gap shrinks as complexity grows.


Summary: Key Mindset Shifts

FrappeFramework MWhy
Global frappe.dbInjected repoTestability, no hidden state
Implicit transactionsExplicit UoWClear boundaries, multiple txns
Hooks on DocTypeHooks on ControllerSeparation of data and logic
Request = transactionYou decideFlexibility for complex flows
frappe.call()Typed endpointsIDE support, validation
cur_frm (global)React state/propsComponent isolation
Jinja templatesReact componentsReusability, types
Server-rendered formsClient-side RJSFBetter UX, offline support

Next Steps

  1. Start with a simple DocType using m new:doctype
  2. Implement a Controller with hooks
  3. Use the Repository pattern for data access
  4. Add Refine.dev frontend for UI
  5. Connect forms via RJSF + metadata endpoint

See also: