Writing Handlers¶
Handlers are Python functions that process jobs. This guide covers handler registration, patterns, and best practices.
Basic Handler¶
Register a handler using the @register_handler decorator:
from taskmq.jobs.handlers import register_handler
@register_handler("send_email")
def send_email(job):
"""Send an email based on job payload.
Args:
job: Job object with id, payload, status, and other attributes.
Returns:
A result dict that will be stored with the job.
"""
payload = job.payload
to = payload.get("to")
subject = payload.get("subject")
body = payload.get("body")
# Your email sending logic here
print(f"Sending email to {to}: {subject}")
return {"status": "sent", "recipient": to}
Job Object¶
The job parameter provides access to job data:
| Attribute | Type | Description |
|---|---|---|
job.id |
int | Unique job identifier |
job.payload |
dict/str | Job data (parsed JSON or string) |
job.status |
JobStatus | Current status (PENDING, RUNNING, etc.) |
job.handler |
str | Handler name |
job.priority |
int | Priority level (0, 10, 20) |
job.retries |
int | Number of retry attempts |
job.created_at |
datetime | When the job was created |
job.result |
str | Result from previous execution (if any) |
job.error_log |
str | Error message (if failed) |
Async Handlers¶
TaskMQ supports async handlers natively:
import asyncio
import httpx
from taskmq.jobs.handlers import register_handler
@register_handler("fetch_data")
async def fetch_data(job):
"""Fetch data from an external API."""
url = job.payload.get("url")
async with httpx.AsyncClient() as client:
response = await client.get(url)
data = response.json()
return {"fetched": True, "data_size": len(data)}
Handler Return Values¶
Handlers can return various types:
@register_handler("example")
def example_handler(job):
# Return a dict (recommended)
return {"status": "success", "count": 42}
# Return a string
return "Task completed"
# Return None (no result stored)
return None
The return value is converted to a string and stored in job.result.
Error Handling¶
Unhandled exceptions trigger the retry policy:
@register_handler("risky_task")
def risky_task(job):
"""Task that might fail."""
if some_condition:
raise ValueError("Something went wrong")
return {"status": "ok"}
For controlled failures without retry:
@register_handler("validate")
def validate(job):
"""Validate data, fail fast on invalid input."""
data = job.payload
if not data.get("required_field"):
# Log the error but don't raise (won't trigger retry)
return {"status": "failed", "reason": "missing required_field"}
return {"status": "valid"}
Handler Patterns¶
Processing with External Services¶
import httpx
from taskmq.jobs.handlers import register_handler
@register_handler("webhook")
def send_webhook(job):
"""Send a webhook notification."""
url = job.payload.get("url")
data = job.payload.get("data")
response = httpx.post(url, json=data, timeout=30)
response.raise_for_status()
return {
"status_code": response.status_code,
"delivered": True
}
Batch Processing¶
@register_handler("batch_process")
def batch_process(job):
"""Process multiple items in a single job."""
items = job.payload.get("items", [])
results = []
for item in items:
result = process_item(item)
results.append(result)
return {
"processed": len(results),
"results": results
}
Chaining Jobs¶
from taskmq.storage import get_backend
@register_handler("step1")
def step1(job):
"""First step, creates next job."""
result = do_step1(job.payload)
# Queue the next step
backend = get_backend()
backend.insert_job(
payload=result,
handler="step2"
)
return {"step": 1, "next_queued": True}
@register_handler("step2")
def step2(job):
"""Second step."""
return {"step": 2, "complete": True}
Best Practices¶
-
Keep handlers focused - One handler, one responsibility
-
Make handlers idempotent - Safe to run multiple times with same input
-
Handle timeouts - Set appropriate timeouts for external calls
-
Return meaningful results - Include status and relevant data
-
Use structured payloads - Prefer dicts over strings for payload
-
Log important events - Use Python logging for debugging
import logging
from taskmq.jobs.handlers import register_handler
logger = logging.getLogger(__name__)
@register_handler("important_task")
def important_task(job):
logger.info(f"Starting job {job.id}")
try:
result = do_work(job.payload)
logger.info(f"Job {job.id} completed successfully")
return result
except Exception as e:
logger.error(f"Job {job.id} failed: {e}")
raise
Handler Discovery¶
Handlers must be imported before workers start. Common patterns:
Option 1: Import in worker script
# worker.py
import myapp.handlers # Registers all handlers
from taskmq.worker import Worker
Worker().start()
Option 2: Package init.py
# myapp/handlers/__init__.py
from .email import send_email
from .sms import send_sms
from .webhooks import send_webhook
Option 3: Entry point registration (advanced)
Define handlers as package entry points for automatic discovery.
Testing Handlers¶
import pytest
from taskmq.storage.base import Job, JobStatus
from myapp.handlers import send_email
def test_send_email():
"""Test email handler."""
job = Job(
id=1,
payload={"to": "test@example.com", "subject": "Test"},
status=JobStatus.RUNNING
)
result = send_email(job)
assert result["status"] == "sent"
assert result["recipient"] == "test@example.com"
Next Steps¶
- API Reference - REST API documentation
- Usage Guide - CLI and configuration options