Data & Infrastructure · 10 min read

Design a Task Queue

Distribute background jobs reliably, like Celery and SQS
Scope: Real-World System · Difficulty: Intermediate

Understanding the Problem

Every production system has work that shouldn't block the user: sending emails, processing images, generating reports, running ETL pipelines. A task queue decouples producers (who submit work) from consumers (who execute it).

Functional Requirements:

  • Submit a task with a payload and priority level.
  • Workers pick up tasks and execute them asynchronously.
  • Retry failed tasks with configurable backoff.
  • Support task prioritization (high, normal, low).
  • Query task status (pending, processing, completed, failed).

Non-Functional Requirements:

  • At-least-once delivery: Every task must be executed at least once; losing tasks is unacceptable.
  • Horizontal scalability: Add more workers as load increases, no code changes needed.
  • Fault tolerance: If a worker crashes mid-task, the task must be retried automatically.
  • Ordering: FIFO within the same priority level (best-effort).
▸ The idea: producers → queue → consumers

Estimation

Let's size this system:

  • 10M tasks/day: ~115 tasks/second average, ~350/s peak
  • Average task processing time: 5 seconds
  • Workers needed at steady state: 115 × 5 = ~575 concurrent tasks → ~60 workers (each handling ~10 concurrent tasks)
  • Task payload size: ~2 KB average (JSON with params)
  • Daily storage: 10M × 2 KB = ~20 GB/day for task metadata
  • Queue depth at peak: If workers fall completely behind for 5 minutes at peak rate: 350 × 300 = 105K queued tasks (an upper bound, since it assumes nothing drains during the burst)
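
The arithmetic in this list can be reproduced with a few lines of Python. This is a rough back-of-envelope sketch; the function name `size_task_queue` and its parameters are illustrative and not part of any library.

```python
import math


def size_task_queue(tasks_per_day, avg_task_seconds, slots_per_worker,
                    peak_rate, burst_seconds, payload_kb):
    """Back-of-envelope sizing: concurrency = arrival rate x task duration."""
    avg_rate = tasks_per_day / 86_400            # seconds per day
    concurrent = avg_rate * avg_task_seconds     # tasks in flight at steady state
    return {
        "avg_rate_per_s": round(avg_rate, 1),
        "concurrent_tasks": round(concurrent),
        "workers": math.ceil(concurrent / slots_per_worker),
        "daily_storage_gb": tasks_per_day * payload_kb / 1_000_000,
        # Upper bound: assumes no task is drained during the burst.
        "worst_case_backlog": peak_rate * burst_seconds,
    }


estimate = size_task_queue(
    tasks_per_day=10_000_000, avg_task_seconds=5, slots_per_worker=10,
    peak_rate=350, burst_seconds=300, payload_kb=2,
)
print(estimate)
```

Without intermediate rounding, the figures come out at about 579 concurrent tasks and 58 workers, which the estimate rounds to ~575 and ~60.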

The bottleneck is worker throughput, not storage. Auto-scaling workers based on queue depth is critical.
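
One possible way to turn queue depth into a worker count is sketched below. The function `desired_workers`, the 5-minute drain target, and the worker limits are assumptions made for illustration; a real autoscaler would also smooth its input and rate-limit scaling decisions.

```python
import math


def desired_workers(queue_depth, arrival_rate, avg_task_seconds=5.0,
                    slots_per_worker=10, drain_seconds=300,
                    min_workers=1, max_workers=500):
    """Workers needed to absorb new arrivals and drain the backlog in time."""
    # One worker with 10 slots and 5 s tasks completes 2 tasks per second.
    per_worker_rate = slots_per_worker / avg_task_seconds
    # Keep up with arrivals and clear the backlog within drain_seconds.
    needed_rate = arrival_rate + queue_depth / drain_seconds
    workers = math.ceil(needed_rate / per_worker_rate)
    return max(min_workers, min(max_workers, workers))


print(desired_workers(queue_depth=0, arrival_rate=115))        # steady state
print(desired_workers(queue_depth=105_000, arrival_rate=350))  # peak with backlog
```

Under these assumptions the steady-state call yields 58 workers, and the peak call with a 105K backlog yields 350.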

API Design

Submit a Task

Endpoint: POST /api/v1/tasks
Request: {"type": "send_email", "payload": {"to": "[email protected]", ...}, "priority": "high", "idempotency_key": "abc-123"}
Response: {"task_id": "t-7f3a2", "status": "queued", "created_at": "..."}
Status: 202 Accepted

Check Task Status

Endpoint: GET /api/v1/tasks/:id
Response: {"task_id": "t-7f3a2", "status": "completed", "result": {...}, "attempts": 1}

Cancel a Task

Endpoint: DELETE /api/v1/tasks/:id
Response: {"task_id": "t-7f3a2", "status": "cancelled"}

202 Accepted is the right status code: the request has been accepted for processing, but processing has not completed. This signals the asynchronous nature of the system.
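
To make the submit-then-poll contract concrete, here is a framework-free sketch of the two handlers. The names `handle_submit`, `handle_status`, and the in-memory `TASKS` store are hypothetical stand-ins; a real service would sit behind a web framework and write to the queue instead of a dict.

```python
import uuid
from datetime import datetime, timezone

VALID_PRIORITIES = {"high", "normal", "low"}
TASKS = {}  # In-memory stand-in for the queue and status store


def handle_submit(body: dict) -> tuple[int, dict]:
    """POST /api/v1/tasks: validate, record the task, and return immediately."""
    if "type" not in body or body.get("priority", "normal") not in VALID_PRIORITIES:
        return 400, {"error": "type is required; priority must be high, normal, or low"}
    task_id = "t-" + uuid.uuid4().hex[:5]
    record = {
        "task_id": task_id,
        "status": "queued",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    TASKS[task_id] = record
    # 202: accepted for processing, but no work has been done yet.
    return 202, record


def handle_status(task_id: str) -> tuple[int, dict]:
    """GET /api/v1/tasks/:id: report the current state of a task."""
    record = TASKS.get(task_id)
    return (200, record) if record else (404, {"error": "task not found"})


status, response = handle_submit({"type": "send_email", "priority": "high"})
print(status, response["status"])
```

The client keeps the returned `task_id` and polls the status endpoint until the task reaches a terminal state.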

▸ Task lifecycle: submit → queue → process → complete
Task flow: workers poll the queue, ACK on success, NACK on failure. After repeated failures, tasks move to the dead letter queue for manual inspection.
▸ Reliability: retries, dead letter queues, idempotency

Visibility Timeout and Failure Handling

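The visibility timeout is the core reliability mechanism. When a worker picks up a task, the task becomes invisible to other workers for a set duration (e.g., 30 seconds). This keeps two workers from processing the same task at the same time, provided the timeout is longer than the task normally takes to run. A task that outlives its timeout becomes visible again and may be processed twice.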

What happens if a worker crashes?

  1. Worker picks task, visibility timeout starts (30s).
  2. Worker crashes at second 10, so no ACK is sent.
  3. After 30s, the task becomes visible again in the queue.
  4. Another worker picks it up and processes it.
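
The four steps above can be simulated in memory. This is a minimal sketch, not how any particular broker implements it: the `InMemoryQueue` class is hypothetical, and the clock is passed in as `now` so the example runs deterministically.

```python
from collections import deque


class InMemoryQueue:
    """Toy queue with a visibility timeout; time is injected for determinism."""

    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self.ready = deque()   # Tasks visible to workers
        self.in_flight = {}    # task_id -> (task, deadline)

    def enqueue(self, task_id, payload):
        self.ready.append((task_id, payload))

    def receive(self, now):
        # In-flight tasks whose deadline has passed become visible again.
        for task_id, (task, deadline) in list(self.in_flight.items()):
            if deadline <= now:
                del self.in_flight[task_id]
                self.ready.append(task)
        if not self.ready:
            return None
        task = self.ready.popleft()
        self.in_flight[task[0]] = (task, now + self.visibility_timeout)
        return task

    def ack(self, task_id):
        # Only an ACK removes the task for good.
        self.in_flight.pop(task_id, None)


q = InMemoryQueue(visibility_timeout=30)
q.enqueue("t-1", {"type": "send_email"})
first = q.receive(now=0)     # Worker A picks the task, then crashes at t=10
hidden = q.receive(now=10)   # Worker B sees nothing: the task is still invisible
retry = q.receive(now=30)    # Timeout elapsed: worker B receives the task
q.ack("t-1")                 # Worker B finishes and acknowledges
print(first, hidden, retry)
```

Note that the queue never learns that worker A crashed. It only observes that no ACK arrived before the deadline, which is why a slow but healthy worker can also trigger a redelivery.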

Dead Letter Queue (DLQ): After a configurable number of retries (e.g., 3), the task is moved to a separate DLQ. This prevents "poison messages" (tasks that always fail) from blocking the queue forever. Engineers can inspect the DLQ and either fix and replay the tasks, or discard them.
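
As an illustration of the retry-then-DLQ policy, the sketch below runs a handler up to three times and records the backoff delays instead of sleeping. The helper `run_with_retries` and the doubling delay schedule are assumptions for this example, not a fixed standard.

```python
def run_with_retries(handler, task, max_retries=3, base_delay=1.0):
    """Try a task up to max_retries times; dead-letter it if every attempt fails."""
    delays = []        # Backoff a scheduler would wait before each retry
    dead_letters = []  # Stand-in for the DLQ
    for attempt in range(1, max_retries + 1):
        try:
            handler(task)
            return "completed", delays, dead_letters
        except Exception as exc:
            if attempt == max_retries:
                # Out of attempts: park the task with context for inspection.
                dead_letters.append(
                    {"task": task, "error": str(exc), "attempts": attempt}
                )
            else:
                # Exponential backoff: 1 s, 2 s, 4 s, ...
                delays.append(base_delay * 2 ** (attempt - 1))
    return "dead_lettered", delays, dead_letters


def poison(task):
    raise ValueError("malformed payload")  # Always fails


calls = {"count": 0}


def flaky(task):
    calls["count"] += 1
    if calls["count"] == 1:
        raise TimeoutError("transient network error")  # Fails only once


poison_result = run_with_retries(poison, {"id": "t-1"})
flaky_result = run_with_retries(flaky, {"id": "t-2"})
print(poison_result[0], flaky_result[0])
```

Storing the error and attempt count alongside the dead-lettered task makes later inspection and replay much easier.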

Idempotency: Since tasks may be delivered more than once (at-least-once), task handlers must be idempotent. Use an idempotency key: before processing, check whether this key was already processed. If so, skip or return the cached result. This is critical for tasks like "charge payment" or "send email."
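
A handler-side idempotency check might look like the following sketch. The `processed` dict stands in for a shared store such as Redis or a database table, and `charge_payment` is a hypothetical handler; in a real system the check and the write must be atomic (for example a unique constraint or a set-if-absent operation), which this single-process example does not need.

```python
processed = {}  # idempotency_key -> cached result (stand-in for a shared store)
charges = []    # Side effects actually performed


def charge_payment(task):
    """Charge once per idempotency key, even if the task is delivered twice."""
    key = task["idempotency_key"]
    if key in processed:
        # Duplicate delivery: return the cached result, perform no side effect.
        return processed[key]
    charges.append(task["amount_cents"])
    result = {"charged_cents": task["amount_cents"]}
    processed[key] = result
    return result


task = {"idempotency_key": "order-42", "amount_cents": 4999}
first_result = charge_payment(task)
second_result = charge_payment(task)  # Redelivered after a visibility timeout
print(first_result, second_result, charges)
```

Both calls return the same result, but the customer is charged only once.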

▸ Full architecture with priority queues

Simple Task Queue with Redis

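import json
import time
import uuid
from enum import Enum
from typing import Optional

import redis


class Priority(Enum):
    HIGH = 0  # Lower number = higher priority
    NORMAL = 1
    LOW = 2


class TaskQueue:
    def __init__(self, redis_url="redis://localhost:6379"):
        # decode_responses=True makes Redis return str instead of bytes
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.QUEUES = {
            Priority.HIGH: "tasks:high",
            Priority.NORMAL: "tasks:normal",
            Priority.LOW: "tasks:low",
        }
        self.PROCESSING = "tasks:processing"  # Sorted set: task id, score = deadline
        self.INFLIGHT = "tasks:inflight"      # Hash: task id -> task JSON
        self.DELAYED = "tasks:delayed"        # Sorted set: task JSON, score = retry time
        self.DLQ = "tasks:dlq"

    def submit(self, task_type: str, payload: dict,
               priority=Priority.NORMAL, idempotency_key=None) -> str:
        """Submit a task to the queue."""
        task_id = str(uuid.uuid4())[:8]
        # Claim the idempotency key atomically (SET NX) so that repeated
        # submits with the same key enqueue only one task.
        if idempotency_key:
            key = f"idemp:{idempotency_key}"
            if not self.r.set(key, task_id, nx=True, ex=86400):
                return self.r.get(key)
        task = {
            "id": task_id,
            "type": task_type,
            "payload": payload,
            "priority": priority.name,  # Kept so retries return to the same queue
            "attempts": 0,
            "max_retries": 3,
            "idempotency_key": idempotency_key,
            "created_at": time.time(),
        }
        self.r.lpush(self.QUEUES[priority], json.dumps(task))
        return task_id

    def poll(self, visibility_timeout=30) -> Optional[dict]:
        """Worker polls for a task. Checks high → normal → low."""
        self._requeue_due()
        for priority in Priority:
            raw = self.r.rpop(self.QUEUES[priority])
            if raw:
                task = json.loads(raw)
                # Mark as processing with a deadline. RPOP and the writes
                # below are not atomic; production code would use a Lua
                # script so a crash in between cannot lose the task.
                deadline = time.time() + visibility_timeout
                self.r.hset(self.INFLIGHT, task["id"], raw)
                self.r.zadd(self.PROCESSING, {task["id"]: deadline})
                return task
        return None  # No tasks available

    def _requeue_due(self):
        """Promote retries whose backoff elapsed and tasks whose timeout expired."""
        now = time.time()
        for raw in self.r.zrangebyscore(self.DELAYED, 0, now):
            # ZREM returns 1 only for the caller that removed the entry,
            # so concurrent workers do not enqueue the same retry twice.
            if self.r.zrem(self.DELAYED, raw):
                task = json.loads(raw)
                self.r.lpush(self.QUEUES[Priority[task["priority"]]], raw)
        for task_id in self.r.zrangebyscore(self.PROCESSING, 0, now):
            if self.r.zrem(self.PROCESSING, task_id):
                raw = self.r.hget(self.INFLIGHT, task_id)
                if raw:
                    # A missed deadline counts as a failed attempt
                    self.nack(json.loads(raw))

    def ack(self, task: dict):
        """Worker acknowledges successful completion."""
        self.r.zrem(self.PROCESSING, task["id"])
        self.r.hdel(self.INFLIGHT, task["id"])
        # Store the result for status queries
        self.r.set(f"result:{task['id']}", json.dumps({
            "status": "completed", "completed_at": time.time()
        }), ex=86400)  # TTL 24h

    def nack(self, task: dict):
        """Worker reports failure. Schedule a retry or send to DLQ."""
        self.r.zrem(self.PROCESSING, task["id"])
        self.r.hdel(self.INFLIGHT, task["id"])
        task["attempts"] += 1
        if task["attempts"] >= task["max_retries"]:
            # Move to dead letter queue
            self.r.lpush(self.DLQ, json.dumps(task))
            print(f"Task {task['id']} moved to DLQ after"
                  f" {task['attempts']} attempts")
        else:
            # Schedule the retry with exponential backoff instead of
            # sleeping, so the worker stays free for other tasks.
            delay = 2 ** task["attempts"]
            self.r.zadd(self.DELAYED, {json.dumps(task): time.time() + delay})


# --- Usage (requires a Redis server on localhost:6379) ---
q = TaskQueue()

# Producer submits tasks
task_id = q.submit("send_email", {
    "subject": "Welcome!"
}, priority=Priority.HIGH, idempotency_key="welcome-user-42")
print(f"Submitted: {task_id}")

# Worker: a real worker would run this in a loop
task = q.poll()
if task:
    try:
        print(f"Processing: {task['type']}")
        # ... do the work ...
        q.ack(task)
    except Exception:
        q.nack(task)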
Output
Submitted: a1b2c3d4
Processing: send_email
Interview tip: Always discuss failure handling. What happens when a worker crashes mid-task? Mention the visibility timeout pattern and idempotency keys. Also clarify the trade-off between exactly-once (very hard, requires distributed transactions) and at-least-once delivery (practical, requires idempotent handlers).

Key Metrics

Metric                  | How it works                          | Typical value                    | Complexity
Enqueue task            | LPUSH to Redis or SQS SendMessage     | ~1-5 ms                          | \(O(1)\)
Dequeue task            | RPOP from Redis or SQS ReceiveMessage | ~1-5 ms                          | \(O(1)\)
Throughput (per worker) | Depends on avg task duration (5s)     | ~10-12 tasks/min per task slot   | n/a
Worker scaling          | Double workers = double throughput    | Linear                           | \(O(n)\)
Failure rate (target)   | After retries, before DLQ             | < 0.1%                           | n/a
Queue depth at peak     | 5-min burst at 350 tasks/s            | ~105K tasks                      | n/a

Quick check

What is a visibility timeout in a task queue?
