# JobQueueProtocol

Protocol defining the contract for job queue implementations. This is the primary port for background task processing in the hexagonal architecture.

Implementations include:

- `ArqJobQueue`: Redis-backed queue using Arq
- `CeleryJobQueue`: Celery distributed task queue
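Given the methods documented below, the protocol itself might be declared roughly as follows. This is a sketch: the `JobInfo` fields and the use of `@runtime_checkable` are assumptions for illustration, not taken from `job_queue.py`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@dataclass
class JobInfo:
    # Hypothetical shape; the real JobInfo fields are not shown in this doc.
    job_id: str
    status: str
    result: Any = None

    @property
    def is_complete(self) -> bool:
        return self.status in ("complete", "failed")


@runtime_checkable
class JobQueueProtocol(Protocol):
    """Port for background task processing (sketched from the method list)."""

    async def enqueue(self, job_name: str, **kwargs: Any) -> str: ...
    async def schedule(self, job_name: str, cron: str, **kwargs: Any) -> str: ...
    async def cancel(self, job_id: str) -> bool: ...
    async def get_status(self, job_id: str) -> JobInfo | None: ...
    async def retry(self, job_id: str) -> str: ...
```

Because the protocol is `runtime_checkable`, adapters such as `ArqJobQueue` can be verified structurally with `isinstance` without inheriting from the protocol.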
Example usage:

```python
queue: JobQueueProtocol = container.get(JobQueueProtocol)

# Enqueue a job
job_id = await queue.enqueue("send_email", to="user@example.com")

# Check status
job = await queue.get_status(job_id)
if job.is_complete:
    print(f"Job finished: {job.result}")

# Schedule a recurring job
await queue.schedule("cleanup_temp", cron="0 0 * * *")
```
Source: job_queue.py
## Methods
### enqueue

```python
async def enqueue(self, job_name: str, **kwargs: Any) -> str
```

Add a job to the queue for immediate processing.

**Args:**

- `job_name`: Name of the registered job function
- `**kwargs`: Arguments to pass to the job function

**Returns:** Unique job ID for tracking
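To make the contract concrete, here is an in-memory test double (`InMemoryJobQueue` is a hypothetical illustration, not one of the implementations listed above) showing the expected enqueue semantics:

```python
import asyncio
import uuid
from typing import Any


class InMemoryJobQueue:
    """Hypothetical in-memory double illustrating the enqueue contract."""

    def __init__(self) -> None:
        self.jobs: dict[str, tuple[str, dict[str, Any]]] = {}

    async def enqueue(self, job_name: str, **kwargs: Any) -> str:
        # Record the job and hand back a unique ID for tracking.
        job_id = uuid.uuid4().hex
        self.jobs[job_id] = (job_name, kwargs)
        return job_id


queue = InMemoryJobQueue()
job_id = asyncio.run(queue.enqueue("send_email", to="user@example.com"))
```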
### schedule

```python
async def schedule(self, job_name: str, cron: str, **kwargs: Any) -> str
```

Schedule a recurring job using cron syntax.

**Args:**

- `job_name`: Name of the registered job function
- `cron`: Cron expression (e.g., `"0 0 * * *"` for daily at midnight)
- `**kwargs`: Arguments to pass to the job function

**Returns:** Schedule ID for tracking/cancellation
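A rough sketch of the schedule contract, with a minimal five-field cron sanity check (the validation is an assumption for illustration; real implementations typically delegate cron parsing to the backing queue):

```python
import asyncio
import uuid
from typing import Any


class InMemoryScheduler:
    """Hypothetical sketch of the schedule contract."""

    def __init__(self) -> None:
        self.schedules: dict[str, tuple[str, str, dict[str, Any]]] = {}

    async def schedule(self, job_name: str, cron: str, **kwargs: Any) -> str:
        # Minimal sanity check: a cron expression has five fields
        # (minute, hour, day-of-month, month, day-of-week).
        if len(cron.split()) != 5:
            raise ValueError(f"invalid cron expression: {cron!r}")
        schedule_id = uuid.uuid4().hex
        self.schedules[schedule_id] = (job_name, cron, kwargs)
        return schedule_id


scheduler = InMemoryScheduler()
sid = asyncio.run(scheduler.schedule("cleanup_temp", cron="0 0 * * *"))
```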
### cancel

```python
async def cancel(self, job_id: str) -> bool
```

Cancel a pending or running job.

**Args:**

- `job_id`: ID of the job to cancel

**Returns:** `True` if the job was cancelled, `False` if already completed
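The boolean return distinguishes "cancelled" from "too late to cancel". A hypothetical sketch of that behavior (the status names here are assumptions):

```python
import asyncio


class InMemoryCanceller:
    """Hypothetical sketch of the cancel contract."""

    def __init__(self) -> None:
        # job_id -> status ("pending", "running", "complete", "cancelled")
        self.status: dict[str, str] = {}

    async def cancel(self, job_id: str) -> bool:
        # Only pending or running jobs can still be cancelled.
        if self.status.get(job_id) in ("pending", "running"):
            self.status[job_id] = "cancelled"
            return True
        return False


q = InMemoryCanceller()
q.status = {"a": "pending", "b": "complete"}
cancelled_a = asyncio.run(q.cancel("a"))  # True: job was still pending
cancelled_b = asyncio.run(q.cancel("b"))  # False: job already finished
```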
### get_status

```python
async def get_status(self, job_id: str) -> JobInfo | None
```

Get current status and info for a job.

**Args:**

- `job_id`: ID of the job to check

**Returns:** `JobInfo` if the job exists, `None` otherwise
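Note that an unknown job yields `None` rather than an exception, so callers should guard before reading fields. A sketch (the `JobInfo` fields here are assumptions; the real dataclass lives in the source module):

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class JobInfo:
    # Hypothetical fields; the real JobInfo is defined in job_queue.py.
    job_id: str
    status: str
    result: Any = None

    @property
    def is_complete(self) -> bool:
        return self.status in ("complete", "failed")


class InMemoryStatusQueue:
    """Hypothetical sketch of the get_status contract."""

    def __init__(self) -> None:
        self.jobs: dict[str, JobInfo] = {}

    async def get_status(self, job_id: str) -> JobInfo | None:
        # None signals an unknown job rather than raising.
        return self.jobs.get(job_id)


q = InMemoryStatusQueue()
q.jobs["a"] = JobInfo("a", "complete", result=42)
info = asyncio.run(q.get_status("a"))
missing = asyncio.run(q.get_status("nope"))
```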
### retry

```python
async def retry(self, job_id: str) -> str
```

Retry a failed job. Creates a new job with the same parameters.

**Args:**

- `job_id`: ID of the failed job to retry

**Returns:** New job ID for the retry

**Raises:**

- `ValueError`: If the original job is not found or wasn't failed
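Because retry mints a fresh job ID rather than resurrecting the old one, callers should track the returned ID. A hypothetical sketch of that behavior, including the `ValueError` cases:

```python
import asyncio
import uuid
from typing import Any


class InMemoryRetryQueue:
    """Hypothetical sketch of the retry contract."""

    def __init__(self) -> None:
        # job_id -> (job_name, kwargs, status)
        self.jobs: dict[str, tuple[str, dict[str, Any], str]] = {}

    async def retry(self, job_id: str) -> str:
        record = self.jobs.get(job_id)
        if record is None or record[2] != "failed":
            # Unknown job, or job that never failed: nothing to retry.
            raise ValueError(f"job {job_id!r} not found or wasn't failed")
        name, kwargs, _ = record
        # Re-enqueue the same parameters under a fresh ID.
        new_id = uuid.uuid4().hex
        self.jobs[new_id] = (name, kwargs, "pending")
        return new_id


q = InMemoryRetryQueue()
q.jobs["a"] = ("send_email", {"to": "user@example.com"}, "failed")
new_id = asyncio.run(q.retry("a"))
```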