
JobQueueProtocol

Protocol defining the contract for job queue implementations.

This is the primary port for background task processing in the
hexagonal architecture.

Implementations include:
- ArqJobQueue: Redis-backed queue using Arq
- CeleryJobQueue: Celery distributed task queue
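As a rough sketch, a contract like this could be expressed with `typing.Protocol`. The method signatures are copied from the method reference on this page. The `JobInfo` dataclass (beyond the `is_complete` and `result` attributes seen in the usage example) and the `NullJobQueue` class are hypothetical stand-ins, not the contents of job_queue.py.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@dataclass
class JobInfo:
    """Hypothetical stand-in; only is_complete and result appear in the usage example."""

    job_id: str
    is_complete: bool = False
    result: Any = None


@runtime_checkable
class JobQueueProtocol(Protocol):
    """Sketch of the port; signatures copied from the method reference."""

    async def enqueue(self, job_name: str, **kwargs: Any) -> str: ...
    async def schedule(self, job_name: str, cron: str, **kwargs: Any) -> str: ...
    async def cancel(self, job_id: str) -> bool: ...
    async def get_status(self, job_id: str) -> JobInfo | None: ...
    async def retry(self, job_id: str) -> str: ...


class NullJobQueue:
    """Hypothetical adapter that accepts jobs and never runs them (e.g. for tests)."""

    async def enqueue(self, job_name: str, **kwargs: Any) -> str:
        return "job-0"

    async def schedule(self, job_name: str, cron: str, **kwargs: Any) -> str:
        return "schedule-0"

    async def cancel(self, job_id: str) -> bool:
        return False

    async def get_status(self, job_id: str) -> JobInfo | None:
        return None

    async def retry(self, job_id: str) -> str:
        raise ValueError(f"job {job_id!r} not found")


# Structural typing: NullJobQueue never inherits from JobQueueProtocol,
# yet it satisfies the protocol because it provides the same methods.
# isinstance() on a runtime_checkable protocol only checks that the
# methods exist, not that their signatures match.
queue: JobQueueProtocol = NullJobQueue()
print(isinstance(queue, JobQueueProtocol))
```

Because the check is structural, an adapter such as ArqJobQueue or CeleryJobQueue would not need to subclass the protocol for a type checker to accept it wherever a `JobQueueProtocol` is expected.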

Example usage:
queue: JobQueueProtocol = container.get(JobQueueProtocol)

# Enqueue a job
job_id = await queue.enqueue("send_email", to="user@example.com")

# Check status (get_status returns None if the job does not exist)
job = await queue.get_status(job_id)
if job is not None and job.is_complete:
    print(f"Job finished: {job.result}")

# Schedule a recurring job
await queue.schedule("cleanup_temp", cron="0 0 * * *")

Source: job_queue.py

Methods

enqueue

async def enqueue(self, job_name: str, **kwargs: Any) -> str

Add a job to the queue for immediate processing.

Args:
job_name: Name of the registered job function
**kwargs: Arguments to pass to the job function

Returns:
Unique job ID for tracking
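To illustrate how `job_name` and `**kwargs` relate to a registered job function, here is a minimal in-memory sketch. `InMemoryQueue`, its `register` and `run_pending` methods, and the use of `uuid4` for IDs are assumptions made for the example; the real implementations may register jobs and generate IDs differently.

```python
from __future__ import annotations

import asyncio
import uuid
from typing import Any, Awaitable, Callable

JobFunc = Callable[..., Awaitable[Any]]


class InMemoryQueue:
    """Hypothetical test double; not one of the real implementations."""

    def __init__(self) -> None:
        self._registry: dict[str, JobFunc] = {}
        self._pending: list[tuple[str, str, dict[str, Any]]] = []
        self.results: dict[str, Any] = {}

    def register(self, name: str, func: JobFunc) -> None:
        # Jobs are looked up by name, so they must be registered first.
        self._registry[name] = func

    async def enqueue(self, job_name: str, **kwargs: Any) -> str:
        if job_name not in self._registry:
            raise KeyError(f"no job registered under {job_name!r}")
        job_id = uuid.uuid4().hex  # unique ID returned to the caller
        self._pending.append((job_id, job_name, kwargs))
        return job_id

    async def run_pending(self) -> None:
        # Stand-in for a worker process: call each job with its stored kwargs.
        while self._pending:
            job_id, job_name, kwargs = self._pending.pop(0)
            self.results[job_id] = await self._registry[job_name](**kwargs)


async def send_email(to: str) -> str:
    return f"sent to {to}"


async def main() -> str:
    queue = InMemoryQueue()
    queue.register("send_email", send_email)
    job_id = await queue.enqueue("send_email", to="user@example.com")
    await queue.run_pending()
    return queue.results[job_id]


print(asyncio.run(main()))  # sent to user@example.com
```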

schedule

async def schedule(self, job_name: str, cron: str, **kwargs: Any) -> str

Schedule a recurring job using cron syntax.

Args:
job_name: Name of the registered job function
cron: Cron expression (e.g., "0 0 * * *" for daily at midnight)
**kwargs: Arguments to pass to the job function

Returns:
Schedule ID for tracking/cancellation
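The cron string can be read field by field. The sketch below assumes the standard five-field format (minute, hour, day of month, month, day of week); whether a given implementation also accepts extensions such as a seconds field is not stated here. `split_cron` is a hypothetical helper that only names the fields and does not validate their values.

```python
from typing import NamedTuple


class CronFields(NamedTuple):
    minute: str
    hour: str
    day_of_month: str
    month: str
    day_of_week: str


def split_cron(expression: str) -> CronFields:
    """Split a standard five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != 5:
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return CronFields(*parts)


fields = split_cron("0 0 * * *")
# Minute 0 of hour 0, on every day of every month: daily at midnight.
print(fields.minute, fields.hour, fields.day_of_month, fields.month, fields.day_of_week)
```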

cancel

async def cancel(self, job_id: str) -> bool

Cancel a pending or running job.

Args:
job_id: ID of the job to cancel

Returns:
True if job was cancelled, False if already completed
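Since `cancel` reports an already-completed job through its return value instead of raising, callers need to check the boolean. A minimal sketch of doing so for a batch of jobs follows; `FakeQueue` and `cancel_all` are hypothetical names used only for illustration.

```python
from __future__ import annotations

import asyncio


class FakeQueue:
    """Hypothetical stub: tracks which job IDs have already completed."""

    def __init__(self, completed: set[str]) -> None:
        self._completed = completed
        self.cancelled: set[str] = set()

    async def cancel(self, job_id: str) -> bool:
        if job_id in self._completed:
            return False  # too late: the job already finished
        self.cancelled.add(job_id)
        return True


async def cancel_all(queue: FakeQueue, job_ids: list[str]) -> list[str]:
    """Cancel several jobs; return the IDs that could not be cancelled."""
    outcomes = await asyncio.gather(*(queue.cancel(j) for j in job_ids))
    return [j for j, ok in zip(job_ids, outcomes) if not ok]


async def main() -> list[str]:
    queue = FakeQueue(completed={"job-2"})
    return await cancel_all(queue, ["job-1", "job-2", "job-3"])


print(asyncio.run(main()))  # ['job-2']
```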

get_status

async def get_status(self, job_id: str) -> JobInfo | None

Get current status and info for a job.

Args:
job_id: ID of the job to check

Returns:
JobInfo if job exists, None otherwise
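A common use of `get_status` is polling until a job finishes. The sketch below handles both the `None` return and a timeout. `wait_for_result`, its `interval` and `timeout` parameters, the `FakeQueue` stub, and the exceptions it raises are assumptions for the example; only `is_complete` and `result` come from the usage example on this page.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class JobInfo:
    # Hypothetical stand-in: only these two attributes appear in the usage example.
    is_complete: bool = False
    result: Any = None


class FakeQueue:
    """Hypothetical stub whose job completes on the third status check."""

    def __init__(self) -> None:
        self._checks = 0

    async def get_status(self, job_id: str) -> JobInfo | None:
        if job_id != "job-1":
            return None  # unknown ID
        self._checks += 1
        done = self._checks >= 3
        return JobInfo(is_complete=done, result="ok" if done else None)


async def wait_for_result(
    queue: FakeQueue, job_id: str, interval: float = 0.01, timeout: float = 1.0
) -> Any:
    """Poll get_status until the job completes, the ID is unknown, or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        job = await queue.get_status(job_id)
        if job is None:
            raise LookupError(f"unknown job: {job_id}")
        if job.is_complete:
            return job.result
        if loop.time() >= deadline:
            raise TimeoutError(f"job {job_id} did not complete within {timeout}s")
        await asyncio.sleep(interval)


print(asyncio.run(wait_for_result(FakeQueue(), "job-1")))  # ok
```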

retry

async def retry(self, job_id: str) -> str

Retry a failed job.

Creates a new job with the same parameters.

Args:
job_id: ID of the failed job to retry

Returns:
New job ID for the retry

Raises:
ValueError: If the original job is not found or has not failed
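Because `retry` returns a new job ID and raises `ValueError` for jobs that are missing or have not failed, callers may want to wrap it. This is a minimal sketch; `FakeQueue`, its status strings, its ID scheme, and `retry_if_failed` are hypothetical and only mimic the documented behavior.

```python
from __future__ import annotations

import asyncio


class FakeQueue:
    """Hypothetical stub: knows one failed job and one completed job."""

    def __init__(self) -> None:
        self._states = {"job-1": "failed", "job-2": "complete"}
        self._counter = 2

    async def retry(self, job_id: str) -> str:
        if self._states.get(job_id) != "failed":
            raise ValueError(f"job {job_id!r} not found or not failed")
        self._counter += 1
        new_id = f"job-{self._counter}"
        self._states[new_id] = "pending"  # the retry is a separate job
        return new_id


async def retry_if_failed(queue: FakeQueue, job_id: str) -> str | None:
    """Return the new job ID, or None when the job cannot be retried."""
    try:
        return await queue.retry(job_id)
    except ValueError:
        return None


async def main() -> list[str | None]:
    queue = FakeQueue()
    return [await retry_if_failed(queue, j) for j in ("job-1", "job-2", "missing")]


print(asyncio.run(main()))  # ['job-3', None, None]
```

Track the returned ID from that point on, since the original job ID still refers to the failed attempt.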