# Phase 04: Background Jobs & Events

**Objective:** Implement asynchronous job processing, an event-driven architecture, and a webhook system with pluggable backends (Redis, NATS, in-memory).
## 0. Backend Selection (Cross-Platform)

### 0.1 Decision
| Component | Backend | Library | Notes |
|---|---|---|---|
| Jobs | NATS JetStream | taskiq + taskiq-nats | Cross-platform, persistent |
| Events | NATS JetStream | nats-py | Pub/Sub + Replay |
| WS Backplane | NATS | nats-py | Distributed WebSockets |
| Cache | Redis | redis-py | Windows: Memurai/Garnet |
### 0.2 Dev Mode (No External Deps)

- Implement `InMemoryJobQueue` for local dev (backed by `asyncio.Queue`).
- Implement `InMemoryEventBus` for local dev.
- Auto-detect: if `NATS_URL` is not set, use the in-memory implementations.
## 1. Taskiq + NATS Integration (Primary)

### 1.1 Taskiq Setup

- Add dependencies: `taskiq`, `taskiq-nats`.
- Create `src/framework_m/adapters/jobs/taskiq_adapter.py`.
- Configure Taskiq with NATS JetStream:
  - Set the NATS connection URL from the environment (`NATS_URL`)
  - Configure the job timeout (default: 300s)
  - Configure max retries (default: 3)
  - Use `PullBasedJetStreamBroker` for reliable delivery
- Create the broker instance:

```python
from taskiq_nats import PullBasedJetStreamBroker

broker = PullBasedJetStreamBroker(
    servers=["nats://localhost:4222"],
    stream="framework_m_jobs",
)
```
### 1.2 Job Queue Protocol Implementation

- Implement `TaskiqJobQueueAdapter`:
  - `async def enqueue(job_name: str, **kwargs) -> str`: get the registered task, call `.kiq()` to enqueue, and return the job ID
  - `async def schedule(job_name: str, cron: str, **kwargs)`: use `TaskiqScheduler` with cron triggers
### 1.3 Job Registration

- Create a job decorator `@job`:

```python
@broker.task(name="send_email", retry=3)
async def send_email(to: str, subject: str, body: str):
    # Job logic
    pass
```

- Implement a job registry:
  - Store all `@broker.task`-decorated functions
  - Auto-discover them on startup
## 2. Worker Process

### 2.1 Worker Implementation

- Create `src/framework_m/cli/worker.py`.
- Implement the worker command: `m worker`.
- Worker startup:
  - Initialize the database connection
  - Discover and register all jobs
  - Start the Taskiq worker
  - Listen for jobs from NATS JetStream
### 2.2 Job Context

- Create a job context for dependency injection:
  - Provide the database session
  - Provide the repository factory
  - Provide the event bus
  - Pass the context to job functions
- Add job metadata:
  - Job ID
  - Enqueued time
  - Started time
  - User context (must be re-hydrated from job arguments where applicable)
## 3. Scheduler

### 3.1 Cron Jobs

- Support Taskiq's native cron syntax:

```python
cron_jobs = [
    cron("send_daily_report", hour=9, minute=0),
    cron("cleanup_old_files", hour=2, minute=0),
]
```

- Create a `ScheduledJob` DocType:
  - `name: str` - job identifier
  - `function: str` - dotted path to the function
  - `cron_expression: str` - cron syntax
  - `enabled: bool` - active/inactive
  - `last_run: datetime | None`
  - `next_run: datetime | None`
### 3.2 Dynamic Scheduler

- Implement dynamic job scheduling:
  - Load `ScheduledJob` records from the database
  - Register them with Taskiq at startup
- NOT NEEDED: adding/removing jobs without a restart (via signal). A worker restart is acceptable for schedule changes; k8s rolling updates handle this gracefully. If needed in the future, NATS pub/sub can signal workers in Phase 06+ (Ops/Deployment).
## 4. Event Bus

### 4.1 NATS Event Bus Implementation

- Create `src/framework_m/adapters/events/nats_event_bus.py`.
- Implement `NatsEventBusAdapter`:
  - `async def publish(topic: str, event: BaseModel)`: serialize the event to JSON and publish it to the NATS JetStream subject
  - `async def subscribe(topic: str, handler: Callable)`: subscribe to the NATS JetStream subject, deserialize each event, and call the handler
### 4.2 Event Types

- Create a base event class:

```python
class BaseEvent(BaseModel):
    event_type: str
    timestamp: datetime
    user_id: str | None
    data: dict
```

- Define standard events:
  - `DocCreated`
  - `DocUpdated`
  - `DocDeleted`
  - `DocSubmitted`
  - `DocCancelled`
### 4.3 Event Publishing from Lifecycle Hooks

- Update `GenericRepository`:
  - Inject the event bus
  - Publish `DocCreated` after insert
  - Publish `DocUpdated` after update
  - Publish `DocDeleted` after delete
- Add event publishing to controller hooks:

```python
async def after_insert(self):
    await self.event_bus.publish(
        "doc.created",
        DocCreated(doctype="Todo", doc_id=self.doc.name),
    )
```
## 5. Webhook System

### 5.1 Webhook DocType

- Create a `Webhook` DocType:
  - `name: str`
  - `event: str` - event to listen to (e.g., "doc.created")
  - `doctype_filter: str | None` - filter by DocType
  - `url: str` - webhook endpoint URL
  - `method: str` - HTTP method (POST, PUT)
  - `headers: dict` - custom headers
  - `enabled: bool`
  - `secret: str` - for signature verification
### 5.2 Webhook Listener

- Create `src/framework_m/adapters/webhooks/listener.py`.
- Implement the webhook listener:
  - Subscribe to all events
  - Load active webhooks from the database
  - Filter events by webhook configuration
  - Trigger an HTTP call to the webhook URL
### 5.3 Webhook Delivery

- Implement webhook delivery:
  - Use `httpx` for async HTTP calls
  - Add a signature header (HMAC-SHA256 with the webhook secret)
  - Set a timeout (default: 30s)
  - Handle errors and retries
- Add retry logic:
  - Retry on failure (exponential backoff)
  - Max retries: 3
  - Log failed deliveries
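The signing-and-retry logic can be sketched as below. The `send` callable stands in for an `httpx.AsyncClient.post`-style method so the function is testable without a network; the `X-Webhook-Signature` header name and the `base_delay` parameter are assumptions:

```python
import asyncio
import hashlib
import hmac
import json
from typing import Any, Awaitable, Callable


def sign_payload(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


async def deliver_webhook(
    send: Callable[..., Awaitable[Any]],  # e.g. an httpx.AsyncClient.post bound method
    url: str,
    event: dict,
    secret: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> Any:
    """Send the event with a signature header, retrying with exponential backoff."""
    body = json.dumps(event).encode()
    headers = {
        "Content-Type": "application/json",
        # Header name is an assumption; the receiver recomputes and compares.
        "X-Webhook-Signature": sign_payload(secret, body),
    }
    for attempt in range(max_retries):
        try:
            return await send(url, content=body, headers=headers)
        except Exception:
            if attempt == max_retries - 1:
                raise  # caller logs the failed delivery
            await asyncio.sleep(base_delay * 2 ** attempt)  # 1x, 2x, 4x backoff
```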
### 5.4 Webhook Logs

- Create a `WebhookLog` DocType:
  - `webhook: str` - reference to the Webhook
  - `event: str` - event that triggered the call
  - `status: str` - Success/Failed
  - `response_code: int`
  - `response_body: str`
  - `error: str | None`
  - `timestamp: datetime`
## 6. Job Monitoring

### 6.1 Job Status Tracking

- Create a `JobLog` DocType:
  - `job_id: str`
  - `job_name: str`
  - `status: str` - Queued/Running/Success/Failed
  - `enqueued_at: datetime`
  - `started_at: datetime | None`
  - `completed_at: datetime | None`
  - `error: str | None`
  - `result: dict | None`
- Update job execution to log status:
  - Create a log entry on enqueue
  - Update it on start
  - Update it on completion/failure
### 6.2 Job Management API

- Create endpoints:
  - `GET /api/v1/jobs` - list jobs
  - `GET /api/v1/jobs/{job_id}` - get job status
  - `POST /api/v1/jobs/{job_id}/cancel` - cancel a job
  - `POST /api/v1/jobs/{job_id}/retry` - retry a failed job
### 6.3 Job Monitor (Admin UI)

- Option A: Native Desk UI (IMPLEMENTED):
  - `Job Log` List View (auto-generated via `api_resource=True`).
  - Shows Status, Error, and Duration (`duration_seconds` property).
- Option B: Taskiq Dashboard (external) - NOT NEEDED for MVP: the native JobLog UI is sufficient for debugging. External dashboards can be mounted in Phase 06+ (Ops/Deployment) if production monitoring requires deeper worker introspection.
  - Mount `taskiq-dashboard` (if available) or a similar tool.
  - Useful for sysadmins debugging worker health.
## 4. Real-time Events (WebSockets)

### 4.1 WebSocket Protocol (The Port)

Rationale: an abstraction allows switching backplanes (Redis/NATS/in-memory).

- Create `src/framework_m/core/interfaces/socket.py`.
- Define `SocketProtocol`:
  - `async def broadcast(topic: str, message: dict)`
  - `async def send_to_user(user_id: str, message: dict)`
### 4.2 NATS Backplane (The Adapter)

Target: Kubernetes support (distributed).

- Create `tests/adapters/socket/test_nats_socket.py`.
- Write tests using testcontainers-nats to verify Pub/Sub.
- Create `src/framework_m/adapters/socket/nats_socket.py`.
- Implement `NatsSocketAdapter`:
  - On boot: subscribe to `framework.events.*`.
  - On event: `await nc.publish(subject, json.dumps(msg).encode())`.
  - On receive (sub): forward to locally connected WebSocket clients (Connection Manager).
### 4.3 WebSocket Endpoint (Litestar)

- Create `src/framework_m/adapters/web/socket.py`.
- Implement `ConnectionManager`:
  - Store active connections as `Dict[UserId, List[WebSocket]]`.
- Add route `WS /api/v1/stream`:
  - Authenticate the user (query-param token).
  - Upgrade the connection.
  - Register it with `ConnectionManager`.
  - Listen for client disconnect.
## 7. Testing

### 7.1 Unit Tests

- Test job registration:
  - Test the `@job` decorator
  - Test the job registry
- Test the event bus:
  - Test publish/subscribe
  - Test event serialization

### 7.2 Integration Tests

- Test job execution:
  - Enqueue a job
  - Verify the job runs
  - Check the job result
- Test scheduled jobs:
  - Register a cron job
  - Verify execution at the scheduled time (use time mocking)
- Test webhooks:
  - Create a webhook
  - Trigger an event
  - Verify the HTTP call is made
  - Check the webhook log
- Use testcontainers for NATS
## 8. CLI Commands

- Add job management commands:
  - `m worker` - start the worker process
  - `m job:list` - list registered jobs
  - `m job:run <name>` - run a job immediately
  - `m job:status <id>` - check job status
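The command layout above can be sketched with stdlib `argparse` (the real CLI may use a different library; the handlers here are stubs):

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Parser for the `m` job-management commands listed above."""
    parser = argparse.ArgumentParser(prog="m")
    sub = parser.add_subparsers(dest="command", required=True)

    sub.add_parser("worker", help="Start the worker process")
    sub.add_parser("job:list", help="List registered jobs")

    run = sub.add_parser("job:run", help="Run a job immediately")
    run.add_argument("name")

    status = sub.add_parser("job:status", help="Check job status")
    status.add_argument("id")

    return parser
```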
## Validation Checklist

Before moving to Phase 05, verify:

- Jobs can be enqueued and executed
- Scheduled jobs run at the correct times
- Events are published and received
- Webhooks are triggered on events
- Job logs are created and updated
- The worker process starts and processes jobs
## Anti-Patterns to Avoid

- ❌ Don't: run long-running tasks in HTTP handlers. ✅ Do: enqueue jobs for async processing.
- ❌ Don't: use polling for event detection. ✅ Do: use NATS Pub/Sub for real-time events.
- ❌ Don't: hardcode webhook URLs. ✅ Do: store webhooks in the database for dynamic configuration.
- ❌ Don't: ignore failed jobs. ✅ Do: implement retry logic and logging.