Data & Infrastructure১০ মিনিট পাঠ

টাস্ক কিউ ডিজাইন

নির্ভরযোগ্যভাবে ব্যাকগ্রাউন্ড জব বা কাজ ডিস্ট্রিবিউট করা — যেমন, সেলারি (Celery) ও এসকিউএস (SQS)
scope:রিয়েল-ওয়ার্ল্ড সিস্টেম (Real-World System)difficulty:ইন্টারমিডিয়েট (Intermediate)

সমস্যাটি বোঝা (Understanding the Problem)

প্রতিটি প্রোডাকশন সিস্টেমেই এমন কিছু কাজ থাকে যেগুলো ব্যবহারকারীকে আটকে রাখতে পারে না — যেমন ইমেইল পাঠানো, ইমেজ প্রসেস করা, রিপোর্ট জেনারেট করা বা ইটিএল (ETL) পাইপলাইন রান করা। একটি টাস্ক কিউ মূলত প্রোডিউসার (যারা কাজ সাবমিট করে) এবং কনজিউমার (যারা সেটা এক্সিকিউট বা বাস্তবায়ন করে)-কে ডিকাপল (decouple) বা আলাদা করে।

ফাংশনাল প্রয়োজনীয়তা (Functional Requirements):

  • একটি পেলোড (payload) বা ডেটা এবং প্রায়োরিটি বা অগ্রাধিকারের লেভেল দিয়ে একটি টাস্ক সাবমিট করা।
  • ওয়ার্কাররা (Workers) টাস্কগুলো তুলে নেবেন এবং সেগুলো অ্যাসিনক্রোনাসলি (asynchronously) বা ব্যাকগ্রাউন্ডে এক্সিকিউট করবেন।
  • ফেইল হওয়া বা আটকে যাওয়া টাস্কগুলোকে কনফিগারেবল ব্যাকঅফ (configurable backoff) দিয়ে রিট্রাই (retry) বা আবার চেষ্টা করা।
  • টাস্ক প্রায়োরিটি বা অগ্রাধিকার (উচ্চ, সাধারণ, নিম্ন) সমর্থন করা।
  • টাস্কের স্ট্যাটাস কোয়েরি করা (পেন্ডিং, প্রসেসিং, কমপ্লিট, ফেইলড ইত্যাদি জানা)।

নন-ফাংশনাল প্রয়োজনীয়তা (Non-Functional Requirements):

  • অ্যাট-লিস্ট-ওয়ান্স ডেলিভারি (At-least-once delivery): প্রতিটি কাজ অন্তত একবার অবশ্যই এক্সিকিউট করতে হবে — কোনো কাজ হারিয়ে যাওয়া অগ্রহণযোগ্য।
  • হরিজন্টাল স্কেলাবিলিটি (Horizontal scalability): লোড বাড়লে কোনো ধরনের কোড পরিবর্তন ছাড়াই আরও ওয়ার্কার যুক্ত করার সুবিধা থাকতে হবে।
  • ফল্ট টলারেন্স (Fault tolerance): যদি কোনো ওয়ার্কার টাস্ক চলাকালীন ক্র্যাশ করে, তবে টাস্কটি স্বয়ংক্রিয়ভাবে আবার বা রিট্রাই (Retry) করতে হবে।
  • অর্ডারিং বা ক্রমানুসারে (Ordering): একই প্রায়োরিটি লেভেলে থাকলে প্রথমে আসা কাজগুলো আগে (FIFO - First-In-First-Out) চেষ্টা করা হবে (যতটা সম্ভব বা Best-effort)।
মূল ধারণা: প্রোডিউসার → কিউ → কনজিউমার

অ্যাস্টিমেশন (Estimation)

চলুন এই সিস্টেমের সাইজ পরিমাপ করি:

  • প্রতিদিন ১০ মিলিয়ন টাস্ক — যা গড়ে প্রায় ১১৫ টাস্ক/সেকেন্ড এবং পিক (peak)-এ ৩৫০/সেকেন্ড
  • গড় টাস্ক বা প্রসেসিংয়ের সময়: ৫ সেকেন্ড
  • সাধারণ লোডে প্রয়োজনীয় ওয়ার্কার: ১১৫ × ৫ = ~৫৭৫ কনকারেন্ট টাস্ক (Concurrent tasks) → ~৬০ জন ওয়ার্কার (প্রতিটিতে প্রায় ১০টি কনকারেন্ট টাস্ক হ্যান্ডেল করার ক্ষমতা থাকবে)
  • টাস্কের পেলোডের (payload) আকার: গড়ে ~২ KB (প্যারামিটারসহ জেসন/JSON)
  • প্রতিদিন স্টোরেজ: ১০ মিলিয়ন × ২ KB = ~২০ GB/দিন টাস্ক মেটাডেটার (metadata) জন্য
  • পিক-এ কিউয়ের ডেপথ বা গভীরতা (Queue depth at peak): যদি ৫ মিনিটের জন্য ওয়ার্কাররা লোড মেলাতে না পারেন: তবে ৩৫০ × ৩00 = ১০৫ হাজার টাস্ক কিউ বা সারিতে জমা হতে পারে

এখানে মূল প্রতিবন্ধকতা (Bottleneck) হলো ওয়ার্কারের বা সিস্টেমের থ্রুপুট, স্টোরেজ নয়। কিউ ডেপথের ওপর ভিত্তি করে ওয়ার্কারদের অটো-স্কেল (Auto-scaling) করা খুবই গুরুত্বপূর্ণ।

এপিআই ডিজাইন (API Design)

একটি টাস্ক সাবমিট করা (Submit a Task)

EndpointPOST /api/v1/tasks
Request{"type": "send_email", "payload": {"to": "[email protected]", ...}, "priority": "high", "idempotency_key": "abc-123"}
Response{"task_id": "t-7f3a2", "status": "queued", "created_at": "..."}
Status202 Accepted

টাস্কের স্ট্যাটাস চেক করা (Check Task Status)

EndpointGET /api/v1/tasks/:id
Response{"task_id": "t-7f3a2", "status": "completed", "result": {...}, "attempts": 1}

একটি টাস্ক ক্যানসেল করা (Cancel a Task)

EndpointDELETE /api/v1/tasks/:id
Response{"task_id": "t-7f3a2", "status": "cancelled"}

202 Accepted হলো এই কাজের জন্য সঠিক স্ট্যাটাস কোড — যার মানে হলো রিকোয়েস্ট বা আবেদনটি প্রসেস করার জন্য গ্রহণ করা হয়েছে, তবে প্রসেসিং এখনও সম্পূর্ণ হয়নি। এটি সিস্টেমটির অ্যাসিনক্রোনাস (asynchronous) প্রকৃতি বা দিকটিকে বোঝায়।

টাস্ক লাইফসাইকেল: সাবমিট → কিউ → প্রসেস → কমপ্লিট
Click chart to zoom
টাস্ক ফ্লো: ওয়ার্কাররা কিউ বা সারিতে খোঁজ (poll) নেন, সফল হলে ACK করেন এবং ব্যর্থ হলে NACK করেন। বারবার ফেইল করার পর, টাস্কগুলো ম্যানুয়ালি পরিদর্শনের জন্য ডেড লেটার কিউতে (dead letter queue) সরানো হয়।
রিলায়েবিলিটি বা নির্ভরতা: রিট্রাই (retries), ডেড লেটার কিউ, আইড্যাম্পোটেন্সি (idempotency)

ভিজিবিলিটি টাইমআউট এবং ফেইলিউর হ্যান্ডলিং (Visibility Timeout and Failure Handling)

ভিজিবিলিটি টাইমআউট (visibility timeout) হলো রিলায়েবিলিটির মূল কৌশল বা মেকানিজম। যখন কোনো ওয়ার্কার একটি টাস্ক বা কাজ নেয়, তখন টাস্কটি অন্য ওয়ার্কারদের জন্য একটি নির্দিষ্ট সময়ের (যেমন, ৩০ সেকেন্ড) জন্য অদৃশ্য বা হিডেন (invisible) হয়ে যায়। এটি একই কাজের ডুপ্লিকেট বা দ্বিগুণ পরিমাণ প্রসেসিং রোধ করে।

যদি কোনো ওয়ার্কার ক্র্যাশ করে তবে কী হবে?

  1. ওয়ার্কার টাস্ক তুলে নেবে এবং এর ভিজিবিলিটি টাইমআউট শুরু হবে (৩০ সে.)।
  2. ধরা যাক, ওয়ার্কারটি ১০ম সেকেন্ডে ক্র্যাশ করল — ফলে কোনো ACK পাঠানো হবে না।
  3. এরপর ঠিক ৩০ সেকেন্ড পরে টাস্কটি কিউ বা সারিতে আবারও দৃশ্যমান হবে।
  4. অন্য একটি ওয়ার্কার এরপর সেটি তুলে নেবে এবং প্রসেস করবে।

ডেড লেটার কিউ (Dead Letter Queue - DLQ): নির্দিষ্ট কয়েকবার রিট্রাই করার পর (যেমন, ৩ বার), টাস্কটি একটি আলাদা বা পৃথক কিউ-তে (DLQ-তে) সরানো হয়। এটি "পয়জন মেসেজগুলো (poison messages)" — অর্থাৎ, যে টাস্কগুলো সর্বদাই ফেইল করে বা ব্যর্থ হয় — সেগুলোকে চিরকাল কিউ-তে আটকে থাকতে বাধা দেয়। ইঞ্জিনিয়াররা পরবর্তীতে এই DLQ-এর কাজগুলো পরিদর্শন করতে পারেন এবং এগুলোকে ফিক্স করে পুনরায় রান বা রিপ্লে (Replay) করতে পারেন, অথবা চাইলে এভয়েড বা ইগনরও করতে পারেন।

আইড্যাম্পোটেন্সি (Idempotency): যেহেতু টাস্কগুলো একাধিকবার ডেলিভার হওয়ার একটি চান্স থাকে (অ্যাট-লিস্ট-ওয়ান্স অনুযায়ী), তাই টাস্ক হ্যান্ডলারদের অবশ্যই আইড্যাম্পোটেন্ট (idempotent) হতে হবে। এর জন্য একটি আইড্যাম্পোটেন্সি কী (Idempotency Key) ব্যবহার করার প্রচলন আছে — অর্থাৎ প্রসেস করার আগে, এই কী (Key) দিয়ে এর আগে কোনো কাজ প্রসেস করা হয়েছে কিনা তা চেক করে দেখতে হয়। যদি হয়ে থাকে, তবে কাজটিকে স্কিপ করে দিতে হবে অথবা ক্যাশ থেকে ওই আগের রেজাল্টটি দেখিয়ে দিতে হয়। "পেমেন্ট চার্জ করা" বা "ইমেইল পাঠানোর" মতো কাজগুলোর জন্য এটি অত্যন্ত গুরুত্বপূর্ণ।

প্রায়োরিটি কিউ (Priority Queues)-সহ পূর্ণাঙ্গ আর্কিটেকচার

Redis-এর সাথে সহজ টাস্ক কিউ

import redis
import json
import uuid
import time
from enum import Enum
class Priority(Enum):
HIGH = 0 # কম নাম্বার = উচ্চ প্রায়োরিটি
NORMAL = 1
LOW = 2
class TaskQueue:
def __init__(self, redis_url="redis://localhost:6379"):
self.r = redis.from_url(redis_url)
self.QUEUES = {
Priority.HIGH: "tasks:high",
Priority.NORMAL: "tasks:normal",
Priority.LOW: "tasks:low",
}
self.PROCESSING = "tasks:processing" # সর্টেড সেট (স্কোর = টাইমআউট)
self.RESULTS = "tasks:results"
def submit(self, task_type: str, payload: dict,
priority=Priority.NORMAL, idempotency_key=None) -> str:
"""কিউ-তে একটি টাস্ক সাবমিট করা।"""
task_id = str(uuid.uuid4())[:8]
task = {
"id": task_id,
"type": task_type,
"payload": payload,
"attempts": 0,
"max_retries": 3,
"idempotency_key": idempotency_key,
"created_at": time.time(),
}
# আইড্যামপোটেন্সি চেক করা
if idempotency_key:
existing = self.r.get(f"idemp:{idempotency_key}")
if existing:
return json.loads(existing)["id"]
queue_name = self.QUEUES[priority]
self.r.lpush(queue_name, json.dumps(task))
return task_id
def poll(self, visibility_timeout=30) -> dict | None:
"""ওয়ার্কার একটি টাস্কের জন্য খোঁজ নেবে বা পোল (poll) করবে। এটি হাই (high) → নরমাল (normal) → লো (low) চেক করে।"""
for priority in Priority:
queue_name = self.QUEUES[priority]
raw = self.r.rpop(queue_name)
if raw:
task = json.loads(raw)
# টাইমআউটের সাথে প্রসেসিং অবস্থা মার্ক করা
deadline = time.time() + visibility_timeout
self.r.zadd(self.PROCESSING, {raw: deadline})
return task
return None # কোনো টাস্ক নেই
def ack(self, task: dict):
"""ওয়ার্কার সফলভাবে শেষ করার স্বীকারোক্তি জানাবে।"""
raw = json.dumps(task)
self.r.zrem(self.PROCESSING, raw)
# রেজাল্ট এবং আইড্যামপোটেন্সি রেকর্ড সংরক্ষণ
self.r.set(f"result:{task['id']}", json.dumps({
"status": "completed", "completed_at": time.time()
}), ex=86400) # TTL ২৪ ঘণ্টা
if task.get("idempotency_key"):
self.r.set(f"idemp:{task['idempotency_key']}",
json.dumps(task), ex=86400)
def nack(self, task: dict):
"""ওয়ার্কার ব্যর্থ হওয়ার খবর জানাবে এবং রি-এনকিউ (Re-enqueue) করবে বা DLQ-তে পাঠাবে।"""
task["attempts"] += 1
if task["attempts"] >= task["max_retries"]:
# ডেড লেটার কিউ-তে সরানো হচ্ছে
self.r.lpush("tasks:dlq", json.dumps(task))
print(f"Task {task['id']} moved to DLQ after"
f" {task['attempts']} attempts")
else:
# ব্যাকঅফ-এর সাহায্যে রি-এনকিউ (re-enqueue)
delay = 2 ** task["attempts"] # এক্সপোনেনশিয়াল ব্যাকঅফ
time.sleep(delay)
self.r.lpush(self.QUEUES[Priority.NORMAL],
json.dumps(task))
# --- ব্যবহার (Usage) ---
q = TaskQueue()
# প্রোডিউসারের টাস্ক সাবমিট
task_id = q.submit("send_email", {
"subject": "Welcome!"
}, priority=Priority.HIGH, idempotency_key="welcome-user-42")
print(f"Submitted: {task_id}")
# ওয়ার্কার (Worker) লুপ
task = q.poll()
if task:
try:
print(f"Processing: {task['type']}")
# ... কাজ বা নির্দিষ্ট প্রোগ্রামটি করতে হবে ...
q.ack(task)
except Exception:
q.nack(task)
Output
Submitted: a1b2c3d4
Processing: send_email
Note: ইন্টারভিউ টিপস: ফেইলিউর হ্যান্ডলিং বা কাজ আটকে গেলে কী হবে তা সবসময় আলোচনা করবেন — কোনো কাজ চলাকালে ওয়ার্কার ক্র্যাশ করলে কী হবে? তখন ভিজিবিলিটি টাইমআউট প্যাটার্ন এবং আইড্যাম্পোটেন্সি কী-এর (idempotency keys) কথা তুলে ধরবেন। এছাড়াও এক্সাক্টলি-ওয়ান্স (exactly-once) (যা অনেক কঠিন, এবং এতে ডিস্ট্রিবিউটেড ট্রানজাকশনের প্রয়োজন হয়) এবং অ্যাট-লিস্ট-ওয়ান্স (at-least-once) (এটি বেশি প্রাক্টিক্যাল, তবে এতে হ্যান্ডলারগুলোকে আইড্যাম্পোটেন্ট হতে হয়)-এর মধ্যকার ট্রেড-অফগুলো পরিষ্কার করে বুঝিয়ে বলুন।

Key Metrics

এনকিউ টাস্ক (Enqueue task)
রেডিসে (Redis) LPUSH অথবা এসকিউএসে (SQS) সেন্ডমেসেজ (SendMessage)
~১-৫ ms \(O(1)\)
ডিকিউ টাস্ক (Dequeue task)
রেডিস (Redis) থেকে RPOP অথবা এসকিউএস (SQS) থেকে রিসিভমেসেজ (ReceiveMessage)
~১-৫ ms \(O(1)\)
থ্রুপুট (প্রতি ওয়ার্কার পিছু) (Throughput - per worker)
গড়ে টাস্ক কত সময় নিতে পারে তার ওপর নির্ভর করে (যেমন ৫ সে.)
~১০-১২ টাস্ক/মিনিট —
ওয়ার্কার স্কেলিং (Worker scaling)
ওয়ার্কার দ্বিগুণ হলে থ্রুপুটও দ্বিগুণ হবে
লিনিয়ার (Linear) \(O(n)\)
ফেইলিউর রেট (লক্ষ্য) (Failure rate - target)
কিছু রিট্রাইয়ের (retries) পর, DLQ-তে পাঠানোর আগে
< ০.১% —
পিক সময়ে কিউ-এর গভীরতা বা ডেপথ (Queue depth at peak)
৩৫০ টাস্ক/সেকেন্ড রেটে আসলে ৫ মিনিটের বার্স্ট (Burst)
~১০৫ হাজার টাস্ক —

ছোট কুইজ

একটি টাস্ক কিউতে ভিজিবিলিটি টাইমআউট (visibility timeout) কী?

পড়া চালিয়ে যান