
Phase 04: Background Jobs & Events

Objective: Implement asynchronous job processing, event-driven architecture, and webhook system with pluggable backends (Redis, NATS, In-Memory).


0. Backend Selection (Cross-Platform)

0.1 Decision

Component      Backend          Library                Notes
Jobs           NATS JetStream   taskiq + taskiq-nats   Cross-platform, persistent
Events         NATS JetStream   nats-py                Pub/Sub + Replay
WS Backplane   NATS             nats-py                Distributed WebSockets
Cache          Redis            redis-py               Windows: Memurai/Garnet

0.2 Dev Mode (No External Deps)

  • Implement InMemoryJobQueue for local dev (asyncio.Queue).
  • Implement InMemoryEventBus for local dev.
  • Auto-detect: if NATS_URL is not set, fall back to the in-memory implementations (see the sketch below).
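
A minimal sketch of the dev-mode fallback, assuming a hypothetical InMemoryJobQueue class and a nats_enabled() helper (names are illustrative, not final API):

    import asyncio
    import os
    import uuid
    from typing import Any, Awaitable, Callable

    class InMemoryJobQueue:
        """Dev-only queue: jobs run in-process via asyncio; nothing is persisted."""

        def __init__(self) -> None:
            self._queue: asyncio.Queue = asyncio.Queue()

        async def enqueue(self, func: Callable[..., Awaitable[Any]], **kwargs: Any) -> str:
            job_id = uuid.uuid4().hex
            await self._queue.put((job_id, func, kwargs))
            return job_id

        async def run_worker(self) -> None:
            # Drain the queue forever; good enough for local development.
            while True:
                _job_id, func, kwargs = await self._queue.get()
                await func(**kwargs)
                self._queue.task_done()

    def nats_enabled() -> bool:
        # Auto-detect: use the NATS-backed adapters only when NATS_URL is configured.
        return bool(os.environ.get("NATS_URL"))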

1. Taskiq + NATS Integration (Primary)

1.1 Taskiq Setup

  • Add dependencies: taskiq, taskiq-nats.

  • Create src/framework_m/adapters/jobs/taskiq_adapter.py

  • Configure Taskiq with NATS JetStream:

    • Set NATS connection URL from environment (NATS_URL)
    • Configure job timeout (default: 300s)
    • Configure max retries (default: 3)
    • Use PullBasedJetStreamBroker for reliable delivery
  • Create broker instance:

    from taskiq_nats import PullBasedJetStreamBroker

    broker = PullBasedJetStreamBroker(
        servers=["nats://localhost:4222"],
        stream="framework_m_jobs",
    )

1.2 Job Queue Protocol Implementation

  • Implement TaskiqJobQueueAdapter:
    • async def enqueue(job_name: str, **kwargs) -> str:
      • Get registered task
      • Call .kiq() to enqueue
      • Return job ID
    • async def schedule(job_name: str, cron: str, **kwargs):
      • Use TaskiqScheduler with cron triggers
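
A sketch of the adapter's enqueue path, assuming jobs are resolved from a name-to-task registry; taskiq's .kiq() returns a handle whose task_id is reused here as the job ID (confirm against the installed taskiq version):

    from typing import Any

    class TaskiqJobQueueAdapter:
        """JobQueue port backed by Taskiq + NATS JetStream."""

        def __init__(self, registry: dict[str, Any]) -> None:
            # registry maps job_name -> the @broker.task decorated function
            self._registry = registry

        async def enqueue(self, job_name: str, **kwargs: Any) -> str:
            task = self._registry[job_name]
            # .kiq() serializes the call and publishes it to the JetStream stream.
            handle = await task.kiq(**kwargs)
            return handle.task_id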

1.3 Job Registration

  • Create job decorator @job:

    @broker.task(name="send_email", retry=3)
    async def send_email(to: str, subject: str, body: str):
        # Job logic
        pass
  • Implement job registry:

    • Store all @broker.task decorated functions
    • Auto-discover on startup
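
A sketch of the registry and auto-discovery, using hypothetical register_job/autodiscover helpers; the convention of keeping tasks in *.jobs modules is an assumption:

    import importlib
    import pkgutil
    from typing import Any

    JOB_REGISTRY: dict[str, Any] = {}

    def register_job(name: str, task: Any) -> None:
        """Remember a @broker.task decorated function under its job name."""
        JOB_REGISTRY[name] = task

    def autodiscover(package: str = "framework_m") -> None:
        # Import every *.jobs module so task decorators and register_job() calls
        # execute at worker startup.
        pkg = importlib.import_module(package)
        for mod in pkgutil.walk_packages(pkg.__path__, prefix=f"{package}."):
            if mod.name.endswith(".jobs"):
                importlib.import_module(mod.name)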

2. Worker Process

2.1 Worker Implementation

  • Create src/framework_m/cli/worker.py

  • Implement worker command:

    m worker
  • Worker startup:

    • Initialize database connection
    • Discover and register all jobs
    • Start Taskiq worker
    • Listen for jobs from NATS JetStream
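
One possible shape for `m worker` is to bootstrap the framework and then delegate to Taskiq's own worker CLI; a sketch (whether to shell out or embed the worker loop is still an open choice):

    import subprocess

    def worker_command() -> None:
        """`m worker`: initialize the app, then hand off to the Taskiq worker."""
        # Framework bootstrap (database connection, job auto-discovery) goes here.
        # The Taskiq worker then consumes from the JetStream stream configured
        # on the broker defined in taskiq_adapter.py.
        subprocess.run(
            ["taskiq", "worker", "framework_m.adapters.jobs.taskiq_adapter:broker"],
            check=True,
        )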

2.2 Job Context

  • Create job context for dependency injection:

    • Provide database session
    • Provide repository factory
    • Provide event bus
    • Pass to job functions
  • Add job metadata:

    • Job ID
    • Enqueued time
    • Started time
    • User context (must be re-hydrated from job arguments, if applicable)
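
A sketch of the context object; the session/repository/event-bus types are placeholders for the framework's real ports:

    from dataclasses import dataclass, field
    from datetime import datetime, timezone
    from typing import Any

    @dataclass
    class JobContext:
        """Dependencies and metadata handed to every job function."""

        db_session: Any                       # database session
        repo_factory: Any                     # repository factory
        event_bus: Any                        # event bus port
        job_id: str = ""
        enqueued_at: datetime | None = None
        started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
        user_id: str | None = None            # re-hydrated from job arguments if applicable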

3. Scheduler

3.1 Cron Jobs

  • Support Taskiq's native cron syntax:

    cron_jobs = [
    cron("send_daily_report", hour=9, minute=0),
    cron("cleanup_old_files", hour=2, minute=0),
    ]
  • Create ScheduledJob DocType:

    • name: str - Job identifier
    • function: str - Dotted path to function
    • cron_expression: str - Cron syntax
    • enabled: bool - Active/inactive
    • last_run: datetime | None
    • next_run: datetime | None
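
The ScheduledJob fields above, sketched as a Pydantic model (how it plugs into the DocType base class is left out):

    from datetime import datetime
    from pydantic import BaseModel

    class ScheduledJob(BaseModel):
        """Persisted schedule entry consumed by the dynamic scheduler."""

        name: str                        # job identifier
        function: str                    # dotted path to the task function
        cron_expression: str             # standard cron syntax
        enabled: bool = True
        last_run: datetime | None = None
        next_run: datetime | None = None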

3.2 Dynamic Scheduler

  • Implement dynamic job scheduling:
    • Load ScheduledJob DocTypes from database
    • Register with Taskiq at startup
    • Adding/removing jobs without a restart (via signal) is NOT NEEDED: a worker restart is acceptable for schedule changes, and k8s rolling updates handle this gracefully. If needed later, NATS pub/sub can signal workers in Phase 06+ (Ops/Deployment).

4. Event Bus

4.1 NATS Event Bus Implementation

  • Create src/framework_m/adapters/events/nats_event_bus.py
  • Implement NatsEventBusAdapter:
    • async def publish(topic: str, event: BaseModel):
      • Serialize event to JSON
      • Publish to NATS JetStream subject
    • async def subscribe(topic: str, handler: Callable):
      • Subscribe to NATS JetStream subject
      • Deserialize event
      • Call handler function
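
A sketch of the adapter against nats-py's JetStream API; it assumes a JetStream stream already covers the subjects in use and that events are Pydantic v2 models:

    import json
    from typing import Awaitable, Callable

    import nats
    from pydantic import BaseModel

    class NatsEventBusAdapter:
        """EventBus port backed by NATS JetStream."""

        def __init__(self, url: str = "nats://localhost:4222") -> None:
            self._url = url
            self._nc = None
            self._js = None

        async def connect(self) -> None:
            self._nc = await nats.connect(servers=[self._url])
            self._js = self._nc.jetstream()

        async def publish(self, topic: str, event: BaseModel) -> None:
            # Serialize the event and publish it to the JetStream subject.
            await self._js.publish(topic, event.model_dump_json().encode())

        async def subscribe(self, topic: str, handler: Callable[[dict], Awaitable[None]]) -> None:
            async def _on_msg(msg) -> None:
                await handler(json.loads(msg.data))
                await msg.ack()

            await self._js.subscribe(topic, cb=_on_msg)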

4.2 Event Types

  • Create base event class:

    from datetime import datetime
    from pydantic import BaseModel

    class BaseEvent(BaseModel):
        event_type: str
        timestamp: datetime
        user_id: str | None
        data: dict
  • Define standard events:

    • DocCreated
    • DocUpdated
    • DocDeleted
    • DocSubmitted
    • DocCancelled
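
The standard events can be thin subclasses of BaseEvent; a sketch of one of them (the remaining event_type strings mirror the "doc.created" naming and are assumptions):

    from datetime import datetime, timezone
    from pydantic import Field

    class DocCreated(BaseEvent):  # BaseEvent from the block above
        """Emitted after a document is inserted."""

        event_type: str = "doc.created"
        timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
        user_id: str | None = None
        data: dict = Field(default_factory=dict)
        doctype: str
        doc_id: str

    # DocUpdated, DocDeleted, DocSubmitted, and DocCancelled follow the same shape,
    # with event_type "doc.updated", "doc.deleted", "doc.submitted", "doc.cancelled".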

4.3 Event Publishing from Lifecycle Hooks

  • Update GenericRepository:

    • Inject event bus
    • Publish DocCreated after insert
    • Publish DocUpdated after update
    • Publish DocDeleted after delete
  • Add event publishing to controller hooks:

    async def after_insert(self):
        await self.event_bus.publish(
            "doc.created",
            DocCreated(doctype="Todo", doc_id=self.doc.name),
        )

5. Webhook System

5.1 Webhook DocType

  • Create Webhook DocType:
    • name: str
    • event: str - Event to listen to (e.g., "doc.created")
    • doctype_filter: str | None - Filter by DocType
    • url: str - Webhook endpoint URL
    • method: str - HTTP method (POST, PUT)
    • headers: dict - Custom headers
    • enabled: bool
    • secret: str - For signature verification

5.2 Webhook Listener

  • Create src/framework_m/adapters/webhooks/listener.py
  • Implement webhook listener:
    • Subscribe to all events
    • Load active webhooks from database
    • Filter events by webhook configuration
    • Trigger HTTP call to webhook URL
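
A sketch of the listener's fan-out logic; the webhook repository and deliverer interfaces are placeholders:

    class WebhookListener:
        """Subscribes to events and forwards matching ones to webhook delivery."""

        def __init__(self, event_bus, webhook_repo, deliver) -> None:
            self._event_bus = event_bus
            self._webhooks = webhook_repo
            self._deliver = deliver

        async def start(self) -> None:
            # One wildcard subscription covers all document events.
            await self._event_bus.subscribe("doc.*", self._on_event)

        async def _on_event(self, event: dict) -> None:
            hooks = await self._webhooks.list_enabled(event=event["event_type"])
            for hook in hooks:
                # Optional per-DocType filter from the Webhook DocType.
                if hook.doctype_filter and hook.doctype_filter != event.get("doctype"):
                    continue
                await self._deliver(hook, event)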

5.3 Webhook Delivery

  • Implement webhook delivery:

    • Use httpx for async HTTP calls
    • Add signature header (HMAC-SHA256 with secret)
    • Set timeout (default: 30s)
    • Handle errors and retries
  • Add retry logic:

    • Retry on failure (exponential backoff)
    • Max retries: 3
    • Log failed deliveries
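
A sketch of delivery with an HMAC-SHA256 signature and exponential backoff, using httpx; the signature header name is an assumption:

    import asyncio
    import hashlib
    import hmac
    import json

    import httpx

    async def deliver_webhook(hook, event: dict, max_retries: int = 3, timeout: float = 30.0) -> None:
        """Send the event to the webhook URL, signing the body with the shared secret."""
        body = json.dumps(event).encode()
        signature = hmac.new(hook.secret.encode(), body, hashlib.sha256).hexdigest()
        headers = {
            "Content-Type": "application/json",
            "X-Webhook-Signature": signature,  # header name is a project convention to decide
            **(hook.headers or {}),
        }
        async with httpx.AsyncClient(timeout=timeout) as client:
            for attempt in range(max_retries):
                try:
                    response = await client.request(hook.method, hook.url, content=body, headers=headers)
                    response.raise_for_status()
                    return  # success; a WebhookLog entry is written by the caller
                except httpx.HTTPError:
                    if attempt == max_retries - 1:
                        raise  # caller logs the failed delivery
                    await asyncio.sleep(2 ** attempt)  # backoff: 1s, 2s, 4s...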

5.4 Webhook Logs

  • Create WebhookLog DocType:
    • webhook: str - Reference to Webhook
    • event: str - Event that triggered
    • status: str - Success/Failed
    • response_code: int
    • response_body: str
    • error: str | None
    • timestamp: datetime

6. Job Monitoring

6.1 Job Status Tracking

  • Create JobLog DocType:

    • job_id: str
    • job_name: str
    • status: str - Queued/Running/Success/Failed
    • enqueued_at: datetime
    • started_at: datetime | None
    • completed_at: datetime | None
    • error: str | None
    • result: dict | None
  • Update job execution to log status:

    • Create log entry on enqueue
    • Update on start
    • Update on completion/failure

6.2 Job Management API

  • Create endpoints:
    • GET /api/v1/jobs - List jobs
    • GET /api/v1/jobs/{job_id} - Get job status
    • POST /api/v1/jobs/{job_id}/cancel - Cancel job
    • POST /api/v1/jobs/{job_id}/retry - Retry failed job
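
A sketch of these endpoints as Litestar handlers; the service layer that queries JobLog and talks to the broker is omitted:

    from litestar import Router, get, post

    @get("/")
    async def list_jobs() -> list[dict]:
        # Query JobLog records via the repository layer.
        return []

    @get("/{job_id:str}")
    async def get_job(job_id: str) -> dict:
        return {"job_id": job_id}

    @post("/{job_id:str}/retry")
    async def retry_job(job_id: str) -> dict:
        # Re-enqueue the original task with its stored arguments.
        return {"job_id": job_id, "status": "queued"}

    jobs_router = Router(path="/api/v1/jobs", route_handlers=[list_jobs, get_job, retry_job])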

6.3 Job Monitor (Admin UI)

  • Option A: Native Desk UI (IMPLEMENTED):
    • Job Log List View (auto-generated via api_resource=True).
    • Shows Status, Error, Duration (duration_seconds property).
  • Option B: Taskiq Dashboard (External) - NOT NEEDED for MVP: Native JobLog UI is sufficient for debugging. External dashboards can be mounted in Phase 06+ (Ops/Deployment) if production monitoring requires deeper worker introspection.
    • Mount taskiq-dashboard (if available) or similar tool.
    • Useful for sysadmins to debug worker health.

7. Real-time Events (WebSockets)

7.1 WebSocket Protocol (The Port)

Rationale: Abstraction allows switching backplanes (Redis/NATS/Memory).

  • Create src/framework_m/core/interfaces/socket.py
  • Define SocketProtocol:
    • async def broadcast(topic: str, message: dict)
    • async def send_to_user(user_id: str, message: dict)
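
A sketch of the port as a typing.Protocol:

    from typing import Protocol

    class SocketProtocol(Protocol):
        """Port for the WebSocket backplane (NATS, Redis, or in-memory adapters)."""

        async def broadcast(self, topic: str, message: dict) -> None:
            """Deliver message to every client subscribed to topic, on any node."""
            ...

        async def send_to_user(self, user_id: str, message: dict) -> None:
            """Deliver message to all active connections of a single user."""
            ...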

7.2 NATS Backplane (The Adapter)

Target: Kubernetes support (Distributed).

  • Create tests/adapters/socket/test_nats_socket.py
  • Write tests using testcontainers-nats to verify Pub/Sub.
  • Create src/framework_m/adapters/socket/nats_socket.py
  • Implement NatsSocketAdapter:
    • On Boot: Subscribe to framework.events.*.
    • On Event: await nc.publish(subject, json.dumps(msg).encode()).
    • On Receive (Sub): Forward to local connected Websocket clients (Connection Manager).
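
A sketch of the adapter over core NATS pub/sub; the ConnectionManager's send_local() method is a placeholder for however local fan-out ends up being named:

    import json

    import nats

    class NatsSocketAdapter:
        """WebSocket backplane: fans messages out across nodes via NATS."""

        def __init__(self, connection_manager, url: str = "nats://localhost:4222") -> None:
            self._manager = connection_manager
            self._url = url
            self._nc = None

        async def start(self) -> None:
            self._nc = await nats.connect(servers=[self._url])
            # Every node receives every event and forwards it to its local clients.
            await self._nc.subscribe("framework.events.*", cb=self._on_message)

        async def broadcast(self, topic: str, message: dict) -> None:
            await self._nc.publish(f"framework.events.{topic}", json.dumps(message).encode())

        async def _on_message(self, msg) -> None:
            await self._manager.send_local(json.loads(msg.data))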

7.3 WebSocket Endpoint (Litestar)

  • Create src/framework_m/adapters/web/socket.py
  • Implement ConnectionManager:
    • Store active connections as Dict[UserId, List[WebSocket]].
  • Add route WS /api/v1/stream:
    • Authenticate user (Query param token).
    • Upgrade connection.
    • Register with ConnectionManager.
    • Listen for client disconnect.
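
A sketch of the connection manager and the endpoint using Litestar's websocket handler; token resolution is stubbed, and the exact WebSocket API should be checked against the installed Litestar version:

    from collections import defaultdict

    from litestar import WebSocket, websocket
    from litestar.exceptions import WebSocketDisconnect

    class ConnectionManager:
        """Tracks live sockets per user so the backplane can reach local clients."""

        def __init__(self) -> None:
            self._connections: dict[str, list[WebSocket]] = defaultdict(list)

        def register(self, user_id: str, socket: WebSocket) -> None:
            self._connections[user_id].append(socket)

        def unregister(self, user_id: str, socket: WebSocket) -> None:
            self._connections[user_id].remove(socket)

        async def send_to_user(self, user_id: str, message: dict) -> None:
            for socket in self._connections.get(user_id, []):
                await socket.send_json(message)

    manager = ConnectionManager()

    @websocket("/api/v1/stream")
    async def stream_handler(socket: WebSocket, token: str) -> None:
        user_id = token  # placeholder: resolve the user from the query-param token
        await socket.accept()
        manager.register(user_id, socket)
        try:
            while True:
                await socket.receive_text()  # keep the connection open
        except WebSocketDisconnect:
            manager.unregister(user_id, socket)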

8. Testing

8.1 Unit Tests

  • Test job registration:

    • Test @job decorator
    • Test job registry
  • Test event bus:

    • Test publish/subscribe
    • Test event serialization

8.2 Integration Tests

  • Test job execution:

    • Enqueue job
    • Verify job runs
    • Check job result
  • Test scheduled jobs:

    • Register cron job
    • Verify execution at scheduled time (use time mocking)
  • Test webhooks:

    • Create webhook
    • Trigger event
    • Verify HTTP call made
    • Check webhook log
  • Use testcontainers for NATS
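
A sketch of a session-scoped NATS fixture using testcontainers' generic DockerContainer; an async test runner (pytest-asyncio or anyio) is assumed, and a dedicated testcontainers-nats package can replace the generic container:

    import pytest
    from testcontainers.core.container import DockerContainer

    @pytest.fixture(scope="session")
    def nats_url():
        # Official NATS image with JetStream enabled via the -js flag.
        with DockerContainer("nats:2.10").with_command("-js").with_exposed_ports(4222) as c:
            yield f"nats://{c.get_container_host_ip()}:{c.get_exposed_port(4222)}"

    async def test_job_roundtrip(nats_url):
        # Point the broker/event bus at nats_url, enqueue a job or publish an
        # event, and assert on the observable result (JobLog entry, handler call).
        ...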


9. CLI Commands

  • Add job management commands:
    • m worker - Start worker process
    • m job:list - List registered jobs
    • m job:run <name> - Run job immediately
    • m job:status <id> - Check job status

Validation Checklist

Before moving to Phase 05, verify:

  • Jobs can be enqueued and executed
  • Scheduled jobs run at correct times
  • Events are published and received
  • Webhooks are triggered on events
  • Job logs are created and updated
  • Worker process starts and processes jobs

Anti-Patterns to Avoid

❌ Don't: Run long-running tasks in HTTP handlers ✅ Do: Enqueue jobs for async processing

❌ Don't: Use polling for event detection ✅ Do: Use NATS Pub/Sub for real-time events

❌ Don't: Hardcode webhook URLs ✅ Do: Store webhooks in database for dynamic configuration

❌ Don't: Ignore failed jobs ✅ Do: Implement retry logic and logging