Skip to main content

How to Manage Background Workers and Jobs

Framework M supports two paradigms for background execution: standard Background Jobs (Taskiq/NATS) for simple asynchronous tasks, and Durable Execution (Temporal) for long-running workflows.

This guide explains how to define background jobs, register them for automatic discovery, and run the worker processes.

Defining Background Jobs

To define a background task, use the @job decorator. This decorator is engine-agnostic; it registers the task in a central registry that can be used by Taskiq, Temporal, or even in-memory backends during development.

from framework_m import job

@job(name="send_welcome_email", retry=3, timeout=300)
async def send_welcome_email(user_email: str):
"""A background task to send emails."""
# ... logic to send email ...
print(f"Sent welcome email to {user_email}")

Key Parameters:

  • name: A unique identifier for the job (required).
  • pool: Logical group for worker isolation (default: "default").
  • retry: Number of times to retry the job on failure (default: 3).
  • timeout: Maximum execution time in seconds (default: 300).

Job Discovery & Registration

Framework M uses an Entry-Point based discovery system. This ensures that workers and CLI commands can find your jobs without requiring manual imports or complex configuration.

1. Registering Job Modules

In your application's pyproject.toml, register the module containing your @job decorated functions under the framework_m.jobs entry point group:

[project.entry-points."framework_m.jobs"]
my_app_jobs = "my_app.jobs"

2. Verifying Discovery

You can verify that your jobs are correctly discovered by the framework using the CLI:

uv run m jobs list

Starting a Worker

Framework M uses a unified worker CLI. You can start different types of workers depending on your infrastructure.

1. Standard Job Worker (Taskiq)

By default, the worker command starts a Taskiq worker connected to your NATS JetStream broker.

# Start the default background worker
uv run m worker

# Start with custom concurrency
uv run m worker --concurrency 8

2. Worker Pools & Resource Isolation

You can isolate jobs into specific "pools" for specialized scaling or priority processing.

Defining Pool Affinity

Assign a job to a specific pool using the pool parameter:

@job(name="process_video", pool="heavy")
async def process_video(file_id: str):
# ... expensive logic ...
pass

Running Specialized Workers

Start a worker that only listens to specific pools:

# Process only 'heavy' jobs
uv run m worker --pool heavy

# Process multiple specific pools
uv run m worker --pool heavy --pool notifications

By default, a worker with no --pool flag listens to all jobs (the "Monolith" mode).

3. Durable Workflow Worker (Temporal)

If your application uses Temporal for durable workflows, start the worker with the --worker-type argument:

uv run m worker --worker-type=temporal

Securing the Temporal Worker

For production deployments, provide security credentials via environment variables:

export TEMPORAL_URL="your-namespace.tmprl.cloud:7233"
export TEMPORAL_NAMESPACE="your-namespace.accountId"
export TEMPORAL_CLIENT_CERT="/path/to/client.pem"
export TEMPORAL_CLIENT_KEY="/path/to/client.key"

uv run m worker --worker-type=temporal

Advanced: Durable Activities

For complex workflows involving Temporal, you may use @worker_activity for specialized activities that require fine-grained Temporal control, though @job is preferred for standard background tasks.


[!TIP] For a deeper understanding of how workers manage application initialization and discovery, see the CLI & Worker Lifecycle Architecture.