Tuần 08: Message Queue
“Một hệ thống không có Message Queue giống như một nhà hàng mà bếp trưởng phải chạy ra tận bàn phục vụ từng món — chỉ cần đông khách một chút là sập.”
Tags: system-design message-queue kafka rabbitmq alex-xu Student: Hieu Prerequisite: Tuan-05-Load-Balancer · Tuan-07-Database-Sharding-Replication Liên quan: Tuan-06-Cache-Strategy · Tuan-09-Rate-Limiter · Tuan-13-Monitoring-Observability · Tuan-17-Design-Chat-System
1. Context & Why
Analogy đời thường — Hệ thống đặt đồ ăn online
Hieu, tưởng tượng em mở một hệ thống đặt đồ ăn online (kiểu GrabFood / ShopeeFood). Khi khách đặt món:
-
Không có Message Queue: Khách bấm “Đặt món” → app phải đợi bếp nấu xong mới trả về “Đặt thành công”. Nếu bếp đang nấu 50 món → khách thứ 51 phải đợi 30 phút mới thấy “Đặt thành công”. Khách bỏ đi. Hệ thống chết.
-
Có Message Queue: Khách bấm “Đặt món” → order được bỏ vào hàng đợi (queue) → app trả về ngay “Đã nhận order, đang xử lý”. Bếp lấy order từ queue ra nấu theo tốc độ của mình. Khách vui, bếp không bị quá tải, hệ thống sống.
Cái queue ở giữa chính là Message Queue — nó tách rời (decouple) người gửi (Producer / khách) và người nhận (Consumer / bếp).
Tại sao cần Message Queue trong System Design?
| Vấn đề | Không có Queue | Có Queue |
|---|---|---|
| Tight coupling | Service A gọi trực tiếp Service B. B chết → A chết theo | A gửi message vào queue. B chết → message vẫn nằm trong queue, đợi B recover |
| Overload | Spike traffic → downstream service bị overwhelm | Queue đóng vai trò buffer, downstream consume theo capacity |
| Synchronous blocking | Caller phải đợi response → latency tích luỹ | Fire-and-forget → caller return ngay, processing bất đồng bộ (asynchronous) |
| Scaling | Scale từng service khó vì chúng phụ thuộc nhau | Producer và Consumer scale độc lập |
| Retry & Fault tolerance | Nếu call fail → mất data hoặc phải tự implement retry | Queue tự giữ message, hỗ trợ retry, dead letter queue |
Tại sao Alex Xu đặt nó ở đây?
Vì Message Queue là xương sống của mọi distributed system. Hầu hết các design ở phần sau (Chat System, Notification System, News Feed) đều dùng queue. Không hiểu queue → không thể design hệ thống lớn.
Rule of thumb: Bất cứ khi nào em thấy hai service giao tiếp mà response time không cần real-time → hãy cân nhắc đặt một queue ở giữa.
2. Deep Dive — Các khái niệm cốt lõi
2.1 Message Queue vs Event Streaming — Hai paradigm khác nhau
Đây là điểm gây nhầm lẫn lớn nhất. Hieu cần phân biệt rõ:
| Đặc điểm | Message Queue (Hàng đợi tin nhắn) | Event Streaming (Luồng sự kiện) |
|---|---|---|
| Ví von | Hộp thư: lấy thư ra → thư biến mất | Nhật ký (log): ai cũng đọc được, data vẫn còn |
| Mô hình | Point-to-point: mỗi message chỉ 1 consumer xử lý | Pub/Sub + Log: nhiều consumer đọc cùng một stream |
| Message sau khi consume | Bị xoá khỏi queue (acknowledged & removed) | Vẫn còn trong log (retention period) |
| Replay | Không thể replay | Có thể replay từ bất kỳ offset nào |
| Use case | Task distribution, work queue, RPC | Event sourcing, analytics pipeline, log aggregation |
| Đại diện | RabbitMQ, SQS, ActiveMQ | Kafka, Pulsar, Kinesis, Redpanda |
Aha Moment: Kafka không phải message queue theo nghĩa truyền thống. Nó là distributed commit log. Hiểu sai bản chất → design sai.
2.2 Apache Kafka — Deep Dive
2.2.1 Kiến trúc tổng quan
Kafka gồm các thành phần chính:
- Broker: Một Kafka server. Cluster thường có 3–5+ brokers.
- Topic (Chủ đề): Logical channel cho messages. Ví dụ:
orders,payments,notifications. - Partition (Phân vùng): Mỗi topic được chia thành nhiều partitions. Đây là đơn vị parallelism và ordering.
- Producer (Người gửi): Gửi messages vào topic.
- Consumer (Người nhận): Đọc messages từ topic.
- Consumer Group (Nhóm consumer): Một nhóm consumers cùng đọc một topic, mỗi partition chỉ được 1 consumer trong group đọc.
- ZooKeeper / KRaft: Quản lý metadata của cluster (KRaft là thay thế mới cho ZooKeeper từ Kafka 3.3+).
2.2.2 Partitions — Chìa khoá của scalability
Topic: orders (3 partitions)
Partition 0: [order-1] [order-4] [order-7] [order-10] ...
Partition 1: [order-2] [order-5] [order-8] [order-11] ...
Partition 2: [order-3] [order-6] [order-9] [order-12] ...
Quy tắc phân partition:
- Producer chọn partition bằng partition key (thường là entity ID).
- Messages cùng partition key → luôn vào cùng partition → đảm bảo ordering.
- Ví dụ:
partition_key = user_id→ tất cả orders của user 123 luôn vào cùng 1 partition → xử lý đúng thứ tự.
Công thức chọn partition:
Pitfall: Nếu không set partition key → Kafka dùng round-robin → không đảm bảo ordering. Đây là sai lầm phổ biến nhất.
2.2.3 Consumer Groups — Parallelism pattern
Topic: orders (4 partitions)
Consumer Group A (Order Processing):
Consumer A1 → reads Partition 0, 1
Consumer A2 → reads Partition 2, 3
Consumer Group B (Analytics):
Consumer B1 → reads Partition 0
Consumer B2 → reads Partition 1
Consumer B3 → reads Partition 2
Consumer B4 → reads Partition 3
Quy tắc vàng:
- Trong 1 consumer group: mỗi partition chỉ gán cho đúng 1 consumer.
- Nếu
num_consumers > num_partitions→ consumer thừa sẽ idle (lãng phí). - Nếu
num_consumers < num_partitions→ một consumer đọc nhiều partitions. - Lý tưởng:
num_consumers = num_partitionscho throughput tối đa.
Aha Moment: Muốn scale consumers → phải tăng số partitions. Partition count là bottleneck ceiling cho parallelism.
2.2.4 Offsets — Cơ chế tracking progress
Mỗi message trong partition có một offset (số thứ tự tăng dần, bắt đầu từ 0):
Partition 0:
Offset: 0 1 2 3 4 5 6 7 8
Data: [m0] [m1] [m2] [m3] [m4] [m5] [m6] [m7] [m8]
^ ^
committed offset latest offset
(consumer đã xử lý) (producer mới nhất)
- Committed offset: Vị trí cuối cùng consumer đã xử lý xong và commit.
- Latest offset (Log-end offset): Message mới nhất producer ghi vào.
- Consumer lag = Latest offset - Committed offset = số message chưa xử lý.
Consumer lag là metric quan trọng nhất để monitor Kafka. Lag tăng liên tục → consumer không kịp xử lý → death spiral.
2.2.5 Retention & Compaction
Time-based retention (Lưu giữ theo thời gian):
- Mặc định: 7 ngày (
retention.ms = 604800000). - Sau 7 ngày → messages bị xoá bất kể đã consume hay chưa.
- Use case: Event logs, analytics data.
Size-based retention (Lưu giữ theo dung lượng):
- Giới hạn size partition:
retention.bytes = 1073741824(1GB). - Khi vượt → xoá segments cũ nhất.
Log compaction (Nén log):
- Chỉ giữ message mới nhất cho mỗi key.
- Use case: Changelog, state snapshot — ví dụ giữ trạng thái mới nhất của mỗi user profile.
Trước compaction:
Key=user1, value={"name":"Hieu"} offset 0
Key=user2, value={"name":"Tuan"} offset 1
Key=user1, value={"name":"Hieu Nguyen"} offset 2 ← mới hơn
Key=user3, value={"name":"Minh"} offset 3
Sau compaction:
Key=user1, value={"name":"Hieu Nguyen"} offset 2 ← chỉ giữ bản mới nhất
Key=user2, value={"name":"Tuan"} offset 1
Key=user3, value={"name":"Minh"} offset 3
2.3 RabbitMQ — Deep Dive
2.3.1 Kiến trúc AMQP
RabbitMQ implement giao thức AMQP (Advanced Message Queuing Protocol). Luồng message:
Producer → Exchange → [Binding Rules] → Queue → Consumer
Không giống Kafka (producer ghi thẳng vào partition), RabbitMQ có Exchange đứng giữa làm router.
2.3.2 Exchange Types (Kiểu tổng đài)
| Exchange Type | Routing Logic | Use Case |
|---|---|---|
| Direct | Route theo exact match routing_key | Task queue: routing_key="order.create" → chỉ queue xử lý order mới nhận |
| Fanout | Broadcast tới tất cả queues bound to exchange | Notification: 1 event → gửi email + SMS + push cùng lúc |
| Topic | Route theo pattern matching (* = 1 word, # = 0+ words) | order.* match order.create, order.cancel; order.# match order.payment.success |
| Headers | Route theo message headers (không dùng routing key) | Complex routing dựa trên metadata |
Ví dụ Topic Exchange:
Producer gửi message với routing_key = "order.payment.success"
Queue "order-processing" bound với pattern "order.#" → NHẬN ✓
Queue "payment-audit" bound với pattern "*.payment.*" → NHẬN ✓
Queue "email-service" bound với pattern "order.create" → KHÔNG nhận ✗
2.3.3 Bindings — Kết nối Exchange với Queue
Binding là rule nói cho Exchange biết message nào đi vào queue nào:
# Bind queue "order_queue" vào exchange "orders" với routing key "order.create"
channel.queue_bind(
exchange='orders',
queue='order_queue',
routing_key='order.create'
)Một queue có thể bind nhiều routing keys, một exchange có thể bind nhiều queues.
2.3.4 Acknowledgements (ACK) — Đảm bảo message được xử lý
RabbitMQ dùng ACK mechanism:
- Auto-ack (
auto_ack=True): Message bị xoá khỏi queue ngay khi gửi cho consumer. Nếu consumer crash trước khi xử lý xong → mất message. - Manual-ack (
auto_ack=False): Consumer phải gửi ACK sau khi xử lý xong. Nếu consumer crash → message quay lại queue (requeue) để consumer khác xử lý.
# Manual ACK
def callback(ch, method, properties, body):
try:
process_order(body)
ch.basic_ack(delivery_tag=method.delivery_tag) # ACK: đã xử lý xong
except Exception:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # NACK: đưa lại queue- NACK + Dead Letter: Nếu message fail quá nhiều lần → gửi vào Dead Letter Queue thay vì requeue vô hạn.
2.4 Delivery Semantics — Ba mức đảm bảo
Đây là khái niệm cực kỳ quan trọng trong distributed messaging:
At-most-once (Nhiều nhất một lần)
Producer → send → Broker (không đợi ACK)
Broker → deliver → Consumer (auto-ack, không retry)
- Message có thể mất nhưng không bao giờ duplicate.
- Use case: Metrics, logs — mất vài data point không sao.
- Cách implement: Fire-and-forget,
acks=0trong Kafka.
At-least-once (Ít nhất một lần)
Producer → send → Broker → ACK (retry nếu không nhận ACK)
Broker → deliver → Consumer → ACK (retry nếu consumer không ACK)
- Message không mất nhưng có thể duplicate.
- Use case: Payment processing (thà xử lý 2 lần còn hơn mất) — nhưng consumer phải idempotent.
- Cách implement:
acks=all+ retry trong Kafka, manual-ack trong RabbitMQ.
Exactly-once (Đúng một lần)
Producer → idempotent send → Broker (dedup bằng producer ID + sequence number)
Broker → deliver → Consumer → process + commit offset ATOMICALLY
- Message không mất và không duplicate.
- Đây là holy grail nhưng cực kỳ khó và đắt.
- Kafka hỗ trợ exactly-once qua Idempotent Producer + Transactional API (từ Kafka 0.11+).
- RabbitMQ không hỗ trợ native exactly-once → phải tự implement ở application level.
Aha Moment: Exactly-once trong Kafka chỉ đảm bảo within Kafka (produce + consume). Nếu consumer ghi vào database bên ngoài → vẫn cần idempotent logic ở application.
2.5 Dead Letter Queue (DLQ) — Hàng đợi “thư chết”
Khi message không thể xử lý được (format sai, business logic reject, timeout), thay vì:
- Retry vô hạn → infinite loop, consumer bị block.
- Drop message → mất data.
Ta gửi nó vào Dead Letter Queue:
Main Queue → Consumer xử lý
↓ (fail 3 lần)
Dead Letter Queue → Manual review / Alert / Re-process later
DLQ pattern trong RabbitMQ:
# Declare main queue với dead letter exchange
channel.queue_declare(
queue='orders',
arguments={
'x-dead-letter-exchange': 'dlx', # Dead letter exchange
'x-dead-letter-routing-key': 'dlq.orders', # Routing key cho DLQ
'x-message-ttl': 30000, # 30s timeout
'x-max-delivery-count': 3 # Max 3 retries (RabbitMQ 3.8+)
}
)
# Declare dead letter queue
channel.queue_declare(queue='dlq.orders')
channel.queue_bind(exchange='dlx', queue='dlq.orders', routing_key='dlq.orders')DLQ trong Kafka: Kafka không có native DLQ. Pattern phổ biến:
Topic: orders → Consumer xử lý
↓ (fail)
Topic: orders.dlq → Ghi message fail vào topic riêng
2.6 Backpressure — Cơ chế chống quá tải
Backpressure (Áp lực ngược) xảy ra khi Producer gửi nhanh hơn Consumer xử lý:
Producer: 10,000 msg/s → Queue: [█████████████████] 99% full → Consumer: 2,000 msg/s
↑
BACKPRESSURE ZONE
Các chiến lược xử lý:
| Chiến lược | Cách hoạt động | Trade-off |
|---|---|---|
| Drop | Bỏ message mới khi queue đầy | Mất data, nhưng hệ thống sống |
| Block | Producer bị block cho đến khi queue có chỗ | Không mất data, nhưng producer bị chậm |
| Buffer to disk | Queue overflow ra disk | Chậm hơn memory nhưng capacity lớn hơn |
| Scale consumers | Auto-scale thêm consumer instances | Tốn resource nhưng giải quyết gốc rễ |
| Rate limit producer | Giới hạn tốc độ gửi | Simple nhưng cần coordination |
Kafka approach: Kafka dùng disk-based log → gần như không bị memory pressure. Backpressure thể hiện qua consumer lag tăng. Giải pháp: scale consumers hoặc tăng partitions.
RabbitMQ approach: RabbitMQ chủ yếu in-memory. Khi memory > threshold (mặc định 40% RAM) → kích hoạt flow control: block tất cả producers cho đến khi memory giảm.
2.7 Message Ordering Guarantees — Đảm bảo thứ tự
| Hệ thống | Ordering Guarantee |
|---|---|
| Kafka | Ordering trong 1 partition (per-partition ordering). Không đảm bảo ordering across partitions. |
| RabbitMQ | Ordering trong 1 queue nếu chỉ có 1 consumer. Nhiều consumers → không đảm bảo. |
| SQS Standard | Best-effort ordering — không đảm bảo. |
| SQS FIFO | Strict ordering trong 1 message group (tương tự Kafka partition). |
Pitfall: Muốn global ordering (tất cả messages đúng thứ tự) → chỉ dùng 1 partition (Kafka) hoặc 1 queue + 1 consumer (RabbitMQ). Nhưng throughput sẽ bị giới hạn bởi 1 consumer. Trong thực tế, hầu hết hệ thống chỉ cần per-entity ordering (ví dụ: tất cả events của 1 user đúng thứ tự).
2.8 Idempotent Consumers — Consumer “bất biến”
Vì at-least-once delivery có thể gửi message trùng, consumer phải idempotent (xử lý cùng message nhiều lần → kết quả giống nhau).
Pattern phổ biến:
def process_payment(message):
idempotency_key = message['payment_id']
# Check: đã xử lý chưa?
if db.exists(f"processed:{idempotency_key}"):
logger.info(f"Duplicate message, skipping: {idempotency_key}")
return # Skip
# Process
execute_payment(message)
# Mark as processed (với TTL phù hợp)
db.set(f"processed:{idempotency_key}", "1", ex=86400 * 7) # 7 ngàyCác kỹ thuật idempotency:
- Idempotency key: Mỗi message có unique ID. Consumer check trước khi xử lý.
- Database unique constraint:
INSERT ... ON CONFLICT DO NOTHING. - Conditional update:
UPDATE accounts SET balance = balance - 100 WHERE version = 5(optimistic locking). - Deduplication window: Giữ set of processed IDs trong Redis với TTL.
2.9 Pub/Sub Pattern — Publish/Subscribe
Pub/Sub (Phát/Đăng ký) là pattern mà:
- Publisher phát message mà không biết ai sẽ nhận.
- Subscriber đăng ký nhận message mà không biết ai gửi.
- Decoupling tối đa.
Publisher → Topic/Exchange → Subscriber 1 (Email Service)
→ Subscriber 2 (SMS Service)
→ Subscriber 3 (Analytics Service)
→ Subscriber 4 (Audit Log Service)
Kafka: Mỗi Consumer Group là 1 logical subscriber. Nhiều groups đọc cùng topic = Pub/Sub.
RabbitMQ: Dùng Fanout Exchange → broadcast message tới tất cả bound queues.
2.10 Fan-out — Nhân bản message
Fan-out là khi 1 message cần được gửi tới nhiều destinations:
Ví dụ thực tế: User đặt order → cần:
- Order Service xử lý order.
- Payment Service charge tiền.
- Notification Service gửi email xác nhận.
- Analytics Service ghi log.
- Inventory Service trừ kho.
Fan-out trong Kafka:
Topic: order.created
→ Consumer Group: order-processing (xử lý order)
→ Consumer Group: payment-service (charge tiền)
→ Consumer Group: notification-service (gửi email)
→ Consumer Group: analytics-pipeline (ghi log)
→ Consumer Group: inventory-service (trừ kho)
Mỗi consumer group đọc toàn bộ messages trong topic → fan-out tự nhiên.
Fan-out trong RabbitMQ: Dùng Fanout Exchange → mỗi service có queue riêng bound tới exchange.
2.11 CQRS với Event Sourcing — Giới thiệu
CQRS (Command Query Responsibility Segregation): Tách hệ thống thành 2 phần:
- Command side (Write): Xử lý commands (tạo order, update profile).
- Query side (Read): Phục vụ queries (hiển thị order history, search).
Event Sourcing: Thay vì lưu trạng thái hiện tại, lưu toàn bộ chuỗi events:
Truyền thống (State-based):
accounts table: { user_id: 123, balance: 800 }
Event Sourcing:
events table:
1. AccountCreated { user_id: 123, balance: 1000 }
2. MoneyWithdrawn { user_id: 123, amount: 200 }
3. MoneyDeposited { user_id: 123, amount: 500 }
4. MoneyWithdrawn { user_id: 123, amount: 500 }
→ Current state = replay events = 1000 - 200 + 500 - 500 = 800
Message Queue trong CQRS + Event Sourcing:
Command → Command Handler → Event Store (Kafka topic với log compaction)
↓
Event Bus (Kafka)
↓
Read Model Updater (Consumer)
↓
Read Database (Optimized for queries)
Kafka là lựa chọn tự nhiên cho Event Sourcing vì:
- Immutable log: Events chỉ append, không sửa/xoá.
- Replay: Rebuild read model bằng cách replay từ đầu.
- Compaction: Giữ snapshot mới nhất cho mỗi entity.
Lưu ý: CQRS + Event Sourcing là pattern phức tạp. Chỉ dùng khi hệ thống thực sự cần audit trail, temporal queries, hoặc tách biệt read/write scaling. Đừng over-engineer.
2.12 Comparison Table — Kafka vs RabbitMQ vs SQS vs Redis Streams
| Tiêu chí | Kafka | RabbitMQ | AWS SQS | Redis Streams |
|---|---|---|---|---|
| Mô hình | Distributed commit log | Message broker (AMQP) | Managed queue service | In-memory stream |
| Ordering | Per-partition | Per-queue (single consumer) | FIFO queue: per message group; Standard: best-effort | Per-stream |
| Throughput | Rất cao (millions msg/s per cluster) | Trung bình (~50K msg/s per node) | Unlimited (managed) | Cao (~100K msg/s per node) |
| Latency | Thấp (ms) nhưng batching thêm delay | Rất thấp (sub-ms) | Trung bình (10–100ms polling) | Rất thấp (sub-ms) |
| Retention | Configurable (giờ → vĩnh viễn) | Cho đến khi consumed | 14 ngày max | Configurable |
| Replay | Có (từ bất kỳ offset) | Không | Không | Có (từ bất kỳ ID) |
| Delivery | At-least-once, exactly-once | At-least-once, at-most-once | At-least-once (Standard), exactly-once (FIFO) | At-least-once |
| DLQ | Application-level | Native | Native | Application-level |
| Scaling | Horizontal (add brokers + partitions) | Vertical + clustering | Auto (managed) | Horizontal (cluster) |
| Ops complexity | Cao (ZooKeeper/KRaft, broker tuning) | Trung bình | Thấp (managed) | Thấp (nếu đã dùng Redis) |
| Best for | Event streaming, log aggregation, high-throughput pipelines | Task queues, RPC, routing logic phức tạp | Serverless, AWS-native, low-ops | Lightweight streaming, existing Redis stack |
| Protocol | Custom (Kafka protocol) | AMQP, MQTT, STOMP | HTTP/SQS API | Redis protocol (RESP) |
| Cost model | Self-hosted hoặc Confluent Cloud | Self-hosted hoặc CloudAMQP | Pay-per-request | Self-hosted (free) |
Rule of thumb cho Interview:
- Cần high-throughput event streaming + replay → Kafka.
- Cần flexible routing + low latency + DLQ → RabbitMQ.
- Cần managed, low-ops trên AWS → SQS.
- Đã dùng Redis, cần lightweight streaming → Redis Streams.
3. Estimation — Ước lượng cho Message Queue System
3.1 Kafka Cluster Throughput Calculation
Scenario: E-commerce platform, 10M orders/day, mỗi order trigger 5 events (created, paid, packed, shipped, delivered).
Assumptions
| Thông số | Giá trị |
|---|---|
| Orders/day | 10M |
| Events per order | 5 |
| Avg message size | 1 KB |
| Peak multiplier | 5x (flash sale) |
| Retention | 7 ngày |
| Replication factor | 3 |
Tính Message Throughput
Nhận xét: 2,895 msg/s — single Kafka broker dễ dàng handle (capacity ~100K msg/s). Nhưng cần tính thêm replication overhead.
Tính Partition Count
Công thức partition count:
Trong đó:
- = số partitions cần thiết
- = target producer throughput (msg/s)
- = throughput mỗi partition phía producer (~10,000 msg/s cho 1KB messages)
- = target consumer throughput (msg/s)
- = throughput mỗi consumer instance (~5,000 msg/s nếu xử lý business logic)
Tuy nhiên, trong production luôn set ít nhất 3–6 partitions per topic cho:
- Future scaling headroom
- High availability (nếu 1 consumer die, partition rebalance nhanh hơn)
- Parallel processing
Khuyến nghị cho scenario này: 6 partitions per topic.
Công thức nâng cao (bao gồm future growth):
Tính Consumer Lag Estimation
Nếu consumer bị chậm (ví dụ do database slow), consumer lag tích luỹ:
Ví dụ: produce rate = 2,895 msg/s, consume rate giảm xuống 2,000 msg/s (database slow):
Alert threshold: Nếu consumer lag > 5 phút → Warning. > 30 phút → Critical.
3.2 Storage for Message Retention
Nhận xét: 1TB cho 7 ngày retention — 3 brokers mỗi broker cần ~350GB disk. SSD recommended cho performance.
Thêm overhead thực tế:
3.3 Tóm tắt Estimation
| Metric | Value |
|---|---|
| Avg throughput | ~579 msg/s |
| Peak throughput | ~2,895 msg/s |
| Peak data rate | ~2.83 MB/s |
| Recommended partitions | 6 per topic |
| Storage (7 days, 3x replication) | ~1.37 TB |
| Consumer lag alert threshold | > 5 min Warning, > 30 min Critical |
| Brokers needed | 3 (min for HA) |
4. Security — Bảo mật Message Queue
4.1 Message Encryption — Mã hoá tin nhắn
In Transit (Trên đường truyền) — TLS/SSL
Mọi communication giữa Producer ↔ Broker ↔ Consumer phải qua TLS 1.2+:
# Kafka broker config (server.properties)
listeners=SSL://0.0.0.0:9093
ssl.keystore.location=/var/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEY_PASSWORD}
ssl.truststore.location=/var/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASSWORD}
ssl.client.auth=required # Mutual TLS (mTLS)
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.cipher.suites=TLS_AES_256_GCM_SHA384 # Strong cipher only# Kafka producer/consumer config
security.protocol=SSL
ssl.truststore.location=/var/kafka/ssl/client.truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASSWORD}
ssl.keystore.location=/var/kafka/ssl/client.keystore.jks
ssl.keystore.password=${KEYSTORE_PASSWORD}Pitfall: Enabling TLS giảm throughput ~20–30%. Phải tính vào estimation.
At Rest (Khi lưu trữ) — Disk Encryption
Kafka không encrypt messages trên disk natively. Hai options:
- OS-level encryption: LUKS (Linux), FileVault (macOS), BitLocker (Windows). Transparent cho Kafka.
- Application-level encryption: Producer encrypt message trước khi gửi, consumer decrypt sau khi nhận. Kafka broker chỉ thấy ciphertext.
# Application-level encryption (Producer side)
from cryptography.fernet import Fernet
encryption_key = Fernet.generate_key() # Lưu trong KMS, KHÔNG hardcode
cipher = Fernet(encryption_key)
def produce_encrypted(producer, topic, key, value):
encrypted_value = cipher.encrypt(value.encode('utf-8'))
producer.send(topic, key=key.encode(), value=encrypted_value)Best practice: Dùng AWS KMS / HashiCorp Vault để quản lý encryption keys. Rotate keys mỗi 90 ngày → xem Tuan-15-Data-Security-Encryption.
4.2 ACL for Topics — Kiểm soát truy cập
Kafka hỗ trợ ACL (Access Control List) để kiểm soát ai được đọc/ghi topic nào:
# Cho phép user "order-service" chỉ WRITE vào topic "orders"
kafka-acls.sh --add \
--allow-principal User:order-service \
--operation Write \
--topic orders \
--bootstrap-server localhost:9092
# Cho phép user "analytics-service" chỉ READ từ topic "orders"
kafka-acls.sh --add \
--allow-principal User:analytics-service \
--operation Read \
--topic orders \
--group analytics-group \
--bootstrap-server localhost:9092
# Deny all by default (require explicit ACL)
kafka-acls.sh --add \
--deny-principal User:* \
--operation All \
--topic '*' \
--bootstrap-server localhost:9092RabbitMQ permissions:
# Set permissions cho user "order-service" trên vhost "production"
# configure: không được tạo/xoá queue
# write: chỉ exchange "orders"
# read: không được đọc
rabbitmqctl set_permissions -p production order-service \
"" "^orders$" ""
# analytics-service: chỉ đọc queue "order-events"
rabbitmqctl set_permissions -p production analytics-service \
"" "" "^order-events$"4.3 Schema Validation — Avro / Protobuf
Vấn đề: Producer gửi message format sai → consumer crash → cascade failure.
Giải pháp: Dùng Schema Registry (Confluent Schema Registry) để:
- Producer phải register schema trước khi gửi.
- Mỗi message đính kèm schema ID.
- Consumer validate message theo schema trước khi xử lý.
- Schema evolution rules (backward/forward compatible).
Avro schema ví dụ:
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "VND"},
{"name": "status", "type": {"type": "enum", "name": "OrderStatus",
"symbols": ["CREATED", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]}},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
]
}Protobuf ví dụ:
syntax = "proto3";
package orders;
message OrderEvent {
string order_id = 1;
string user_id = 2;
double amount = 3;
string currency = 4;
OrderStatus status = 5;
int64 timestamp = 6;
enum OrderStatus {
CREATED = 0;
PAID = 1;
SHIPPED = 2;
DELIVERED = 3;
CANCELLED = 4;
}
}| Tiêu chí | Avro | Protobuf | JSON |
|---|---|---|---|
| Serialization size | Nhỏ | Nhỏ nhất | Lớn nhất |
| Schema evolution | Tốt nhất (backward + forward) | Tốt | Không enforce |
| Human readable | Không | Không | Có |
| Speed | Nhanh | Nhanh nhất | Chậm nhất |
| Kafka ecosystem | Native (Confluent) | Hỗ trợ | Hỗ trợ |
4.4 Poison Message Handling — Xử lý “tin nhắn độc”
Poison message = message mà consumer không bao giờ xử lý thành công (format sai, data corrupt, business logic impossible).
Queue: [msg1] [msg2] [POISON] [msg4] [msg5]
↑
Consumer thử xử lý → fail → retry → fail → retry → STUCK
Tất cả messages phía sau bị BLOCK
Chiến lược xử lý:
- Max retry count + DLQ: Sau N lần fail → đưa vào Dead Letter Queue.
- Circuit breaker: Nếu fail rate > threshold → ngừng consume, alert.
- Schema validation at ingestion: Reject invalid messages ngay từ producer.
- Quarantine topic: Tách poison messages ra topic riêng để debug.
MAX_RETRIES = 3
def process_with_poison_handling(message):
retry_count = int(message.headers.get('x-retry-count', 0))
try:
validate_schema(message.value) # Validate trước
process_business_logic(message.value)
except ValidationError as e:
# Schema invalid → CHẮC CHẮN fail lại → gửi thẳng DLQ
send_to_dlq(message, reason=f"Schema validation failed: {e}")
except BusinessLogicError as e:
if retry_count >= MAX_RETRIES:
send_to_dlq(message, reason=f"Max retries exceeded: {e}")
else:
# Retry với backoff
requeue_with_retry(message, retry_count + 1, delay=2 ** retry_count)
except Exception as e:
# Unknown error → DLQ ngay để không block queue
send_to_dlq(message, reason=f"Unexpected error: {e}")4.5 Replay Attack Prevention — Chống tấn công phát lại
Replay attack: Attacker capture 1 message hợp lệ (ví dụ: “chuyển 1 triệu cho user X”) rồi gửi lại nhiều lần.
Phòng chống:
-
Message ID + Deduplication: Mỗi message có unique ID. Consumer check duplicate trước khi xử lý (giống idempotent consumer pattern).
-
Timestamp + TTL: Message có timestamp, consumer reject messages quá cũ:
import time
MAX_MESSAGE_AGE_SECONDS = 300 # 5 phút
def validate_message_freshness(message):
msg_timestamp = message['timestamp']
current_time = time.time()
age = current_time - msg_timestamp
if age > MAX_MESSAGE_AGE_SECONDS:
raise StaleMessageError(f"Message too old: {age}s > {MAX_MESSAGE_AGE_SECONDS}s")
if age < -30: # Clock skew tolerance
raise InvalidTimestampError(f"Message from future: {age}s")- HMAC Signature: Producer ký message bằng shared secret. Consumer verify chữ ký:
import hmac
import hashlib
import json
SECRET_KEY = b'shared-secret-from-vault' # Lấy từ KMS
def sign_message(message: dict) -> str:
payload = json.dumps(message, sort_keys=True).encode()
return hmac.new(SECRET_KEY, payload, hashlib.sha256).hexdigest()
def verify_message(message: dict, signature: str) -> bool:
expected = sign_message(message)
return hmac.compare_digest(expected, signature)- Nonce: Thêm random nonce vào mỗi message. Consumer tracking nonces đã thấy → reject duplicate nonce.
5. DevOps — Triển khai & Monitoring
5.1 Kafka Docker Compose — Full Stack
# docker-compose.kafka.yml
version: '3.8'
services:
# --- ZooKeeper (sẽ deprecated, nhưng cần cho Kafka < 3.3) ---
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
# Security
ZOOKEEPER_AUTH_PROVIDER_1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
volumes:
- zookeeper-data:/var/lib/zookeeper/data
- zookeeper-log:/var/lib/zookeeper/log
healthcheck:
test: echo ruok | nc localhost 2181 | grep imok
interval: 10s
timeout: 5s
retries: 5
networks:
- kafka-net
# --- Kafka Broker 1 ---
kafka-1:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka-1
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,EXTERNAL://localhost:29092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_NUM_PARTITIONS: 6
KAFKA_LOG_RETENTION_HOURS: 168 # 7 ngày
KAFKA_LOG_RETENTION_BYTES: 1073741824 # 1GB per partition
KAFKA_LOG_SEGMENT_BYTES: 536870912 # 512MB per segment
KAFKA_MESSAGE_MAX_BYTES: 1048576 # 1MB max message size
KAFKA_COMPRESSION_TYPE: lz4 # Compression cho throughput
# JMX for monitoring
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: kafka-1
volumes:
- kafka-1-data:/var/lib/kafka/data
healthcheck:
test: kafka-broker-api-versions --bootstrap-server localhost:9092
interval: 15s
timeout: 10s
retries: 5
networks:
- kafka-net
# --- Kafka Broker 2 ---
kafka-2:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka-2
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9093:9093"
- "29093:29093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9093,EXTERNAL://localhost:29093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_NUM_PARTITIONS: 6
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_RETENTION_BYTES: 1073741824
KAFKA_MESSAGE_MAX_BYTES: 1048576
KAFKA_COMPRESSION_TYPE: lz4
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: kafka-2
volumes:
- kafka-2-data:/var/lib/kafka/data
healthcheck:
test: kafka-broker-api-versions --bootstrap-server localhost:9093
interval: 15s
timeout: 10s
retries: 5
networks:
- kafka-net
# --- Kafka Broker 3 ---
kafka-3:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka-3
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9094:9094"
- "29094:29094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9094,EXTERNAL://localhost:29094
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_NUM_PARTITIONS: 6
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_RETENTION_BYTES: 1073741824
KAFKA_MESSAGE_MAX_BYTES: 1048576
KAFKA_COMPRESSION_TYPE: lz4
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: kafka-3
volumes:
- kafka-3-data:/var/lib/kafka/data
healthcheck:
test: kafka-broker-api-versions --bootstrap-server localhost:9094
interval: 15s
timeout: 10s
retries: 5
networks:
- kafka-net
# --- Schema Registry ---
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
container_name: schema-registry
depends_on:
kafka-1:
condition: service_healthy
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9093,kafka-3:9094
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
networks:
- kafka-net
# --- Kafka UI (monitoring) ---
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
kafka-1:
condition: service_healthy
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9093,kafka-3:9094
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_METRICS_PORT: 9999
# Auth for UI (basic)
AUTH_TYPE: LOGIN_FORM
SPRING_SECURITY_USER_NAME: admin
SPRING_SECURITY_USER_PASSWORD: ${KAFKA_UI_PASSWORD:-admin123}
networks:
- kafka-net
# --- Prometheus JMX Exporter ---
jmx-exporter:
image: bitnami/jmx-exporter:latest
container_name: jmx-exporter
ports:
- "5556:5556"
volumes:
- ./monitoring/jmx-config.yml:/etc/jmx-exporter/config.yml
command:
- "5556"
- /etc/jmx-exporter/config.yml
networks:
- kafka-net
# --- Prometheus ---
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- ./monitoring/alert-rules.yml:/etc/prometheus/alert-rules.yml
- prometheus-data:/prometheus
networks:
- kafka-net
# --- Grafana ---
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_PASSWORD:-admin123}
volumes:
- grafana-data:/var/lib/grafana
networks:
- kafka-net
volumes:
zookeeper-data:
zookeeper-log:
kafka-1-data:
kafka-2-data:
kafka-3-data:
prometheus-data:
grafana-data:
networks:
kafka-net:
driver: bridge5.2 Kafka UI — Giao diện monitoring
Kafka UI (provectuslabs/kafka-ui) cung cấp dashboard web để:
- Xem danh sách topics, partitions, replication status.
- Browse messages trong topic.
- Xem consumer groups và consumer lag real-time.
- Manage schemas trong Schema Registry.
Truy cập tại http://localhost:8080 sau khi chạy docker-compose.
5.3 Consumer Lag Monitoring — Burrow
Burrow (LinkedIn open-source) chuyên monitor consumer lag:
# burrow.toml
[general]
pidfile = "/var/run/burrow.pid"
stdout-logfile = "/var/log/burrow/burrow.log"
[zookeeper]
servers = ["zookeeper:2181"]
[cluster.local]
class-name = "kafka"
servers = ["kafka-1:9092", "kafka-2:9093", "kafka-3:9094"]
topic-refresh = 60
offset-refresh = 30
[consumer.local]
class-name = "kafka"
cluster = "local"
servers = ["kafka-1:9092", "kafka-2:9093", "kafka-3:9094"]
offsets-topic = "__consumer_offsets"
group-denylist = "^(console-consumer-|_).*$"
[httpserver.default]
address = ":8000"
[notifier.slack]
class-name = "http"
url-open = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
template-open = "/etc/burrow/templates/slack-open.tmpl"
template-close = "/etc/burrow/templates/slack-close.tmpl"
interval = 60
threshold = 2
group-denylist = "^(console-consumer-|_).*$"
# Evaluation rules:
# OK - consumer lag stable/decreasing
# WARNING - consumer lag increasing
# ERROR - consumer lag critically high or consumer stopped
# STALL - consumer offset not changing but lag exists5.4 Prometheus JMX Exporter — Metrics từ Kafka
# monitoring/jmx-config.yml
hostPort: kafka-1:9999
lowercaseOutputName: true
rules:
# Broker metrics
- pattern: kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>OneMinuteRate
name: kafka_server_broker_topic_metrics_messages_in_per_sec
type: GAUGE
- pattern: kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>OneMinuteRate
name: kafka_server_broker_topic_metrics_bytes_in_per_sec
type: GAUGE
- pattern: kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>OneMinuteRate
name: kafka_server_broker_topic_metrics_bytes_out_per_sec
type: GAUGE
# Request metrics
- pattern: kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(\w+)><>(\w+)
name: kafka_network_request_metrics_total_time_ms
labels:
request: $1
stat: $2
type: GAUGE
# Consumer group lag (the most important metric)
- pattern: kafka.server<type=FetcherLagMetrics, name=ConsumerLag, clientId=(.+), topic=(.+), partition=(\d+)><>Value
name: kafka_consumer_lag
labels:
client_id: $1
topic: $2
partition: $3
type: GAUGE
# Under-replicated partitions (data loss risk)
- pattern: kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value
name: kafka_server_under_replicated_partitions
type: GAUGE
# ISR shrinks/expands
- pattern: kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>OneMinuteRate
name: kafka_server_isr_shrinks_per_sec
type: GAUGE5.5 Prometheus Alert Rules
# monitoring/alert-rules.yml
groups:
- name: kafka_alerts
rules:
# Consumer lag increasing
- alert: KafkaConsumerLagHigh
expr: sum(kafka_consumer_lag) by (topic, consumer_group) > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer lag high: {{ $labels.consumer_group }} on {{ $labels.topic }}"
description: "Consumer lag is {{ $value }} messages. Threshold: 10,000."
# Consumer lag critical
- alert: KafkaConsumerLagCritical
expr: sum(kafka_consumer_lag) by (topic, consumer_group) > 100000
for: 2m
labels:
severity: critical
annotations:
summary: "CRITICAL consumer lag: {{ $labels.consumer_group }}"
description: "Consumer lag is {{ $value }} messages. Immediate action required."
# Consumer lag increasing rate (death spiral detection)
- alert: KafkaConsumerLagIncreasing
expr: rate(kafka_consumer_lag[5m]) > 100
for: 10m
labels:
severity: warning
annotations:
summary: "Consumer lag growing at {{ $value }} msg/s"
description: "Consumer is falling behind. Check consumer health."
# Under-replicated partitions (data loss risk)
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_under_replicated_partitions > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Under-replicated partitions detected"
description: "{{ $value }} partitions are under-replicated. Data loss risk."
# Broker down
- alert: KafkaBrokerDown
expr: count(up{job="kafka"}) < 3
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka broker down"
description: "Expected 3 brokers, found {{ $value }}."
# High disk usage
- alert: KafkaDiskUsageHigh
expr: (node_filesystem_size_bytes{mountpoint="/var/lib/kafka"} - node_filesystem_free_bytes{mountpoint="/var/lib/kafka"}) / node_filesystem_size_bytes{mountpoint="/var/lib/kafka"} > 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka disk usage > 85%"5.6 Prometheus Config
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- alert-rules.yml
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets:
- 'jmx-exporter:5556'
metrics_path: /metrics
- job_name: 'burrow'
static_configs:
- targets:
- 'burrow:8000'
metrics_path: /v3/kafka/local/consumer6. Code Examples
6.1 Python Kafka Producer/Consumer — Exactly-Once Semantics
"""
Kafka Producer + Consumer with Exactly-Once Semantics (EOS)
Requires: pip install confluent-kafka
"""
import json
import logging
import signal
import sys
from datetime import datetime, timezone
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
from confluent_kafka.serialization import StringSerializer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================
# PRODUCER — Idempotent + Transactional
# ============================================================
class ExactlyOnceProducer:
"""
Kafka Producer with exactly-once semantics.
- enable.idempotence=True: Dedup tại broker (producer ID + sequence number)
- transactional.id: Cho phép atomic writes across partitions
"""
def __init__(self, bootstrap_servers: str, transactional_id: str):
self.config = {
'bootstrap.servers': bootstrap_servers,
'client.id': f'producer-{transactional_id}',
# === Exactly-Once Config ===
'enable.idempotence': True, # Dedup at broker level
'transactional.id': transactional_id, # Enable transactions
'acks': 'all', # Wait for all ISR replicas
'max.in.flight.requests.per.connection': 5, # Max 5 for idempotent
# === Reliability ===
'retries': 2147483647, # Infinite retries (EOS handles it)
'delivery.timeout.ms': 120000, # 2 minutes timeout
'request.timeout.ms': 30000,
# === Performance ===
'compression.type': 'lz4', # Compress messages
'batch.size': 65536, # 64KB batch
'linger.ms': 5, # Wait 5ms to batch
}
self.producer = Producer(self.config)
self.producer.init_transactions() # Initialize transaction coordinator
logger.info(f"Producer initialized with transactional.id={transactional_id}")
def send_event(self, topic: str, key: str, event: dict):
"""Send single event within a transaction."""
try:
self.producer.begin_transaction()
# Add metadata
event['_metadata'] = {
'produced_at': datetime.now(timezone.utc).isoformat(),
'producer_id': self.config['transactional.id'],
}
self.producer.produce(
topic=topic,
key=key.encode('utf-8'),
value=json.dumps(event).encode('utf-8'),
on_delivery=self._delivery_callback,
)
self.producer.commit_transaction()
logger.info(f"Sent event to {topic}: key={key}")
except KafkaException as e:
logger.error(f"Transaction failed: {e}")
self.producer.abort_transaction()
raise
def send_batch(self, topic: str, events: list[tuple[str, dict]]):
"""Send batch of events atomically (all-or-nothing)."""
try:
self.producer.begin_transaction()
for key, event in events:
event['_metadata'] = {
'produced_at': datetime.now(timezone.utc).isoformat(),
'producer_id': self.config['transactional.id'],
}
self.producer.produce(
topic=topic,
key=key.encode('utf-8'),
value=json.dumps(event).encode('utf-8'),
on_delivery=self._delivery_callback,
)
self.producer.commit_transaction()
logger.info(f"Batch sent: {len(events)} events to {topic}")
except KafkaException as e:
logger.error(f"Batch transaction failed: {e}")
self.producer.abort_transaction()
raise
def _delivery_callback(self, err, msg):
if err:
logger.error(f"Delivery failed: {err}")
else:
logger.debug(f"Delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}")
def close(self):
self.producer.flush()
logger.info("Producer closed")
# ============================================================
# CONSUMER — Exactly-Once (Read-Process-Write pattern)
# ============================================================
class ExactlyOnceConsumer:
"""
Kafka Consumer with exactly-once semantics.
- isolation.level=read_committed: Chỉ đọc messages đã committed
- enable.auto.commit=False: Manual offset commit
- Idempotent processing via dedup store
"""
def __init__(self, bootstrap_servers: str, group_id: str, topics: list[str],
dedup_store: dict = None):
self.config = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'client.id': f'consumer-{group_id}',
# === Exactly-Once Config ===
'isolation.level': 'read_committed', # Only read committed messages
'enable.auto.commit': False, # Manual commit after processing
'auto.offset.reset': 'earliest', # Start from beginning if no offset
# === Performance ===
'max.poll.interval.ms': 300000, # 5 minutes max processing time
'session.timeout.ms': 45000,
'heartbeat.interval.ms': 15000,
'fetch.min.bytes': 1024, # 1KB min fetch
'fetch.max.wait.ms': 500,
}
self.consumer = Consumer(self.config)
self.consumer.subscribe(topics)
self.running = True
self.dedup_store = dedup_store if dedup_store is not None else {}
logger.info(f"Consumer initialized: group={group_id}, topics={topics}")
# Graceful shutdown
signal.signal(signal.SIGINT, self._shutdown)
signal.signal(signal.SIGTERM, self._shutdown)
def consume_loop(self, process_fn):
"""
Main consume loop with exactly-once processing.
process_fn(key, value) -> bool: Return True if processed successfully.
"""
try:
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
logger.debug(f"End of partition: {msg.topic()}[{msg.partition()}]")
continue
raise KafkaException(msg.error())
# Deserialize
key = msg.key().decode('utf-8') if msg.key() else None
value = json.loads(msg.value().decode('utf-8'))
# Dedup check (idempotent consumer)
msg_id = f"{msg.topic()}-{msg.partition()}-{msg.offset()}"
if msg_id in self.dedup_store:
logger.info(f"Duplicate message, skipping: {msg_id}")
self.consumer.commit(message=msg)
continue
# Process
try:
success = process_fn(key, value)
if success:
# Mark as processed + commit offset ATOMICALLY
self.dedup_store[msg_id] = True
self.consumer.commit(message=msg)
logger.info(f"Processed: {msg_id}")
else:
logger.warning(f"Processing returned False: {msg_id}")
# Don't commit → message will be redelivered
except Exception as e:
logger.error(f"Processing error for {msg_id}: {e}")
# Don't commit → message will be redelivered
# In production: implement retry count + DLQ here
except KafkaException as e:
logger.error(f"Consumer error: {e}")
finally:
self.consumer.close()
logger.info("Consumer closed")
def _shutdown(self, signum, frame):
logger.info("Shutdown signal received")
self.running = False
# ============================================================
# USAGE EXAMPLE
# ============================================================
if __name__ == "__main__":
BOOTSTRAP = "localhost:29092,localhost:29093,localhost:29094"
# --- Producer ---
producer = ExactlyOnceProducer(
bootstrap_servers=BOOTSTRAP,
transactional_id="order-producer-001"
)
# Send order events
order = {
"order_id": "ORD-20240101-001",
"user_id": "USR-123",
"items": [
{"product_id": "PROD-456", "quantity": 2, "price": 150000},
{"product_id": "PROD-789", "quantity": 1, "price": 350000},
],
"total": 650000,
"currency": "VND",
"status": "CREATED",
}
producer.send_event(
topic="orders",
key=order["user_id"], # Partition key = user_id → ordering per user
event=order,
)
producer.close()
# --- Consumer ---
def process_order(key, value):
"""Business logic xử lý order."""
logger.info(f"Processing order: {value['order_id']} for user {key}")
# Simulate: save to database, update inventory, etc.
return True
consumer = ExactlyOnceConsumer(
bootstrap_servers=BOOTSTRAP,
group_id="order-processing-group",
topics=["orders"],
)
consumer.consume_loop(process_order)6.2 Node.js RabbitMQ — Pub/Sub with Dead Letter Queue
/**
* RabbitMQ Pub/Sub with Dead Letter Queue
* Requires: npm install amqplib
*/
const amqp = require('amqplib');
// ============================================================
// CONFIGURATION
// ============================================================
const CONFIG = {
url: process.env.RABBITMQ_URL || 'amqp://admin:admin123@localhost:5672',
exchange: 'order-events', // Main exchange (topic type)
dlxExchange: 'order-events-dlx', // Dead letter exchange
maxRetries: 3,
retryDelayMs: 5000, // 5 seconds between retries
prefetchCount: 10, // Process 10 messages at a time
};
// ============================================================
// PUBLISHER
// ============================================================
class OrderEventPublisher {
constructor() {
this.connection = null;
this.channel = null;
}
async connect() {
this.connection = await amqp.connect(CONFIG.url);
this.channel = await this.connection.createConfirmChannel(); // Confirm mode for reliability
// Declare main exchange (topic type for flexible routing)
await this.channel.assertExchange(CONFIG.exchange, 'topic', {
durable: true, // Survive broker restart
});
console.log('[Publisher] Connected and exchange declared');
}
/**
* Publish order event with routing key pattern: order.<action>
* Examples: order.created, order.paid, order.shipped, order.cancelled
*/
async publish(routingKey, event) {
const message = {
...event,
_metadata: {
event_id: `evt-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`,
published_at: new Date().toISOString(),
routing_key: routingKey,
},
};
const buffer = Buffer.from(JSON.stringify(message));
return new Promise((resolve, reject) => {
this.channel.publish(
CONFIG.exchange,
routingKey,
buffer,
{
persistent: true, // Survive broker restart
contentType: 'application/json',
messageId: message._metadata.event_id,
timestamp: Date.now(),
headers: {
'x-retry-count': 0,
},
},
(err) => {
if (err) {
console.error(`[Publisher] Publish failed: ${err.message}`);
reject(err);
} else {
console.log(`[Publisher] Published: ${routingKey} → ${message._metadata.event_id}`);
resolve(message._metadata.event_id);
}
}
);
});
}
async close() {
await this.channel?.close();
await this.connection?.close();
console.log('[Publisher] Connection closed');
}
}
// ============================================================
// SUBSCRIBER with Dead Letter Queue
// ============================================================
class OrderEventSubscriber {
constructor(serviceName, bindingPatterns) {
this.serviceName = serviceName;
this.bindingPatterns = bindingPatterns; // e.g., ['order.created', 'order.#']
this.connection = null;
this.channel = null;
}
async connect() {
this.connection = await amqp.connect(CONFIG.url);
this.channel = await this.connection.createChannel();
await this.channel.prefetch(CONFIG.prefetchCount);
// --- Dead Letter Exchange & Queue ---
await this.channel.assertExchange(CONFIG.dlxExchange, 'fanout', { durable: true });
const dlqName = `${this.serviceName}-dlq`;
await this.channel.assertQueue(dlqName, {
durable: true,
arguments: {
'x-message-ttl': 86400000, // 24h retention for DLQ messages
},
});
await this.channel.bindQueue(dlqName, CONFIG.dlxExchange, '');
// --- Main Exchange & Queue ---
await this.channel.assertExchange(CONFIG.exchange, 'topic', { durable: true });
const queueName = `${this.serviceName}-queue`;
await this.channel.assertQueue(queueName, {
durable: true,
arguments: {
'x-dead-letter-exchange': CONFIG.dlxExchange, // Route failed msgs to DLX
'x-dead-letter-routing-key': `dlq.${this.serviceName}`,
},
});
// Bind queue to exchange with patterns
for (const pattern of this.bindingPatterns) {
await this.channel.bindQueue(queueName, CONFIG.exchange, pattern);
console.log(`[${this.serviceName}] Bound: ${CONFIG.exchange} → ${pattern} → ${queueName}`);
}
console.log(`[${this.serviceName}] Connected. Queue: ${queueName}, DLQ: ${dlqName}`);
return queueName;
}
/**
* Start consuming with retry + DLQ logic
*/
async subscribe(handler) {
const queueName = `${this.serviceName}-queue`;
await this.channel.consume(queueName, async (msg) => {
if (!msg) return;
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0);
let event;
try {
event = JSON.parse(msg.content.toString());
console.log(`[${this.serviceName}] Received: ${event._metadata?.event_id} (retry: ${retryCount})`);
// Process message
await handler(event, msg.fields.routingKey);
// ACK: processed successfully
this.channel.ack(msg);
console.log(`[${this.serviceName}] ACK: ${event._metadata?.event_id}`);
} catch (error) {
console.error(`[${this.serviceName}] Error processing: ${error.message}`);
if (retryCount >= CONFIG.maxRetries) {
// Max retries exceeded → send to DLQ
console.error(`[${this.serviceName}] Max retries (${CONFIG.maxRetries}) exceeded → DLQ`);
this.channel.reject(msg, false); // reject without requeue → goes to DLX
} else {
// Retry: republish with incremented retry count
console.log(`[${this.serviceName}] Retrying (${retryCount + 1}/${CONFIG.maxRetries})...`);
// Wait before retry (exponential backoff)
await new Promise(r => setTimeout(r, CONFIG.retryDelayMs * (retryCount + 1)));
this.channel.publish(
CONFIG.exchange,
msg.fields.routingKey,
msg.content,
{
...msg.properties,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount + 1,
'x-last-error': error.message,
'x-last-retry-at': new Date().toISOString(),
},
}
);
this.channel.ack(msg); // ACK original to remove from queue
}
}
});
}
async close() {
await this.channel?.close();
await this.connection?.close();
console.log(`[${this.serviceName}] Connection closed`);
}
}
// ============================================================
// DLQ PROCESSOR — Review and re-process failed messages
// ============================================================
class DLQProcessor {
constructor(serviceName) {
this.serviceName = serviceName;
this.dlqName = `${serviceName}-dlq`;
}
async connect() {
this.connection = await amqp.connect(CONFIG.url);
this.channel = await this.connection.createChannel();
await this.channel.prefetch(1);
console.log(`[DLQ Processor] Connected to ${this.dlqName}`);
}
async processDLQ(handler) {
await this.channel.consume(this.dlqName, async (msg) => {
if (!msg) return;
const event = JSON.parse(msg.content.toString());
console.log(`[DLQ] Reviewing: ${event._metadata?.event_id}`);
console.log(`[DLQ] Headers:`, msg.properties.headers);
try {
await handler(event, msg.properties.headers);
this.channel.ack(msg);
console.log(`[DLQ] Resolved: ${event._metadata?.event_id}`);
} catch (error) {
console.error(`[DLQ] Still failing: ${error.message}. Manual intervention needed.`);
this.channel.ack(msg); // Remove from DLQ, log for manual review
}
});
}
async close() {
await this.channel?.close();
await this.connection?.close();
}
}
// ============================================================
// USAGE EXAMPLE
// ============================================================
async function main() {
// --- Publisher ---
const publisher = new OrderEventPublisher();
await publisher.connect();
// --- Subscriber: Order Processing Service ---
const orderProcessor = new OrderEventSubscriber('order-processing', ['order.created']);
await orderProcessor.connect();
await orderProcessor.subscribe(async (event, routingKey) => {
console.log(`Processing order: ${event.order_id}`);
// Simulate business logic
if (event.total > 10000000) {
throw new Error('Order amount exceeds limit'); // Will retry then DLQ
}
// Save to database, etc.
});
// --- Subscriber: Notification Service ---
const notificationService = new OrderEventSubscriber('notification', ['order.*']);
await notificationService.connect();
await notificationService.subscribe(async (event, routingKey) => {
console.log(`Sending notification for: ${routingKey} → ${event.order_id}`);
// Send email, SMS, push notification
});
// --- Subscriber: Analytics Service ---
const analyticsService = new OrderEventSubscriber('analytics', ['order.#']);
await analyticsService.connect();
await analyticsService.subscribe(async (event, routingKey) => {
console.log(`Recording analytics: ${routingKey}`);
// Write to data warehouse
});
// --- Publish sample events ---
await publisher.publish('order.created', {
order_id: 'ORD-001',
user_id: 'USR-123',
total: 650000,
currency: 'VND',
items: [{ product: 'Pho', qty: 2, price: 325000 }],
});
await publisher.publish('order.paid', {
order_id: 'ORD-001',
payment_method: 'momo',
paid_at: new Date().toISOString(),
});
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('\nShutting down...');
await publisher.close();
await orderProcessor.close();
await notificationService.close();
await analyticsService.close();
process.exit(0);
});
}
main().catch(console.error);6.3 Docker Compose — RabbitMQ Stack
# docker-compose.rabbitmq.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.12-management
container_name: rabbitmq
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
- "15692:15692" # Prometheus metrics
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-admin123}
RABBITMQ_DEFAULT_VHOST: production
volumes:
- rabbitmq-data:/var/lib/rabbitmq
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
- ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
timeout: 5s
retries: 5
networks:
- mq-net
volumes:
rabbitmq-data:
networks:
mq-net:
driver: bridge# rabbitmq/rabbitmq.conf
# Memory threshold: block producers when memory > 60% of RAM
vm_memory_high_watermark.relative = 0.6
# Disk free limit: block when disk < 2GB
disk_free_limit.absolute = 2GB
# Queue limits
queue_master_locator = min-masters
# Message TTL default: 24 hours
# (per-queue override possible)
# Prometheus plugin
prometheus.return_per_object_metrics = true# rabbitmq/enabled_plugins
[rabbitmq_management, rabbitmq_prometheus, rabbitmq_shovel, rabbitmq_shovel_management].
7. Mermaid Diagrams
7.1 Message Queue Architecture — Tổng quan
flowchart TD subgraph "Producers (Nguồn tin nhắn)" P1["Order Service<br/>📦"] P2["Payment Service<br/>💳"] P3["User Service<br/>👤"] end subgraph "Message Broker Layer" direction TB subgraph "Kafka Cluster" B1["Broker 1<br/>(Leader)"] B2["Broker 2<br/>(Follower)"] B3["Broker 3<br/>(Follower)"] end subgraph "Topics" T1["orders<br/>(6 partitions)"] T2["payments<br/>(3 partitions)"] T3["notifications<br/>(3 partitions)"] end SR["Schema Registry<br/>(Avro/Protobuf)"] end subgraph "Consumers (Người nhận)" subgraph "Consumer Group: order-processing" C1["Consumer 1<br/>(P0, P1)"] C2["Consumer 2<br/>(P2, P3)"] C3["Consumer 3<br/>(P4, P5)"] end subgraph "Consumer Group: analytics" C4["Analytics<br/>Consumer"] end subgraph "Consumer Group: notification" C5["Email Worker"] C6["SMS Worker"] end end subgraph "Dead Letter" DLQ["DLQ Topic<br/>orders.dlq"] DLQ_PROC["DLQ Processor<br/>(Manual Review)"] end subgraph "Monitoring" PROM["Prometheus<br/>(JMX Exporter)"] GRAF["Grafana<br/>Dashboard"] BURR["Burrow<br/>(Lag Monitor)"] ALERT["AlertManager<br/>→ Slack/PagerDuty"] end P1 -->|produce| T1 P2 -->|produce| T2 P3 -->|produce| T3 P1 -.->|validate schema| SR P2 -.->|validate schema| SR T1 --> C1 & C2 & C3 T1 --> C4 T3 --> C5 & C6 C1 -->|fail 3x| DLQ C2 -->|fail 3x| DLQ DLQ --> DLQ_PROC B1 <-->|replicate| B2 B2 <-->|replicate| B3 B1 <-->|replicate| B3 B1 -->|JMX metrics| PROM PROM --> GRAF PROM --> ALERT B1 -->|consumer offsets| BURR BURR --> PROM style DLQ fill:#ff6b6b,stroke:#333 style ALERT fill:#ff9800,stroke:#333 style SR fill:#4caf50,stroke:#333
7.2 Kafka Partition & Consumer Group — Chi tiết
flowchart LR subgraph "Topic: orders (6 partitions)" direction TB P0["Partition 0<br/>user-1, user-7, user-13<br/>offsets: 0→1542"] P1["Partition 1<br/>user-2, user-8, user-14<br/>offsets: 0→1389"] P2["Partition 2<br/>user-3, user-9, user-15<br/>offsets: 0→1456"] P3["Partition 3<br/>user-4, user-10, user-16<br/>offsets: 0→1278"] P4["Partition 4<br/>user-5, user-11, user-17<br/>offsets: 0→1501"] P5["Partition 5<br/>user-6, user-12, user-18<br/>offsets: 0→1423"] end subgraph "Consumer Group A: order-processing<br/>(3 consumers = balanced)" CA1["Consumer A1"] CA2["Consumer A2"] CA3["Consumer A3"] end subgraph "Consumer Group B: analytics<br/>(2 consumers = unbalanced)" CB1["Consumer B1"] CB2["Consumer B2"] end P0 --> CA1 P1 --> CA1 P2 --> CA2 P3 --> CA2 P4 --> CA3 P5 --> CA3 P0 --> CB1 P1 --> CB1 P2 --> CB1 P3 --> CB2 P4 --> CB2 P5 --> CB2 style P0 fill:#e3f2fd,stroke:#1976d2 style P1 fill:#e3f2fd,stroke:#1976d2 style P2 fill:#e8f5e9,stroke:#388e3c style P3 fill:#e8f5e9,stroke:#388e3c style P4 fill:#fff3e0,stroke:#f57c00 style P5 fill:#fff3e0,stroke:#f57c00
7.3 Dead Letter Queue Flow
sequenceDiagram participant P as Producer participant Q as Main Queue participant C as Consumer participant DLQ as Dead Letter Queue participant A as Alert System participant O as Operator P->>Q: Publish message (order.created) Q->>C: Deliver message C->>C: Process → FAIL (retry 1/3) C->>Q: NACK + requeue Q->>C: Redeliver message C->>C: Process → FAIL (retry 2/3) C->>Q: NACK + requeue Q->>C: Redeliver message C->>C: Process → FAIL (retry 3/3) Note over C,DLQ: Max retries exceeded! C->>DLQ: Reject → route to DLQ DLQ->>A: Alert: message in DLQ A->>O: Slack/PagerDuty notification O->>DLQ: Inspect failed message O->>O: Fix root cause O->>Q: Re-publish corrected message Q->>C: Deliver corrected message C->>C: Process → SUCCESS C->>Q: ACK
8. Aha Moments & Pitfalls — Đúc kết & Bẫy
Aha Moments
#1: Message Queue không chỉ là “cho message vào hàng đợi”. Nó là kiến trúc pattern quyết định cách services giao tiếp. Chọn sai queue technology → re-architect toàn bộ.
#2: Consumer lag là metric quan trọng nhất trong Kafka. Nếu lag tăng liên tục → hệ thống đang chết dần (death spiral). Monitor lag trước khi monitor throughput.
#3: Exactly-once không free. Nó giảm throughput ~20–40% (do transactions, idempotency checks). Hầu hết hệ thống chỉ cần at-least-once + idempotent consumer — rẻ hơn và đủ đúng.
#4: Kafka retain messages sau khi consume (log-based). RabbitMQ xoá sau khi ACK (queue-based). Đây là fundamental difference quyết định khi nào dùng cái nào.
#5: Partition count là quyết định khó thay đổi. Tăng partition count → phải re-distribute data → consumer rebalance → downtime. Plan partition count cho 2–3 năm tới.
Pitfalls — Bẫy thường gặp
Pitfall 1: Unbounded Queue Growth (Queue tăng không giới hạn)
Sai: Cứ push message vào queue mà không monitor queue depth. Hậu quả: Queue chiếm hết memory (RabbitMQ) hoặc disk (Kafka) → broker crash → mất toàn bộ messages. Đúng: Set
x-max-length(RabbitMQ) hoặcretention.bytes(Kafka). Monitor queue size. Alert khi > 80% capacity.
Pitfall 2: Consumer Lag Death Spiral
Scenario: Consumer chậm → lag tăng → rebalance trigger → consumer restart → lag tăng thêm → rebalance lại → vòng lặp chết. Đúng: Set
max.poll.interval.mshợp lý. Tách heavy processing ra thread riêng. Scale consumers trước khi lag critical.
Pitfall 3: Ordering Across Partitions
Sai: Assume Kafka đảm bảo global ordering. Dùng 6 partitions rồi expect messages đúng thứ tự. Hậu quả:
order.createdcó thể đến sauorder.paidnếu chúng ở khác partition. Đúng: Dùng cùng partition key cho related events (ví dụ:partition_key = order_id). Hoặc dùng 1 partition nếu cần strict global order (nhưng mất parallelism).
Pitfall 4: Message Size Limits
Sai: Gửi file 10MB qua Kafka message. Hậu quả: Broker reject (default max 1MB). Hoặc nếu tăng limit → memory pressure, slow replication, consumer timeout. Đúng: Kafka message nên < 1MB. Nếu cần gửi data lớn → lưu vào Object Storage (S3) + gửi reference link qua Kafka.
Pitfall 5: Quên configure replication
Sai:
replication.factor = 1trong production. Hậu quả: Broker chết → mất data vĩnh viễn. Đúng:replication.factor = 3,min.insync.replicas = 2. Chấp nhận 1 broker down mà không mất data.
Pitfall 6: Consumer commit offset trước khi xử lý xong
Sai:
enable.auto.commit = true(default) → Kafka auto commit offset mỗi 5s. Hậu quả: Consumer nhận message, auto commit, rồi crash trước khi xử lý xong → message mất. Đúng:enable.auto.commit = false+ manual commit sau khi xử lý thành công.
Pitfall 7: Không có Dead Letter Queue
Sai: Consumer gặp poison message → retry vô hạn → block cả queue. Đúng: Max retry (3–5 lần) + DLQ + alert. Luôn có DLQ — nó là safety net.
9. Internal Links — Liên kết nội bộ
Prerequisites (Đọc trước)
- Tuan-01-Scale-From-Zero-To-Millions — Hiểu tại sao cần decouple services
- Tuan-02-Back-of-the-envelope — Estimation framework (áp dụng cho queue sizing)
- Tuan-05-Load-Balancer — Distribute traffic trước khi vào queue
- Tuan-07-Database-Sharding-Replication — Consumer ghi data vào DB sau khi consume
Related (Liên quan)
- Tuan-06-Cache-Strategy — Cache + Queue pattern: cache invalidation qua events
- Tuan-09-Rate-Limiter — Rate limit producers để tránh overwhelm queue
- Tuan-10-Consistent-Hashing — Partition assignment strategy
- Tuan-13-Monitoring-Observability — Monitor consumer lag, throughput, error rate
- Tuan-15-Data-Security-Encryption — Encryption at rest, KMS cho message encryption
Applied in (Áp dụng trong)
- Tuan-17-Design-Chat-System — Message delivery qua queue
- Tuan-18-Design-Notification-System — Fan-out notifications qua Kafka/RabbitMQ
- Tuan-19-Design-News-Feed — Fanout-on-write pattern với message queue
Tham khảo
- Alex Xu, System Design Interview — Chapter 10: Design a Notification System (Message Queue usage)
- Jay Kreps, I Heart Logs — Kafka philosophy
- Confluent Kafka Documentation
- RabbitMQ Official Tutorials
- Kafka: The Definitive Guide (O’Reilly)
- Tuan-02-Back-of-the-envelope — Estimation framework
- Tuan-05-Load-Balancer — Traffic distribution trước queue
- Tuan-13-Monitoring-Observability — Monitoring patterns cho queue systems
Tuần tới: Tuan-09-Rate-Limiter — Bảo vệ hệ thống khỏi traffic spike bằng rate limiting