Migration Guide: Frappe → Framework M
A practical guide for Frappe developers transitioning to Framework M. This covers common patterns and their Framework M equivalents.
Tone: Both frameworks are excellent. Frappe's simplicity enabled rapid development. Framework M builds on those foundations with explicit patterns for scale.
[!IMPORTANT] Documentation Status: This guide shows planned patterns and API designs. Examples will be revisited in Phase 09B with real, working code including actual container initialization, DI wiring, and tested implementations.
Quick Reference
| Frappe Pattern | Framework M Pattern |
|---|---|
| frappe.db.get_value() | repo.get() via DI |
| frappe.session.user | AuthContext via DI |
| doc.before_save() | Controller.before_save() |
| frappe.get_doc() | repo.get(session, id) |
| Implicit transaction | Explicit UnitOfWork |
| Global state | Dependency Injection |
1. Database Access
Frappe (Global State)
# Anywhere in code
value = frappe.db.get_value("Item", item_code, "stock_qty")
frappe.db.set_value("Item", item_code, "stock_qty", new_qty)
# Get document
doc = frappe.get_doc("Item", item_code)
doc.stock_qty = 100
doc.save()
Framework M (Explicit Session)
# Repository injected, session explicit
class ItemService:
def __init__(self, repo: GenericRepository[Item], uow_factory: UoWFactory):
self.repo = repo
self.uow_factory = uow_factory
async def update_stock(self, item_code: str, new_qty: int):
async with self.uow_factory() as uow:
item = await self.repo.get_by_name(uow.session, item_code)
item.stock_qty = new_qty
await self.repo.save(uow.session, item)
await uow.commit()
Why? Explicit session enables:
- Multiple independent transactions per request
- Easy testing (mock the session)
- Clear transaction boundaries
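The testing benefit can be illustrated without a database. The sketch below is not Framework M's API: `FakeUoW`, `InMemoryItemRepo`, and the `Item` dataclass are hypothetical stand-ins for the injected `uow_factory` and repository used above, and exist only to show that a service written against an explicit session can be exercised with plain in-memory fakes.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Item:
    name: str
    stock_qty: int = 0


class FakeUoW:
    """Hypothetical unit of work that only records whether commit() ran."""

    def __init__(self) -> None:
        self.session = object()  # opaque token handed to the repository
        self.committed = False

    async def __aenter__(self) -> "FakeUoW":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        return False  # never swallow exceptions

    async def commit(self) -> None:
        self.committed = True


class InMemoryItemRepo:
    """Hypothetical repository backed by a dict instead of a database."""

    def __init__(self, items: dict[str, Item]) -> None:
        self.items = items

    async def get_by_name(self, session, name: str) -> Item:
        return self.items[name]

    async def save(self, session, item: Item) -> None:
        self.items[item.name] = item


class ItemService:
    """Same shape as the service above, with untyped dependencies."""

    def __init__(self, repo, uow_factory) -> None:
        self.repo = repo
        self.uow_factory = uow_factory

    async def update_stock(self, item_code: str, new_qty: int) -> None:
        async with self.uow_factory() as uow:
            item = await self.repo.get_by_name(uow.session, item_code)
            item.stock_qty = new_qty
            await self.repo.save(uow.session, item)
            await uow.commit()
```

A test then builds the service from the fakes, runs `update_stock` with `asyncio.run`, and asserts on both the stored quantity and the `committed` flag.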
2. Getting Current User
Frappe
current_user = frappe.session.user
if frappe.session.user == "Administrator":
pass  # admin logic
Framework M
# Injected via Litestar DI
from litestar import Controller, get
from framework_m.core.interfaces import AuthContext
class ItemController(Controller):
@get("/items")
async def list_items(self, auth: AuthContext) -> list[Item]:
# auth.user_id, auth.roles available
if "Administrator" in auth.roles:
pass  # admin logic
return await self.service.list_for_user(auth.user_id)
3. Document Lifecycle Hooks
Frappe (on DocType)
class Item(Document):
def before_save(self):
self.validate_qty()
def after_insert(self):
self.create_stock_entry()
Framework M (on Controller)
# doctype.py - schema only
class Item(BaseDocType):
item_code: str
stock_qty: int
# controller.py - business logic
class ItemController(BaseController[Item]):
async def validate(self, context=None):
if self.doc.stock_qty < 0:
raise ValueError("Stock cannot be negative")
async def after_insert(self, context=None):
await self.create_stock_entry()
Hook Order (Insert):
validate → before_insert → before_save → [DB INSERT] → after_save → after_insert
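The insert order above can be made concrete with a small dispatcher. This is a sketch of the sequencing only, assuming hooks take no arguments; `run_insert` and this stripped-down `BaseController` are hypothetical names for illustration and are not the framework's implementation.

```python
import asyncio


class BaseController:
    """No-op hooks; subclasses override only the ones they need."""

    async def validate(self) -> None: ...
    async def before_insert(self) -> None: ...
    async def before_save(self) -> None: ...
    async def after_save(self) -> None: ...
    async def after_insert(self) -> None: ...


async def run_insert(controller: BaseController, db_insert) -> None:
    """Run the insert lifecycle in the documented order."""
    for hook in ("validate", "before_insert", "before_save"):
        await getattr(controller, hook)()
    await db_insert()  # the actual INSERT happens between the two hook groups
    for hook in ("after_save", "after_insert"):
        await getattr(controller, hook)()
```

Because any hook that raises stops the loop, a failing `validate` prevents the database insert from running at all.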
4. Child Tables
Frappe (Loop in Template)
{% for item in doc.items %}
<tr>
<td>{{ item.item_code }}</td>
<td>{{ item.qty }}</td>
</tr>
{% endfor %}
# Backend
for item in doc.items:
item.amount = item.qty * item.rate
Framework M + Refine.dev
// Frontend - Ant Design Table
<Table
dataSource={invoice.items}
columns={[
{ title: "Item", dataIndex: "item_code" },
{ title: "Qty", dataIndex: "qty" },
{ title: "Amount", dataIndex: "amount" },
]}
/>
# Backend - Pydantic child table
class InvoiceItem(BaseDocType):
class Meta:
is_child_table = True
item_code: str
qty: Decimal
rate: Decimal
amount: Decimal = Decimal("0")
class Invoice(BaseDocType):
    items: list[InvoiceItem] = Field(default_factory=list)
    total: Decimal = Decimal("0")  # written by InvoiceController.before_save
# Controller - calculate amounts
class InvoiceController(BaseController[Invoice]):
async def before_save(self, context=None):
for item in self.doc.items:
item.amount = item.qty * item.rate
self.doc.total = sum(i.amount for i in self.doc.items)
5. Avoiding N+1 Queries
Frappe (Common N+1)
# N+1 problem: 1 query for invoices + N queries for customers
invoices = frappe.get_all("Invoice", fields=["name", "customer"])
for inv in invoices:
customer = frappe.get_doc("Customer", inv.customer) # N queries!
inv.customer_name = customer.customer_name
Framework M (Eager Loading)
# Single query with JOIN
async def list_invoices_with_customers(self, session: AsyncSession):
stmt = (
select(Invoice)
.options(joinedload(Invoice.customer)) # Eager load
.limit(100)
)
result = await session.execute(stmt)
return result.scalars().all()
Or use repository with includes:
# Repository pattern with includes
invoices = await repo.list(
session,
filters={"status": "Draft"},
includes=["customer", "items"], # Eager load relations
)
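To make the difference in query count tangible, here is a self-contained sketch using the standard-library `sqlite3` module rather than SQLAlchemy or the Framework M repository. The table layout and helper names are assumptions made for the example; the point is only that the loop issues 1 + N statements while the JOIN issues one.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a throwaway in-memory database with two customers and three invoices."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE customer (id INTEGER PRIMARY KEY, customer_name TEXT);
        CREATE TABLE invoice (id INTEGER PRIMARY KEY, customer_id INTEGER);
        INSERT INTO customer VALUES (1, 'Acme'), (2, 'Globex');
        INSERT INTO invoice VALUES (10, 1), (11, 2), (12, 1);
        """
    )
    return conn


def names_n_plus_one(conn: sqlite3.Connection):
    """One query for the invoices, then one more per invoice for its customer."""
    queries = 1
    invoices = conn.execute("SELECT id, customer_id FROM invoice ORDER BY id").fetchall()
    rows = []
    for invoice_id, customer_id in invoices:
        (name,) = conn.execute(
            "SELECT customer_name FROM customer WHERE id = ?", (customer_id,)
        ).fetchone()
        queries += 1
        rows.append((invoice_id, name))
    return rows, queries


def names_joined(conn: sqlite3.Connection):
    """A single query that joins invoices to customers."""
    rows = conn.execute(
        "SELECT i.id, c.customer_name FROM invoice i "
        "JOIN customer c ON c.id = i.customer_id ORDER BY i.id"
    ).fetchall()
    return rows, 1
```

Both functions return the same rows; only the number of round trips differs, and that number grows with the size of the invoice list in the first version.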
6. Transactions
Frappe (Implicit)
# Transaction per request, auto-commit on success
def create_order():
order = frappe.get_doc({"doctype": "Order", ...})
order.insert() # Auto-commits at request end
items = create_items(order.name) # Same transaction
Framework M (Explicit UoW)
async def create_order(self, data: OrderCreate):
async with self.uow_factory() as uow:
# All operations in same transaction
order = Order(**data.model_dump())
await self.order_repo.save(uow.session, order)
for item_data in data.items:
item = OrderItem(order_id=order.id, **item_data)
await self.item_repo.save(uow.session, item)
await uow.commit() # Explicit commit
# Auto-rollback if exception before commit
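The commit-or-rollback behaviour can be sketched as an async context manager. The class below is a hypothetical illustration built on the standard-library `sqlite3` module (blocking calls inside async methods, which is acceptable only for a demo); it is not Framework M's `UnitOfWork`, and the `orders` table is invented for the example.

```python
import asyncio
import sqlite3


class SqliteUnitOfWork:
    """Hypothetical UoW: rolls back on exit unless commit() was called."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.session = conn
        self._committed = False

    async def __aenter__(self) -> "SqliteUnitOfWork":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if not self._committed:
            self.session.rollback()  # discard anything written in this block
        return False  # let the original exception propagate

    async def commit(self) -> None:
        self.session.commit()
        self._committed = True


async def create_order(conn: sqlite3.Connection, fail: bool) -> None:
    async with SqliteUnitOfWork(conn) as uow:
        uow.session.execute("INSERT INTO orders (name) VALUES ('ORD-1')")
        if fail:
            raise RuntimeError("failure before commit")
        await uow.commit()
```

With `fail=True` the inserted row is rolled back and the exception reaches the caller; with `fail=False` the row is persisted.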
7. Background Jobs
Frappe
frappe.enqueue(
"myapp.tasks.send_email",
queue="short",
email=email,
subject=subject,
)
Framework M (JobQueueProtocol)
Framework M uses a JobQueueProtocol interface, allowing you to plug in any background job system (Celery, Arq, BullMQ). The default adapter uses Taskiq.
from framework_m.core.interfaces import JobQueueProtocol
# Injected dependency
class EmailService:
def __init__(self, queue: JobQueueProtocol):
self.queue = queue
async def send_welcome(self, email: str):
# Enqueue task (implementation agnostic)
await self.queue.enqueue(
"send_email_task",
email=email,
subject="Welcome"
)
# Defining the task (adapter specific - e.g., Taskiq)
from framework_m.adapters.jobs import task
@task
async def send_email_task(email: str, subject: str):
await smtp.send(email, subject)
8. Events / Hooks
Frappe (doc_events)
# hooks.py
doc_events = {
"Invoice": {
"on_submit": "myapp.invoice.on_submit",
}
}
Framework M (Event Bus)
from framework_m.core.interfaces import EventBusProtocol, Event
# Publish event
event = Event(type="invoice.submitted", data={"id": str(invoice.id)})
await event_bus.publish("invoice.submitted", event)
# Subscribe (in another service)
async def handle_invoice_submitted(event: Event) -> None:
await update_ledger(event.data["id"])
# Register handler on startup
await event_bus.subscribe("invoice.submitted", handle_invoice_submitted)
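For local development or tests, an event bus with the same `publish` and `subscribe` shape can be a few lines of Python. This is a minimal in-process sketch under the assumption that handlers run sequentially in subscription order; `InMemoryEventBus` and this `Event` dataclass are hypothetical and do not describe the real `EventBusProtocol` adapters.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Event:
    type: str
    data: dict = field(default_factory=dict)


Handler = Callable[[Event], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical in-process bus: delivers events to handlers of the same topic."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    async def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, event: Event) -> None:
        # Copy the list so a handler may subscribe others while we iterate.
        for handler in list(self._handlers[topic]):
            await handler(event)
```

Publishing to a topic with no subscribers is a no-op, which keeps publishers decoupled from whoever happens to be listening.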
9. Permissions
Frappe
if frappe.has_permission("Invoice", "write", doc.name):
pass  # allowed
Framework M
# Via PermissionProtocol (injected)
from framework_m.core.interfaces import PermissionProtocol, PolicyEvaluateRequest, PermissionAction
class InvoiceService:
def __init__(self, permissions: PermissionProtocol):
self.permissions = permissions
async def update(self, auth: AuthContext, invoice_id: str, data: dict):
result = await self.permissions.evaluate(PolicyEvaluateRequest(
principal=auth.user_id,
action=PermissionAction.WRITE,
resource="Invoice",
resource_id=invoice_id,
principal_attributes={"roles": auth.roles},
))
if not result.authorized:
raise PermissionDenied(result.reason)
Or declarative in Meta:
class Invoice(BaseDocType):
class Meta:
permissions = {
"read": ["Employee", "Manager"],
"write": ["Manager"],
"submit": ["Manager"],
}
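How a declarative mapping like this might be evaluated can be sketched in plain Python. The function below is an assumption about the semantics (an action is allowed if the user holds at least one listed role, and undeclared actions are denied); `is_allowed` is a hypothetical helper, not part of `PermissionProtocol`.

```python
# Same mapping as Invoice.Meta.permissions above, copied for the example.
PERMISSIONS = {
    "read": ["Employee", "Manager"],
    "write": ["Manager"],
    "submit": ["Manager"],
}


def is_allowed(permissions: dict[str, list[str]], action: str, roles: list[str]) -> bool:
    """Return True if any of the user's roles is listed for the action."""
    allowed_roles = permissions.get(action, [])  # assumed: deny undeclared actions
    return any(role in allowed_roles for role in roles)
```

For example, an `Employee` may read but not write, and an action such as `delete` that is absent from the mapping is denied for everyone under this assumed default.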
10. API Endpoints
Frappe (whitelisted)
@frappe.whitelist()
def get_stock_balance(item_code):
return frappe.db.get_value("Item", item_code, "stock_qty")
Framework M (Litestar routes)
from litestar import Controller, get
class StockController(Controller):
path = "/api/stock"
@get("/{item_code:str}/balance")
async def get_balance(self, item_code: str, repo: ItemRepository) -> dict:
item = await repo.get_by_code(item_code)
return {"balance": item.stock_qty}
Or use RPC-style for Frappe compatibility:
# In my_app/api.py
from framework_m.core.decorators import rpc
@rpc() # Registers as "my_app.api.get_stock_balance"
async def get_stock_balance(item_code: str, repo: ItemRepository) -> dict:
item = await repo.get_by_code(item_code)
return {"balance": item.stock_qty}
# Exposed as: POST /api/v1/rpc/fn/my_app.api.get_stock_balance
# Body: {"item_code": "ITEM-001"}
Two RPC patterns available:
- Controller methods: POST /api/v1/rpc/{DocType}/{method}, using @whitelist on the controller
- Standalone functions: POST /api/v1/rpc/fn/{dotted.path}, using the @rpc decorator
11. Queries and Aggregations
Frappe
# Get all with filters
items = frappe.get_all("Item", filters={"item_group": "Electronics"}, fields=["name", "price"])
# Query Builder
from frappe.query_builder import DocType
Item = DocType("Item")
query = frappe.qb.from_(Item).select(Item.name, Item.price).where(Item.stock_qty > 0)
# Raw SQL
frappe.db.sql("SELECT name, price FROM tabItem WHERE stock_qty > %s", (0,), as_dict=True)
Framework M
from framework_m.core.interfaces import FilterSpec, FilterOperator
# Repository list with filters
items = await repo.list(
session,
filters=[FilterSpec(field="item_group", operator=FilterOperator.EQ, value="Electronics")],
limit=100,
)
# SQLAlchemy Core (for complex queries)
from sqlalchemy import select, func
stmt = select(Item.name, Item.price).where(Item.stock_qty > 0)
result = await session.execute(stmt)
# Aggregations
stmt = select(func.sum(Item.stock_qty)).where(Item.item_group == "Electronics")
total = (await session.execute(stmt)).scalar()
12. Print Formats
Frappe
# Jinja template in DocType
frappe.get_print("Invoice", invoice_name, print_format="Standard")
Framework M
# PrintProtocol for document rendering
from uuid import UUID
from framework_m.core.interfaces import PrintProtocol, PrintFormat
class InvoiceService:
def __init__(self, printer: PrintProtocol, repo: RepositoryProtocol[Invoice]):
self.printer = printer
self.repo = repo
async def generate_pdf(self, invoice_id: str) -> bytes:
invoice = await self.repo.get(UUID(invoice_id))
return await self.printer.render(
doc=invoice,
template="standard",
format=PrintFormat.PDF,
)
async def list_templates(self) -> list[str]:
return await self.printer.get_templates("Invoice")
# Adapters: JinjaPrintAdapter (HTML), GotenbergAdapter (PDF via Gotenberg)
13. Workflow & Document States
[!NOTE] Planned Feature (Phase 08): Framework M will provide a pluggable WorkflowProtocol with internal and Temporal adapters. Workflows are defined as DocTypes, not hardcoded in controllers.
Frappe
# Workflow defined in UI, transitions via API
frappe.get_doc("Invoice", name).set_workflow_action("Approve")
Framework M (WorkflowProtocol)
# Workflow defined as DocType
# Workflow: "InvoiceApproval"
# - States: Draft, Pending Approval, Approved, Rejected
# - Transitions: Submit (Draft → Pending), Approve (Pending → Approved), etc.
# - allowed_roles per transition
# DocType references the workflow
class Invoice(BaseDocType):
class Meta:
workflow = "InvoiceApproval"
# Transitions via injected WorkflowProtocol
class InvoiceService:
def __init__(self, workflow: WorkflowProtocol):
self.workflow = workflow
async def approve(self, invoice_id: str, auth: AuthContext):
# Checks permissions, validates transition, updates state, emits event
await self.workflow.transition(invoice_id, "Approve", auth)
# Available adapters:
# - InternalWorkflowAdapter: Simple state machine, stores state in DB
# - TemporalWorkflowAdapter: Distributed, long-running workflows via Temporal.io
Why Protocol-based?
- Swap implementations without changing business logic
- Internal adapter for simple use cases
- Temporal for complex, distributed workflows
- Workflow definitions are data, not code
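The points above can be illustrated with `typing.Protocol` and a transition table held as data. This is a synchronous sketch under stated assumptions: the `WorkflowProtocol` signature here is simplified, the allowed roles per transition are invented for the example, and `InternalWorkflow` is a hypothetical class rather than the planned `InternalWorkflowAdapter`.

```python
from typing import Protocol


class WorkflowProtocol(Protocol):
    """Simplified, hypothetical interface: returns the new state or raises."""

    def transition(self, state: str, action: str, roles: set[str]) -> str: ...


# Workflow definition as data: (state, action) -> (next state, allowed roles).
TRANSITIONS = {
    ("Draft", "Submit"): ("Pending Approval", {"Employee", "Manager"}),
    ("Pending Approval", "Approve"): ("Approved", {"Manager"}),
    ("Pending Approval", "Reject"): ("Rejected", {"Manager"}),
}


class InternalWorkflow:
    """Simple state machine driven entirely by the transition table."""

    def __init__(self, transitions: dict) -> None:
        self.transitions = transitions

    def transition(self, state: str, action: str, roles: set[str]) -> str:
        try:
            target, allowed = self.transitions[(state, action)]
        except KeyError:
            raise ValueError(f"No transition {action!r} from state {state!r}") from None
        if not roles & allowed:
            raise PermissionError(f"Roles {sorted(roles)} may not {action!r}")
        return target
```

Business code that depends only on `WorkflowProtocol` would not change if a different implementation with the same `transition` method were injected instead.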
14. Naming Series
[!NOTE] Planned Feature (Phase 08): Declarative naming patterns via Meta.name_pattern. Uses optimistic generation with retry on conflict, with no row-level locking.
Frappe
# In DocType, autoname = "naming_series:"
doc.naming_series = "INV-.YYYY.-"
Framework M (Declarative Naming)
class Invoice(BaseDocType):
class Meta:
# Declarative pattern - framework handles generation
name_pattern = "INV-.YYYY.-.####" # → INV-2026-0001
# Pattern tokens:
# .YYYY → Year (2026)
# .MM → Month (01-12)
# .DD → Day (01-31)
# .#### → Sequential number (optimistic, retry on conflict)
# {field} → Field value (e.g., {customer_code})
# For high-volume DocTypes, use database-native sequences:
class HighVolumeInvoice(BaseDocType):
class Meta:
name_pattern = "sequence:invoice_seq" # Uses DB sequence
# Framework detects database and uses appropriate mechanism:
# PostgreSQL: CREATE SEQUENCE + nextval()
# MySQL/MariaDB: AUTO_INCREMENT helper table
# SQLite: sqlite_sequence table
# Oracle: CREATE SEQUENCE
# MSSQL: CREATE SEQUENCE (2012+) or IDENTITY
Why this approach?
- id (UUID) is the primary key: zero contention on insert
- name is an optional human-readable identifier
- No SELECT ... FOR UPDATE lock on counters (Frappe anti-pattern)
- Optimistic generation: insert → generate name → retry if conflict
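The insert, generate, retry sequence can be sketched with the standard-library `sqlite3` module. Everything here is an assumption for illustration: the `invoice` table, the `create_invoice` helper, and the choice to derive the next number from `MAX(name)` (which relies on four-digit zero padding and would need a different approach past 9999). The UNIQUE constraint on `name` is what detects a lost race; no counter row is locked.

```python
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # id is the primary key; name is optional but unique once assigned.
    conn.execute("CREATE TABLE invoice (id TEXT PRIMARY KEY, name TEXT UNIQUE)")
    conn.commit()
    return conn


def create_invoice(conn: sqlite3.Connection, prefix: str, year: int, max_retries: int = 5) -> str:
    # Step 1: insert the row under its UUID, with no name yet.
    doc_id = str(uuid.uuid4())
    conn.execute("INSERT INTO invoice (id) VALUES (?)", (doc_id,))
    conn.commit()

    like = f"{prefix}-{year}-%"
    for _ in range(max_retries):
        # Step 2: read the highest existing name without taking a lock.
        row = conn.execute(
            "SELECT MAX(name) FROM invoice WHERE name LIKE ?", (like,)
        ).fetchone()
        last = int(row[0].rsplit("-", 1)[1]) if row[0] else 0
        candidate = f"{prefix}-{year}-{last + 1:04d}"
        try:
            conn.execute("UPDATE invoice SET name = ? WHERE id = ?", (candidate, doc_id))
            conn.commit()
            return candidate
        except sqlite3.IntegrityError:
            # Step 3: another writer took this name first; re-read and retry.
            conn.rollback()
    raise RuntimeError(f"Could not assign a name after {max_retries} attempts")
```

In this single-connection demo the retry branch is never taken; it matters only when concurrent writers compute the same candidate.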
15. Fixtures & Data Import
Frappe
# fixtures in hooks.py
fixtures = ["Custom Field", {"dt": "Role", "filters": [["name", "in", ["Manager"]]]}]
Framework M
# CLI command for fixtures
@app.command
async def load_fixtures(path: str = "fixtures/"):
"""Load JSON/YAML fixtures into database."""
async with uow_factory() as uow:
for file in Path(path).glob("*.json"):
data = json.loads(file.read_text())
for record in data:
await repo.save(uow.session, Model(**record))
await uow.commit()
16. Scheduled Jobs
Frappe
# hooks.py
scheduler_events = {
"daily": ["myapp.tasks.daily_cleanup"],
"cron": {"0 9 * * *": ["myapp.tasks.morning_report"]},
}
Framework M (JobQueueProtocol Scheduling)
Schedules are managed via the JobQueueProtocol or adapter-specific decorators.
# Programmatic scheduling via Protocol
await queue.schedule("morning_report", cron="0 9 * * *")
# Or label-based (Taskiq adapter): the schedule is declared on the task and
# picked up by a TaskiqScheduler configured with a LabelScheduleSource
@broker.task(schedule=[{"cron": "0 9 * * *"}])
async def morning_report():
    await report_service.generate_daily()

@broker.task(schedule=[{"cron": "0 0 * * *"}])
async def daily_cleanup():
    await cleanup_service.run()
17. Cache
Frappe
# Get/set cache with TTL
frappe.cache().set_value("key", value, expires_in_sec=3600)
value = frappe.cache().get_value("key")
# Delete pattern
frappe.cache().delete_keys("user:*")
Framework M
from uuid import UUID
from framework_m.core.interfaces import CacheProtocol
class ItemService:
def __init__(self, cache: CacheProtocol, repo: RepositoryProtocol[Item]):
self.cache = cache
self.repo = repo
async def get_item(self, item_id: str) -> Item:
# Try cache first
cached = await self.cache.get(f"item:{item_id}")
if cached:
return Item.model_validate(cached)
# Fetch from DB and cache
item = await self.repo.get(UUID(item_id))
await self.cache.set(f"item:{item_id}", item.model_dump(), ttl=3600)
return item
async def invalidate_item(self, item_id: str):
await self.cache.delete(f"item:{item_id}")
async def invalidate_all_items(self):
deleted = await self.cache.delete_pattern("item:*")
return deleted
# Adapters: RedisCacheAdapter, MemoryCacheAdapter (dev)
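A development-only cache with the same four methods could look like the sketch below. It is a hypothetical stand-in, not the actual `MemoryCacheAdapter`: the injectable `clock`, lazy expiry on read, and `fnmatch`-style glob matching for `delete_pattern` are all assumptions made for this example.

```python
import asyncio
import fnmatch
import time


class MemoryCache:
    """Hypothetical in-process cache with per-key TTL and glob deletion."""

    def __init__(self, clock=time.monotonic) -> None:
        self._clock = clock  # injectable so tests can control time
        self._data: dict[str, tuple[float, object]] = {}

    async def get(self, key: str):
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired entries are dropped lazily on read
            return None
        return value

    async def set(self, key: str, value, ttl: int) -> None:
        self._data[key] = (self._clock() + ttl, value)

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)

    async def delete_pattern(self, pattern: str) -> int:
        """Delete every key matching a glob such as 'item:*'; return the count."""
        matches = [k for k in self._data if fnmatch.fnmatchcase(k, pattern)]
        for key in matches:
            del self._data[key]
        return len(matches)
```

Note that `get` returns `None` on a miss, so callers that may cache falsy values (such as an empty dict) should compare against `None` rather than rely on truthiness.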
18. Audit Trail / Version History
Frappe
# Auto-tracks changes in Version doctype
doc.get_doc_before_save() # Compare changes
# Add comment
doc.add_comment("Updated stock levels")
# frappe.log_error for exceptions
frappe.log_error("Something went wrong")
Framework M
from uuid import UUID
from framework_m.core.interfaces import AuditLogProtocol, AuditEntry
class InvoiceService:
def __init__(self, audit: AuditLogProtocol, repo: RepositoryProtocol[Invoice]):
self.audit = audit
self.repo = repo
async def update_invoice(self, invoice_id: str, data: dict, auth: AuthContext):
old = await self.repo.get(UUID(invoice_id))
# ... update logic ...
new = await self.repo.save(updated_invoice)
# Log the change with diff
await self.audit.log(
user_id=auth.user_id,
action="update",
doctype="Invoice",
document_id=invoice_id,
changes={"status": {"old": old.status, "new": new.status}},
metadata={"ip": auth.ip_address},
)
async def get_history(self, invoice_id: str) -> list[AuditEntry]:
return await self.audit.query(
filters={"doctype": "Invoice", "document_id": invoice_id},
limit=50,
)
# Adapters:
# - DatabaseAuditAdapter: Writes to ActivityLog table (Indie)
# - FileAuditAdapter: JSONL file for Splunk/Filebeat
# - ElasticAuditAdapter: Elasticsearch (Enterprise)
19. Search
[!NOTE] Planned Feature (Phase 08): The SearchProtocol interface is defined, but the Meilisearch/Elasticsearch adapters are planned for Phase 08.
Frappe
# Link field search
frappe.get_list("Customer", filters={"name": ["like", "%search%"]})
# Global search
frappe.search_link("Customer", "john")
Framework M
from framework_m.core.interfaces import SearchProtocol, SearchResult
class CustomerService:
def __init__(self, search: SearchProtocol, repo: RepositoryProtocol[Customer]):
self.search = search
self.repo = repo
async def search_customers(self, query: str) -> SearchResult:
return await self.search.search(
doctype="Customer",
query=query,
limit=20,
)
async def index_customer(self, customer: Customer):
# Called after save to keep search index updated
await self.search.index(
doctype="Customer",
doc_id=str(customer.id),
doc=customer.model_dump(),
)
# Adapters: MeilisearchAdapter, ElasticsearchAdapter, WhooshAdapter (dev)
20. Real-time Notifications
[!NOTE] Planned Feature (Phase 08): The NotificationProtocol interface is defined, but WebSocket and Email adapters are planned for Phase 08.
Frappe
# Push to browser
frappe.publish_realtime("invoice_updated", {"id": doc.name}, user=user)
# Create notification
frappe.get_doc({
"doctype": "Notification Log",
"for_user": user,
"subject": "Invoice approved",
}).insert()
Framework M
from framework_m.core.interfaces import NotificationProtocol
class InvoiceService:
def __init__(self, notifications: NotificationProtocol):
self.notifications = notifications
async def approve_invoice(self, invoice_id: str, auth: AuthContext):
# ... approval logic ...
# In-app notification (stored in DB, shown in UI)
await self.notifications.send_in_app(
user_id=invoice.owner,
title="Invoice Approved",
body=f"Invoice {invoice.name} has been approved",
link=f"/app/Invoice/{invoice_id}",
)
# Or email notification
await self.notifications.send_email(
to=[invoice.owner_email],
subject="Invoice Approved",
body=f"Your invoice {invoice.name} has been approved.",
template="invoice_approved",
context={"invoice": invoice.model_dump()},
)
# WebSocket endpoint: WS /api/v1/notifications/stream
# Real-time push to connected clients via SocketProtocol
21. Virtual DocTypes (External Data Sources)
Frappe
# Virtual DocType - data from external API, not database
class ExternalProduct(Document):
def db_insert(self):
# Custom insert logic
api.create_product(self.as_dict())
def load_from_db(self):
data = api.get_product(self.name)
self.update(data)
Framework M
# Implement RepositoryProtocol for any data source
class ExternalProductRepository(RepositoryProtocol[Product]):
def __init__(self, api_client: APIClient):
self.api = api_client
async def get(self, session, id: UUID) -> Product | None:
data = await self.api.get_product(str(id))
return Product(**data) if data else None
async def save(self, session, entity: Product) -> Product:
await self.api.upsert_product(entity.model_dump())
return entity
# Register override in container
container.repository_factory.register_override("Product", ExternalProductRepository)
22. Custom Fields & Dynamic Meta
Frappe
# Get doctype meta with custom fields
meta = frappe.get_meta("Invoice")
for field in meta.fields:
print(field.fieldname, field.fieldtype)
# Check if field exists
if meta.has_field("custom_discount"):
pass
Framework M
# Meta endpoint returns full schema including custom fields
# GET /api/v1/meta/Invoice
# In code - MetaRegistry
from framework_m.core.registry import MetaRegistry
registry = MetaRegistry.get_instance()
meta = registry.get_doctype("Invoice")
for field in meta.fields:
print(field.name, field.type)
# Custom fields via DocType overrides (Phase 08)
class ExtendedInvoice(Invoice):
custom_discount: Decimal = Field(default=Decimal("0"))
registry.register_override("Invoice", ExtendedInvoice)
23. Currency & Number Formatting
Frappe
frappe.format_value(1234567.89, {"fieldtype": "Currency"}) # ₹12,34,567.89
frappe.format_value(0.15, {"fieldtype": "Percent"}) # 15%
Framework M
# Formatting via Python's babel library (locale-aware)
from babel.numbers import format_currency, format_percent
class InvoiceService:
def format_total(self, amount: Decimal, locale: str = "en_IN") -> str:
return format_currency(amount, "INR", locale=locale)
# → ₹12,34,567.89
def format_rate(self, rate: Decimal, locale: str = "en_IN") -> str:
return format_percent(rate, locale=locale)
# → 15%
# Frontend: use Intl API
# new Intl.NumberFormat("en-IN", { style: "currency", currency: "INR" }).format(1234567.89)
[!NOTE] I18nProtocol covers text translation only. Number/currency formatting uses Python's babel library directly or a separate FormatterProtocol (planned).
24. File Attachments
Frappe
# Attach file to document
file_doc = frappe.get_doc({
"doctype": "File",
"file_url": "/files/invoice.pdf",
"attached_to_doctype": "Invoice",
"attached_to_name": "INV-001",
})
file_doc.insert()
# Get attachments
files = frappe.get_all("File", filters={"attached_to_name": "INV-001"})
Framework M
# StorageProtocol for file operations
from framework_m.core.interfaces import StorageProtocol
class InvoiceService:
def __init__(self, storage: StorageProtocol, repo: RepositoryProtocol[Attachment]):
self.storage = storage
self.repo = repo
async def attach_file(self, invoice_id: str, file: UploadFile) -> str:
# Upload to S3/local/etc.
content = await file.read()
path = await self.storage.save_file(
path=f"invoices/{invoice_id}/{file.filename}",
content=content,
content_type=file.content_type,
)
# Store attachment reference
attachment = Attachment(
doctype="Invoice",
doc_id=invoice_id,
file_url=path,
filename=file.filename,
)
await self.repo.save(attachment)
return path
25. Email
Frappe
frappe.sendmail(
recipients=["user@example.com"],
subject="Invoice Ready",
template="invoice_notification",
args={"invoice": invoice.as_dict()},
)
Framework M
# EmailSenderProtocol for sending emails
from framework_m.core.interfaces import EmailSenderProtocol, EmailMessage
from jinja2 import Environment
class InvoiceService:
def __init__(self, email_sender: EmailSenderProtocol, jinja: Environment):
self.email_sender = email_sender
self.jinja = jinja
async def send_notification(self, invoice: Invoice):
template = self.jinja.get_template("invoice_notification.html")
html = template.render(invoice=invoice.model_dump())
message = EmailMessage(
to=["user@example.com"],
subject="Invoice Ready",
html_body=html,
)
result = await self.email_sender.send(message)
return result.success
# Adapters: SMTPEmailSender, LogEmailSender (dev), etc.
26. Linked Documents & Dynamic Links
Frappe
# Get all linked documents
linked = frappe.get_all("Dynamic Link", filters={
"link_doctype": "Customer",
"link_name": "CUST-001",
})
# Dynamic Link field
class Contact(Document):
links = [] # Child table with link_doctype, link_name
Framework M
# Link fields with eager loading
class Contact(BaseDocType):
# Explicit links
customer: Link["Customer"] | None = None
supplier: Link["Supplier"] | None = None
# Or polymorphic links via child table
links: list[DynamicLink] = Field(default_factory=list)
class DynamicLink(BaseDocType):
class Meta:
is_child_table = True
link_doctype: str
link_id: UUID
# Query linked documents
async def get_customer_contacts(self, session, customer_id: str):
return await self.repo.list(
session,
filters=[FilterSpec("customer", FilterOperator.EQ, customer_id)],
includes=["customer"], # Eager load
)
27. Reports (Query Reports / Script Reports)
Frappe
# Script Report
def execute(filters):
columns = [{"fieldname": "name", "label": "Name", "fieldtype": "Data"}]
data = frappe.db.sql("""SELECT name FROM tabItem WHERE ...""", as_dict=True)
return columns, data
Framework M
# Report as a Service with typed response
class ItemReportService:
def __init__(self, repo: GenericRepository[Item]):
self.repo = repo
async def execute(self, filters: ItemReportFilters) -> ReportResult:
items = await self.repo.list(
session,
filters=filters.to_filter_specs(),
)
return ReportResult(
columns=[
Column(name="name", label="Name", type="string"),
Column(name="stock_qty", label="Stock", type="number"),
],
data=[item.model_dump() for item in items],
)
# Endpoint
@get("/api/reports/item-stock")
async def item_stock_report(
filters: ItemReportFilters,
report_service: ItemReportService,
) -> ReportResult:
return await report_service.execute(filters)
28. Single DocTypes (Settings)
Frappe
# Single DocType - only one record exists
settings = frappe.get_single("System Settings")
settings.enable_feature = True
settings.save()
Framework M
# Single DocType via Meta flag
class SystemSettings(BaseDocType):
class Meta:
is_single = True # Only one record
enable_feature: bool = False
default_currency: str = "INR"
# Service for settings
class SettingsService:
def __init__(self, repo: GenericRepository[SystemSettings]):
self.repo = repo
self._cache: SystemSettings | None = None
async def get(self, session) -> SystemSettings:
if self._cache is None:
self._cache = await self.repo.get_single(session)
return self._cache
async def update(self, session, data: dict) -> SystemSettings:
settings = await self.get(session)
for key, value in data.items():
setattr(settings, key, value)
await self.repo.save(session, settings)
self._cache = None # Invalidate cache
return settings
29. Multi-tenancy
Frappe
# Multi-site via bench
frappe.local.site # Current site
# Each site = separate database
# Multi-company within site
filters = {"company": frappe.defaults.get_user_default("Company")}
Framework M
# TenantContext injected via middleware
class ItemService:
async def list_items(self, tenant: TenantContext, session):
# Automatic tenant filtering via RLS or explicit filter
return await self.repo.list(
session,
filters=[FilterSpec("tenant_id", FilterOperator.EQ, tenant.id)],
)
# Tenancy modes (configured via adapter):
# - ImplicitTenantAdapter: Single tenant (indie deployments)
# - HeaderTenantAdapter: Multi-tenant via X-Tenant-ID header
# - DatabasePerTenantAdapter: Separate DB per tenant
# Frontend: tenant from JWT or API response
# Backend: automatic RLS filtering when enabled
See Phase 06 checklist for TenantProtocol details.
Part 2: Frontend Migration (Frappe UI → Refine)
Quick Reference
| Frappe UI | Refine + React |
|---|---|
| frappe.ui.form.Form | useForm() + RJSF |
| frappe.Listview | useTable() + Table component |
| frappe.call() | useCustom() or fetch() |
| cur_frm | React state / props |
| frappe.route_options | URL params / React Router |
| frappe.msgprint() | Toast / notification component |
| frappe.confirm() | Modal / dialog component |
| frappe.ui.Dialog | useModal() |
30. Forms
Frappe
frappe.ui.form.on("Invoice", {
customer: function(frm) {
// Called when customer field changes
frappe.call({
method: "myapp.api.get_customer_address",
args: { customer: frm.doc.customer },
callback: (r) => frm.set_value("address", r.message)
});
},
refresh: function(frm) {
frm.add_custom_button("Send Email", () => { ... });
}
});
Refine + RJSF
import { useForm } from "@refinedev/react-hook-form";
import Form from "@rjsf/core";
export const InvoiceEdit = () => {
const { refineCore: { formLoading, onFinish }, watch, setValue } = useForm();
const customer = watch("customer");
// React to field changes
useEffect(() => {
if (customer) {
fetch(`/api/customer/${customer}/address`)
.then(r => r.json())
.then(data => setValue("address", data.address));
}
}, [customer]);
return (
<Form
schema={schema} // From GET /api/meta/Invoice
formData={formData}
onSubmit={({ formData }) => onFinish(formData)} // RJSF passes an event object; the values are in formData
/>
);
};
31. List Views
Frappe
frappe.listview_settings["Invoice"] = {
add_fields: ["status", "grand_total"],
get_indicator: (doc) => [doc.status, doc.status === "Paid" ? "green" : "orange"],
onload: (listview) => {
listview.page.add_action_item("Bulk Approve", () => { ... });
}
};
Refine + TanStack Table
import { useTable } from "@refinedev/react-table";
import { Table, Tag, Button } from "antd"; // or shadcn
export const InvoiceList = () => {
const { tableProps } = useTable({
resource: "Invoice",
columns: [
{ header: "Name", accessorKey: "name" },
{ header: "Status", accessorKey: "status",
cell: ({ value }) => (
<Tag color={value === "Paid" ? "green" : "orange"}>{value}</Tag>
)
},
{ header: "Total", accessorKey: "grand_total" },
],
});
return (
<>
<Button onClick={handleBulkApprove}>Bulk Approve</Button>
<Table {...tableProps} />
</>
);
};
32. RPC Calls
Frappe
frappe.call({
method: "myapp.api.get_stock_balance",
args: { item_code: "ITEM-001" },
callback: (r) => console.log(r.message)
});
Refine
import { useCustom } from "@refinedev/core";
const { data, isLoading } = useCustom({
url: "/api/v1/rpc/fn/my_app.api.get_stock_balance", // RPC route from section 10
method: "post",
config: { payload: { item_code: "ITEM-001" } },
});
// Or with plain fetch
const response = await fetch("/api/v1/rpc/fn/my_app.api.get_stock_balance", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ item_code: "ITEM-001" }),
});
33. Dialogs and Modals
Frappe
frappe.prompt([
{ fieldname: "reason", label: "Reason", fieldtype: "Small Text" }
], (values) => {
frappe.call({ method: "cancel_invoice", args: { ...values } });
}, "Cancel Invoice");
Refine + shadcn
import { Dialog, DialogContent, DialogTrigger } from "@/components/ui/dialog";
import { useForm } from "react-hook-form";
export const CancelDialog = ({ invoiceId }) => {
const { register, handleSubmit } = useForm();
const onSubmit = async (data) => {
await fetch(`/api/invoice/${invoiceId}/cancel`, {
method: "POST",
body: JSON.stringify(data),
});
};
return (
<Dialog>
<DialogTrigger>Cancel Invoice</DialogTrigger>
<DialogContent>
<form onSubmit={handleSubmit(onSubmit)}>
<label>Reason</label>
<textarea {...register("reason")} />
<button type="submit">Confirm</button>
</form>
</DialogContent>
</Dialog>
);
};
34. Real-time Updates
Frappe
frappe.realtime.on("invoice_updated", (data) => {
cur_frm.reload_doc();
});
Refine (LiveProvider)
import { useSubscription } from "@refinedev/core";
// Subscribe to resource updates
useSubscription({
channel: "Invoice",
onLiveEvent: (event) => {
if (event.type === "updated") {
queryClient.invalidateQueries(["Invoice", event.payload.id]);
}
},
});
Minimal Patterns: Rapid Development in M
[!WARNING] Use Reasonably: These patterns bypass some standard architectural layers (Services, structured UoW) for speed. Ideal for prototypes, scripts, and simple CRUD. For complex business logic, prefer the full Service/Controller/Repository layers.
Philosophy: M's DI system works with function parameters, not just classes. You don't need service classes for simple cases; inject dependencies directly into the function.
The Spectrum of Complexity
Frappe-Like (Quick)                               Enterprise (Structured)
       │                                                     │
       ▼                                                     ▼
Minimal DocType ─► RPC Function ─► Simple Service ─► Full Service Layer
  (data only)       (with DI)      (single class)   (repos, UoW, events)
Start minimal. Add structure when complexity demands it.
Pattern 1: Minimal DocType (No Controller)
Controllers are optional. For simple data storage, this is all you need:
# feedback.py - entire file
from framework_m import DocType, Field
class Feedback(DocType):
"""User feedback - just storage, no business logic."""
subject: str
message: str
rating: int = Field(ge=1, le=5, description="1-5 stars")
email: str | None = None
That's it. You get:
- ✅ Database table created automatically
- ✅ REST API: POST /api/v1/Feedback, GET /api/v1/Feedback, etc.
- ✅ Validation (Pydantic enforces rating is 1-5)
- ✅ JSON Schema for frontend forms
Frappe equivalent lines: ~15 (with naming, mandatory setup)
M lines: 8
Pattern 2: RPC Function with DI (Frappe-style APIs)
For custom endpoints, use @rpc with injected dependencies directly in function params:
# stock_api.py
from decimal import Decimal

from dependency_injector.wiring import inject, Provide
from framework_m import rpc
from framework_m.core.container import Container
from framework_m.adapters.db import GenericRepository


@rpc("stock.get_balance")
@inject
async def get_stock_balance(
    item_code: str,
    warehouse: str,
    repo: GenericRepository = Provide[Container.stock_ledger_repo],
) -> dict:
    """Get stock balance for item at warehouse."""
    entries = await repo.list(
        filters=[
            ("item_code", "=", item_code),
            ("warehouse", "=", warehouse),
        ]
    )
    total = sum(e.qty for e in entries.items)
    return {"item_code": item_code, "warehouse": warehouse, "qty": total}
Called as: POST /api/v1/rpc/fn/stock.get_balance
{"item_code": "ITEM-001", "warehouse": "Main"}
No class needed. The @inject decorator handles DI in the function.
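One practical consequence of function-level DI is that the dependency is an ordinary keyword argument, so a test can pass a fake directly. The sketch below uses only the standard library. FakeLedgerRepo is a hypothetical in-memory stand-in, and get_stock_balance is an undecorated copy of the function body above; it assumes the real decorated function still accepts an explicit repo argument, which has not been verified against Framework M's @rpc wrapper.

```python
import asyncio
from types import SimpleNamespace


class FakeLedgerRepo:
    """Hypothetical in-memory stand-in for the injected stock ledger repo."""

    def __init__(self, entries):
        self._entries = entries

    async def list(self, filters):
        # Apply only the "=" filters used in the example above.
        matched = [
            entry
            for entry in self._entries
            if all(
                getattr(entry, field) == value
                for field, op, value in filters
                if op == "="
            )
        ]
        return SimpleNamespace(items=matched)


async def get_stock_balance(item_code, warehouse, repo):
    """Undecorated copy of the RPC body so the sketch runs without Framework M."""
    entries = await repo.list(
        filters=[("item_code", "=", item_code), ("warehouse", "=", warehouse)]
    )
    total = sum(e.qty for e in entries.items)
    return {"item_code": item_code, "warehouse": warehouse, "qty": total}


fake_repo = FakeLedgerRepo(
    [
        SimpleNamespace(item_code="ITEM-001", warehouse="Main", qty=10),
        SimpleNamespace(item_code="ITEM-001", warehouse="Main", qty=-3),
        SimpleNamespace(item_code="ITEM-001", warehouse="Spare", qty=5),
    ]
)

# The fake is passed like any other argument; no container is involved.
balance = asyncio.run(get_stock_balance("ITEM-001", "Main", repo=fake_repo))
```

No container setup or patching is needed for this kind of test, which is the main payoff of keeping dependencies in the function signature.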
Pattern 3: Quick Validation (Controller Light)
Need just validation? Add a minimal controller:
# feedback.py
from framework_m import DocType, Field, BaseController


class Feedback(DocType):
    subject: str
    message: str
    rating: int = Field(ge=1, le=5)
    email: str | None = None


class FeedbackController(BaseController[Feedback]):
    """Only validate - no other hooks needed."""

    async def validate(self, context=None) -> None:
        if len(self.doc.message) < 10:
            raise ValueError("Message must be at least 10 characters")
Only implement what you need. Empty hooks are inherited from BaseController.
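The inheritance idea can be sketched without Framework M. StubBaseController below is a hypothetical stand-in whose hooks are all no-ops, and the hook order in run_hooks (validate, then before_save) is an assumption made for illustration rather than the framework's documented lifecycle.

```python
import asyncio
from types import SimpleNamespace


class StubBaseController:
    """Hypothetical stand-in for BaseController: every hook is a no-op."""

    def __init__(self, doc):
        self.doc = doc

    async def validate(self, context=None) -> None:
        return None

    async def before_save(self, context=None) -> None:
        return None


class FeedbackController(StubBaseController):
    """Overrides only validate; before_save is inherited unchanged."""

    async def validate(self, context=None) -> None:
        if len(self.doc.message) < 10:
            raise ValueError("Message must be at least 10 characters")


async def run_hooks(message: str) -> str:
    """Run the hooks in an assumed order and report the outcome."""
    controller = FeedbackController(SimpleNamespace(message=message))
    try:
        await controller.validate()
        await controller.before_save()  # inherited no-op
    except ValueError as exc:
        return f"rejected: {exc}"
    return "accepted"


short_result = asyncio.run(run_hooks("Too short"))
long_result = asyncio.run(run_hooks("Clear and useful documentation"))
```

The subclass stays one method long because every hook it does not override falls through to the base class.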
Pattern 4: Direct Database Access (The "Hack")
For quick scripts or migrations, you can use the session directly:
from sqlalchemy import text
from framework_m import rpc
from dependency_injector.wiring import inject, Provide
from framework_m.core.container import Container
from framework_m.core.unit_of_work import UnitOfWork


@rpc("reports.sales_summary")
@inject
async def sales_summary(
    year: int,
    uow: UnitOfWork = Provide[Container.unit_of_work],
) -> list[dict]:
    """Raw SQL for complex reporting - this is the 'hack' path."""
    async with uow:
        result = await uow.session.execute(text("""
            SELECT
                customer,
                SUM(grand_total) as total,
                COUNT(*) as count
            FROM invoice
            WHERE EXTRACT(YEAR FROM posting_date) = :year
              AND docstatus = 1
            GROUP BY customer
            ORDER BY total DESC
            LIMIT 10
        """), {"year": year})
        return [dict(row._mapping) for row in result]
[!WARNING] Raw SQL bypasses permissions, validation, and audit. Use for read-only reports or one-off scripts. For production business logic, use Repositories.
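Even on the raw SQL path, values should travel as bound parameters (the :year placeholder above), never through string formatting. The sketch below reproduces the query shape against an in-memory SQLite database using only the standard library. It is an illustration under assumptions: the table layout and sample rows are invented, and strftime replaces EXTRACT because SQLite has no EXTRACT function.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows behave like mappings
conn.execute(
    "CREATE TABLE invoice ("
    "customer TEXT, grand_total REAL, posting_date TEXT, docstatus INTEGER)"
)
conn.executemany(
    "INSERT INTO invoice VALUES (?, ?, ?, ?)",
    [
        ("Acme", 100.0, "2024-01-15", 1),
        ("Acme", 50.0, "2024-06-01", 1),
        ("Globex", 75.0, "2024-03-10", 1),
        ("Globex", 999.0, "2024-04-01", 0),  # draft: excluded by docstatus = 1
        ("Acme", 10.0, "2023-12-31", 1),     # other year: excluded by :year
    ],
)


def sales_summary(year: int) -> list[dict]:
    """Top customers for a year; `year` is bound, not interpolated."""
    rows = conn.execute(
        """
        SELECT customer, SUM(grand_total) AS total, COUNT(*) AS count
        FROM invoice
        WHERE CAST(strftime('%Y', posting_date) AS INTEGER) = :year
          AND docstatus = 1
        GROUP BY customer
        ORDER BY total DESC
        LIMIT 10
        """,
        {"year": year},
    )
    return [dict(row) for row in rows]


summary = sales_summary(2024)
```

Because the driver sends year separately from the SQL text, a caller cannot change the query's structure through that argument.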
Pattern 5: Multiple RPCs, One File
Group related functions without classes:
# inventory_api.py
from dependency_injector.wiring import inject, Provide
from framework_m import rpc
from framework_m.core.container import Container  # needed for Provide[Container.*]


@rpc("inventory.transfer")
@inject
async def transfer_stock(
    item: str,
    from_wh: str,
    to_wh: str,
    qty: float,
    stock_svc = Provide[Container.stock_service],
) -> dict:
    result = await stock_svc.transfer(item, from_wh, to_wh, qty)
    return {"success": True, "entry_id": str(result.id)}


@rpc("inventory.adjust")
@inject
async def adjust_stock(
    item: str,
    warehouse: str,
    qty: float,
    reason: str,
    stock_svc = Provide[Container.stock_service],
) -> dict:
    result = await stock_svc.adjust(item, warehouse, qty, reason)
    return {"success": True, "entry_id": str(result.id)}


@rpc("inventory.get_levels")
@inject
async def get_stock_levels(
    warehouse: str | None = None,
    repo = Provide[Container.item_repo],
) -> list[dict]:
    filters = [("warehouse", "=", warehouse)] if warehouse else []
    items = await repo.list(filters=filters, limit=1000)
    return [{"item": i.item_code, "qty": i.qty} for i in items.items]
Three endpoints, no classes, full DI support.
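To make the mapping from a dotted name to a function concrete, here is a toy registry using only the standard library. It is not Framework M's implementation: this rpc decorator, RPC_REGISTRY, and dispatch are hypothetical names, and the real framework presumably adds DI, validation, and permission checks on top.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical registry: dotted RPC name -> coroutine function.
RPC_REGISTRY: Dict[str, Callable[..., Awaitable[Any]]] = {}


def rpc(name: str):
    """Toy decorator that records a function under a dotted name."""

    def decorator(fn):
        RPC_REGISTRY[name] = fn
        return fn

    return decorator


@rpc("inventory.transfer")
async def transfer_stock(item: str, from_wh: str, to_wh: str, qty: float) -> dict:
    return {"success": True, "moved": qty, "route": f"{from_wh}->{to_wh}"}


@rpc("inventory.get_levels")
async def get_stock_levels(warehouse=None) -> list:
    return [{"item": "ITEM-001", "qty": 7, "warehouse": warehouse}]


async def dispatch(name: str, payload: dict):
    """Look up the function by name and call it with the JSON body as kwargs."""
    return await RPC_REGISTRY[name](**payload)


result = asyncio.run(
    dispatch(
        "inventory.transfer",
        {"item": "ITEM-001", "from_wh": "Main", "to_wh": "Spare", "qty": 2.0},
    )
)
```

Under this model, one file can hold any number of endpoints, because each decorator call adds one entry to the registry.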
Pattern 6: Background Job (Minimal)
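# jobs/cleanup.py
from datetime import datetime, timedelta, timezone

from dependency_injector.wiring import inject, Provide
from framework_m.adapters.jobs import task
from framework_m.core.container import Container


@task
@inject
async def cleanup_old_logs(
    days: int = 30,
    repo = Provide[Container.log_repo],
    uow = Provide[Container.unit_of_work],
):
    """Delete logs older than N days."""
    # Compute the cutoff in Python so it is passed as a value, not as a
    # SQL fragment inside a string (assumes timestamps are stored in UTC).
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    async with uow:
        deleted = await repo.delete_where(
            filters=[("created", "<", cutoff)]
        )
        await uow.commit()
    return {"deleted": deleted}


# Enqueue from anywhere:
# await cleanup_old_logs.kiq(days=7)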
When to Add Structure
| Situation | Pattern |
|---|---|
| Simple data storage | Minimal DocType (no controller) |
| Custom API endpoint | @rpc function with DI |
| Simple validation | Controller with only validate() |
| Complex business logic | Service class |
| Multi-step transactions | Service + UnitOfWork |
| External integrations | Full Protocol/Adapter |
Comparison: Lines of Code
| Task | Frappe | M (Minimal) | M (Full) |
|---|---|---|---|
| Simple DocType | 15 | 8 | 8 |
| DocType + validation | 20 | 15 | 15 |
| Custom API endpoint | 8 | 12 | 25 |
| Stock balance query | 5 | 10 | 20 |
| Background job | 10 | 10 | 15 |
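By these counts, M's minimal patterns are shorter than Frappe for plain DocTypes, on par for background jobs, and 50-100% longer for custom endpoints and queries, where DI wiring adds lines. The gap shrinks as complexity grows.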
Summary: Key Mindset Shifts
| Frappe | Framework M | Why |
|---|---|---|
| Global frappe.db | Injected repo | Testability, no hidden state |
| Implicit transactions | Explicit UoW | Clear boundaries, multiple txns |
| Hooks on DocType | Hooks on Controller | Separation of data and logic |
| Request = transaction | You decide | Flexibility for complex flows |
| frappe.call() | Typed endpoints | IDE support, validation |
| cur_frm (global) | React state/props | Component isolation |
| Jinja templates | React components | Reusability, types |
| Server-rendered forms | Client-side RJSF | Better UX, offline support |
Next Steps
- Start with a simple DocType using m new:doctype
- Implement a Controller with hooks
- Use the Repository pattern for data access
- Add Refine.dev frontend for UI
- Connect forms via RJSF + metadata endpoint
See also:
- Architecture Guide — Full architecture details
- ADR-0005 — Frontend stack decision
- Protocol Reference — Hook implementations