Design a Distributed Message Queue
Covers topic-partition architecture, append-only commit logs, consumer group coordination, replication with ISR, exactly-once semantics, and horizontal scaling through partition reassignment.
Design a Distributed Message Queue (Kafka)
Distributed message queues form the backbone of modern data infrastructure. They decouple producers from consumers, absorb traffic spikes, and enable event-driven architectures at massive scale. Designing one from scratch exposes fundamental distributed systems concepts: replication, consensus, ordering guarantees, and fault tolerance. This chapter designs a Kafka-like system that handles millions of messages per second with zero data loss for writes acknowledged with acks=all.
Requirements
Functional Requirements
- Produce messages: Publishers send messages to named topics with optional partition keys.
- Consume messages: Subscribers read messages from topic partitions, tracking their position via offsets.
- Topic and partition management: Create, delete, and configure topics with a specified partition count and replication factor.
- Consumer groups: Multiple consumers form a group; each partition is assigned to exactly one consumer within the group for parallel processing.
- Message retention: Messages persist for a configurable retention period (e.g., 7 days), regardless of consumption status.
- Replay: Consumers can reset their offset to re-process historical messages.
Non-Functional Requirements
| Requirement | Target |
|---|---|
| Throughput | Millions of messages/sec per cluster |
| End-to-end latency | < 10ms (p99) for produce-to-consume |
| Durability | Zero message loss (with acks=all) |
| Ordering | Guaranteed within a partition |
| Availability | Survive loss of any single broker |
| Scalability | Horizontal scaling via partition addition |
Capacity Estimation
Assume a production cluster handling 2 million messages/sec:
- Average message size: 1 KB
- Raw write throughput: 2M × 1 KB = 2 GB/sec
- Replication factor 3: 2 GB/sec × 3 = 6 GB/sec aggregate disk write
- 7-day retention: 2 GB/sec × 86,400 sec/day × 7 days = ~1.2 PB raw storage (before replication)
- With replication: ~3.6 PB total storage
- Network: Each broker serves both replication traffic and consumer reads. Budget 10 Gbps per broker minimum.
- Broker count: With 6 GB/sec aggregate write and ~500 MB/sec per broker (sequential disk), need ~12 brokers minimum for writes alone. Add capacity for read serving and headroom → 20-30 broker cluster.
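The estimates above are simple enough to check with a small calculator. This is a minimal sketch: CapacityPlan and its method names are hypothetical, and it uses decimal units (1 GB = 1,000 MB = 1,000,000 KB, 1 PB = 1,000,000 GB), matching the rounding in the list.

```java
/// Back-of-the-envelope capacity model (hypothetical helper, decimal units throughout).
record CapacityPlan(double messagesPerSec, double messageKb, int replicationFactor,
                    int retentionDays, double brokerWriteMbPerSec) {

    double rawWriteGbPerSec() {
        return messagesPerSec * messageKb / 1_000_000;        // KB/sec → GB/sec
    }

    double replicatedWriteGbPerSec() {
        return rawWriteGbPerSec() * replicationFactor;         // every replica writes a copy
    }

    double rawStoragePb() {
        double retentionSeconds = 86_400.0 * retentionDays;
        return rawWriteGbPerSec() * retentionSeconds / 1_000_000; // GB → PB
    }

    double replicatedStoragePb() {
        return rawStoragePb() * replicationFactor;
    }

    long minBrokersForWrites() {
        // Aggregate disk write divided by the sequential write rate one broker sustains
        return (long) Math.ceil(replicatedWriteGbPerSec() * 1000 / brokerWriteMbPerSec);
    }
}
```

With new CapacityPlan(2_000_000, 1.0, 3, 7, 500), the model gives 2 GB/sec raw and 6 GB/sec replicated writes, about 1.21 PB raw and 3.63 PB replicated storage, and 12 brokers for writes alone, consistent with the figures above.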
High-Level Design
```text
┌───────────┐     ┌────────────────────────────────────────────────┐
│ Producers │────▶│                 Broker Cluster                 │
└───────────┘     │   ┌──────────┐   ┌──────────┐   ┌──────────┐   │
                  │   │ Broker 1 │   │ Broker 2 │   │ Broker 3 │   │
                  │   │ Topic A  │   │ Topic A  │   │ Topic A  │   │
                  │   │ P0(L)    │   │ P1(L)    │   │ P2(L)    │   │
                  │   │ P1(F)    │   │ P2(F)    │   │ P0(F)    │   │
                  │   └──────────┘   └──────────┘   └──────────┘   │
                  │            (L)=Leader  (F)=Follower            │
                  └───────────────────────┬────────────────────────┘
                                          │
                  ┌───────────────────────▼────────────────────────┐
                  │            Metadata Service (KRaft)            │
                  │    Broker registry, partition assignments,     │
                  │          consumer group coordination           │
                  └───────────────────────┬────────────────────────┘
                                          │
                  ┌───────────────────────▼────────────────────────┐
                  │                Consumer Groups                 │
                  │         Group 1: [C1→P0, C2→P1, C3→P2]         │
                  │           Group 2: [C4→P0+P1, C5→P2]           │
                  └────────────────────────────────────────────────┘
```
Producers send messages to partition leaders. Brokers store partitions as append-only logs with leader-follower replication. KRaft (replacing ZooKeeper) manages cluster metadata and coordinates leader election. Consumer groups enable parallel consumption with automatic partition assignment.
Deep Dive
Topic-Partition Architecture
A topic is a logical channel for a category of messages. Each topic is divided into partitions — the fundamental unit of parallelism and ordering.
- Messages within a partition are strictly ordered by offset (monotonically increasing integer).
- Messages across partitions have no ordering guarantee.
- The partition count determines maximum consumer parallelism within a group: you can have at most as many active consumers as partitions.
Partition key routing: Producers optionally attach a key to each message. The producer's partitioner hashes the key to pick the target partition: partition = hash(key) % numPartitions. Messages with the same key always land in the same partition (as long as the partition count does not change), which guarantees ordering for that key.
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/// Core data models using Java 25 records.
record MessageId(int partition, long offset) {}

record Message(byte[] key, byte[] value, Map<String, String> headers, long timestamp) {}

record Partition(int id, String topic, List<Message> log, long latestOffset) {
    Partition append(Message message) {
        var newLog = new ArrayList<>(log);
        newLog.add(message);
        return new Partition(id, topic, List.copyOf(newLog), latestOffset + 1);
    }
}

record Topic(String name, int partitionCount, int replicationFactor,
             Map<Integer, Partition> partitions) {
    int routeToPartition(byte[] key) {
        if (key == null) {
            // Keyless messages: pick a random partition to spread load (not strict round-robin)
            return ThreadLocalRandom.current().nextInt(partitionCount);
        }
        // Clear the sign bit so the hash is non-negative, then map it into [0, partitionCount)
        return (Arrays.hashCode(key) & 0x7FFFFFFF) % partitionCount;
    }
}
```
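Records model the immutable nature of messages and partition state. A Partition produces a new instance on each append, which mirrors the append-only semantics: history is never mutated in place. Copying the whole log on every append is for illustration only; a real broker appends to segment files on disk, as the commit log below shows.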
Append-Only Commit Log
The commit log is the heart of the message queue. Each partition is stored as a sequence of segment files on disk.
Write Path
- Producer sends message to partition leader.
- Leader assigns the next offset to the message and appends it to the active segment file as a sequential disk write.
- Followers pull (fetch) the message and append it to their local logs.
- Once the replicas required by the producer's acks setting hold the message (every in-sync replica, for acks=all), the leader responds to the producer.
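The acknowledgment step hinges on what Kafka calls the high watermark: the first offset that not every in-sync replica has stored yet. The sketch below is a hypothetical, single-threaded, in-memory model (LeaderPartition and its methods are illustrative names) showing how, in a pull-based design, a follower's fetch offset doubles as its acknowledgment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/// Leader-side model of one partition under pull-based replication (single-threaded sketch).
class LeaderPartition {
    private final List<byte[]> log = new ArrayList<>();
    // Each in-sync follower's log end offset, as implied by its most recent fetch
    private final Map<String, Long> followerLogEnd = new HashMap<>();
    private final Set<String> inSyncFollowers;

    LeaderPartition(Set<String> inSyncFollowers) {
        this.inSyncFollowers = Set.copyOf(inSyncFollowers);
        for (String follower : this.inSyncFollowers) {
            followerLogEnd.put(follower, 0L);
        }
    }

    /// Assign the next offset and append locally.
    long append(byte[] message) {
        log.add(message);
        return log.size() - 1;
    }

    /// A follower asks for everything from fromOffset; that request also tells the
    /// leader the follower already stores every offset below fromOffset.
    List<byte[]> fetch(String follower, long fromOffset) {
        followerLogEnd.put(follower, fromOffset);
        return List.copyOf(log.subList((int) fromOffset, log.size()));
    }

    /// Exclusive upper bound of offsets stored by the leader and every in-sync follower.
    long highWatermark() {
        long hw = log.size();
        for (String follower : inSyncFollowers) {
            hw = Math.min(hw, followerLogEnd.get(follower));
        }
        return hw;
    }

    /// An acks=all produce request for this offset may be answered once this returns true.
    boolean isCommitted(long offset) {
        return offset < highWatermark();
    }
}
```

Note that a message is not committed after the followers' first fetch returns it; only their next fetch, starting past it, proves they stored it. That extra round is why acks=all adds latency.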
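Sequential writes are the key performance insight. On a spinning disk, random I/O is bounded by seek time to roughly 100 operations/sec, or about 100 KB/sec for 1 KB messages, whereas sequential writes stream at full disk bandwidth and reach 600+ MB/sec on a multi-disk array. Even on SSDs, sequential access patterns benefit from write combining and reduced write amplification.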
Segment Files and Indexing
The log for a single partition is split into segments:
```text
partition-0/
  00000000000000000000.log     (messages offset 0-999)
  00000000000000000000.index   (sparse offset → position map)
  00000000000000001000.log     (messages offset 1000-1999)
  00000000000000001000.index
  00000000000000002000.log     (active segment, currently being written)
```
Each segment file is named by its base offset. The index file maps selected offsets to byte positions in the log file, enabling binary search for offset-based lookups without scanning the entire file.
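Because each segment is named by its base offset, finding the segment that holds a given offset is a floor lookup over the sorted base offsets. A minimal sketch: Segments and its methods are hypothetical helpers, and the 20-digit zero padding matches the file names in the listing above.

```java
import java.util.NavigableSet;

/// Segment naming and lookup helpers (hypothetical, mirroring the layout above).
final class Segments {
    private Segments() {}

    /// Base offset zero-padded to 20 digits, so lexical order matches numeric order.
    static String logFileName(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    /// Recover the base offset from a name such as "00000000000000001000.log".
    static long baseOffsetOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    /// The segment holding offset is the one with the greatest base offset <= offset.
    static long segmentFor(long offset, NavigableSet<Long> baseOffsets) {
        Long base = baseOffsets.floor(offset);
        if (base == null) {
            throw new IllegalArgumentException("Offset " + offset + " precedes the log start");
        }
        return base;
    }
}
```

For the listing above, segmentFor(1500, {0, 1000, 2000}) returns 1000, so offset 1500 is read from 00000000000000001000.log.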
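```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

/// Simplified commit log with segment management.
/// On-disk record layout (assumed for this sketch): [8-byte offset][4-byte length][payload].
class CommitLog {
    private final Path partitionDir;
    private final long maxSegmentBytes;
    // Base offset → segment, so reads can locate the right segment with a floor lookup
    private final ConcurrentSkipListMap<Long, LogSegment> segments = new ConcurrentSkipListMap<>();
    private LogSegment activeSegment;

    CommitLog(Path partitionDir, long maxSegmentBytes) throws IOException {
        this.partitionDir = partitionDir;
        this.maxSegmentBytes = maxSegmentBytes;
        roll(0L);
    }

    synchronized long append(byte[] payload) throws IOException {
        if (activeSegment.sizeBytes() + LogSegment.HEADER_BYTES + payload.length > maxSegmentBytes) {
            activeSegment.flush(); // Seal the full segment; it stays open for reads
            roll(activeSegment.nextOffset());
        }
        return activeSegment.append(payload);
    }

    byte[] read(long offset) throws IOException {
        return findSegmentForOffset(offset).read(offset);
    }

    private void roll(long baseOffset) throws IOException {
        activeSegment = LogSegment.create(partitionDir, baseOffset);
        segments.put(baseOffset, activeSegment);
    }

    private LogSegment findSegmentForOffset(long offset) {
        // The segment with the greatest base offset <= offset holds this offset
        Map.Entry<Long, LogSegment> entry = segments.floorEntry(offset);
        if (entry == null) {
            throw new IllegalArgumentException("Offset " + offset + " precedes the log start");
        }
        return entry.getValue();
    }
}

class LogSegment {
    static final int HEADER_BYTES = Long.BYTES + Integer.BYTES;
    private static final long INDEX_INTERVAL_BYTES = 4096;

    private final FileChannel dataChannel;
    // Sparse index: offset → byte position (in memory here; a real broker persists it as .index)
    private final TreeMap<Long, Long> index = new TreeMap<>();
    private long currentOffset;
    private long currentPosition = 0;
    private long lastIndexedPosition = -INDEX_INTERVAL_BYTES; // Forces an entry for the first record

    static LogSegment create(Path dir, long baseOffset) throws IOException {
        // The file name encodes the base offset, zero-padded to 20 digits.
        // Recovering an existing segment on restart is omitted.
        Path file = dir.resolve(String.format("%020d.log", baseOffset));
        FileChannel channel = FileChannel.open(file,
            StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        return new LogSegment(channel, baseOffset);
    }

    private LogSegment(FileChannel dataChannel, long baseOffset) {
        this.dataChannel = dataChannel;
        this.currentOffset = baseOffset;
    }

    synchronized long append(byte[] payload) throws IOException {
        long position = currentPosition;
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_BYTES + payload.length)
            .putLong(currentOffset).putInt(payload.length).put(payload).flip();
        while (buffer.hasRemaining()) { // A single write() may be partial
            currentPosition += dataChannel.write(buffer, currentPosition);
        }
        // Add a sparse index entry roughly every 4 KB of log data
        if (position - lastIndexedPosition >= INDEX_INTERVAL_BYTES) {
            index.put(currentOffset, position);
            lastIndexedPosition = position;
        }
        return currentOffset++;
    }

    synchronized byte[] read(long targetOffset) throws IOException {
        // Start at the closest indexed offset <= target, then scan forward record by record
        Map.Entry<Long, Long> start = index.floorEntry(targetOffset);
        if (start == null || targetOffset >= currentOffset) {
            throw new IllegalArgumentException("Offset " + targetOffset + " is not in this segment");
        }
        long position = start.getValue();
        while (position < currentPosition) {
            ByteBuffer header = readFully(position, HEADER_BYTES);
            long offset = header.getLong();
            int length = header.getInt();
            if (offset == targetOffset) {
                return readFully(position + HEADER_BYTES, length).array();
            }
            position += HEADER_BYTES + length;
        }
        throw new IllegalStateException("Offset " + targetOffset + " missing from segment");
    }

    private ByteBuffer readFully(long position, int size) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        while (buffer.hasRemaining()) {
            if (dataChannel.read(buffer, position + buffer.position()) < 0) {
                throw new IOException("Unexpected end of segment");
            }
        }
        return buffer.flip();
    }

    long nextOffset() { return currentOffset; }
    long sizeBytes() { return currentPosition; }
    void flush() throws IOException { dataChannel.force(false); }
    void close() throws IOException { dataChannel.close(); }
}
```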
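The sparse index keeps the memory footprint small. To find offset 1500, the system first picks the segment with the greatest base offset ≤ 1500 (00000000000000001000.log). It then looks up the closest indexed offset ≤ 1500 in that segment's index, which points to some byte position X, and scans forward from X until it reaches offset 1500. Because an entry is indexed roughly every 4 KB, the scan covers only about 4 KB of log data.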
Replication & Fault Tolerance
Each partition has one leader and N-1 followers (where N is the replication factor). All produce and consume requests go to the leader. Followers replicate by pulling from the leader.
In-Sync Replicas (ISR)
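A follower is “in-sync” if it has caught up to the leader’s log end offset within a configurable lag threshold. Kafka expresses that threshold as time (replica.lag.time.max.ms); the ReplicationManager below uses an offset-based threshold (maxLagOffset) for simplicity. The ISR set shrinks when a follower falls behind and grows again once it catches up.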
Write Acknowledgment Modes
| Mode | Behavior | Durability | Latency |
|---|---|---|---|
| acks=0 | Producer doesn’t wait for any acknowledgment | Lowest: messages can be lost | Lowest |
| acks=1 | Producer waits for the leader’s acknowledgment | Medium: lost if the leader crashes before followers replicate | Low |
| acks=all | Producer waits for all ISR members to acknowledge | Highest: no loss as long as one ISR member survives (pair with min.insync.replicas ≥ 2) | Highest |
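Each row of the table boils down to two checks on the leader: whether to accept the write at all, and when the write has reached enough replicas. A minimal sketch: Acks, ProduceAck, and the method names are hypothetical; minInsyncReplicas plays the role of Kafka's min.insync.replicas setting, under which Kafka rejects acks=all writes when the ISR is too small.

```java
import java.util.Set;

/// The three acknowledgment modes from the table above.
enum Acks { NONE, LEADER, ALL }

final class ProduceAck {
    private ProduceAck() {}

    /// Before appending: refuse an acks=all write if the ISR has shrunk below the minimum,
    /// since acknowledging it would not deliver the promised durability.
    static boolean acceptWrite(Acks acks, int isrSize, int minInsyncReplicas) {
        return acks != Acks.ALL || isrSize >= minInsyncReplicas;
    }

    /// After appending: has the write reached the replicas this mode requires?
    /// isr includes the leader; replicasWithMessage lists brokers that have stored it.
    static boolean isSatisfied(Acks acks, boolean leaderAppended,
                               Set<String> replicasWithMessage, Set<String> isr) {
        return switch (acks) {
            case NONE -> true;                 // Fire-and-forget: the producer never waits
            case LEADER -> leaderAppended;     // The leader's local append is enough
            case ALL -> leaderAppended && replicasWithMessage.containsAll(isr);
        };
    }
}
```

The acceptWrite check is what prevents acks=all from quietly degrading to acks=1 when the ISR shrinks to the leader alone.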
Leader Election
When a broker hosting a partition leader fails:
- KRaft detects the broker heartbeat timeout.
- KRaft selects a new leader from the ISR set (preferring the most caught-up replica).
- KRaft updates the partition metadata and notifies all brokers.
- Producers and consumers discover the new leader through metadata refresh.
```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/// Replication manager coordinating ISR tracking and leader election.
record BrokerId(int id) {}

record PartitionReplica(BrokerId brokerId, long replicatedOffset) {}

class ReplicationManager {
    private final Map<Integer, List<PartitionReplica>> partitionReplicas; // partitionId → replicas
    private final Map<Integer, BrokerId> partitionLeaders;                // partitionId → leader
    private final long maxLagOffset;
    // One virtual thread per replication RPC (runAsync takes an Executor, not a ThreadFactory)
    private final ExecutorService rpcExecutor = Executors.newVirtualThreadPerTaskExecutor();

    ReplicationManager(long maxLagOffset) {
        this.partitionReplicas = new ConcurrentHashMap<>();
        this.partitionLeaders = new ConcurrentHashMap<>();
        this.maxLagOffset = maxLagOffset;
    }

    void registerReplicas(int partitionId, List<PartitionReplica> replicas, BrokerId leader) {
        partitionReplicas.put(partitionId, List.copyOf(replicas));
        partitionLeaders.put(partitionId, leader);
    }

    Set<BrokerId> getISR(int partitionId, long leaderOffset) {
        return partitionReplicas.getOrDefault(partitionId, List.of()).stream()
            .filter(replica -> leaderOffset - replica.replicatedOffset() <= maxLagOffset)
            .map(PartitionReplica::brokerId)
            .collect(Collectors.toSet());
    }

    /// Called when the current leader fails: promote the most caught-up surviving ISR member.
    void electLeader(int partitionId, long leaderOffset) {
        BrokerId failedLeader = partitionLeaders.get(partitionId);
        Set<BrokerId> isr = new HashSet<>(getISR(partitionId, leaderOffset));
        isr.remove(failedLeader); // Never re-elect the broker that just failed
        if (isr.isEmpty()) {
            throw new IllegalStateException(
                "No in-sync replicas for partition " + partitionId + ": unclean election required");
        }
        BrokerId newLeader = partitionReplicas.get(partitionId).stream()
            .filter(r -> isr.contains(r.brokerId()))
            .max(Comparator.comparingLong(PartitionReplica::replicatedOffset))
            .map(PartitionReplica::brokerId)
            .orElseThrow();
        partitionLeaders.put(partitionId, newLeader);
    }

    CompletableFuture<Void> replicateToFollowers(int partitionId, byte[] message) {
        BrokerId leader = partitionLeaders.get(partitionId);
        List<CompletableFuture<Void>> futures = partitionReplicas
            .getOrDefault(partitionId, List.of()).stream()
            .filter(r -> !r.brokerId().equals(leader))
            // Send message to each follower broker via RPC on its own virtual thread
            .map(follower -> CompletableFuture.runAsync(
                () -> sendReplicaAppend(follower.brokerId(), partitionId, message), rpcExecutor))
            .toList();
        return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
    }

    private void sendReplicaAppend(BrokerId broker, int partitionId, byte[] message) {
        // Network RPC to follower broker: implementation omitted
    }
}
```
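Virtual threads carry the replication RPCs: each follower's request runs on its own virtual thread, so thousands of concurrent partition replications can proceed without exhausting the OS thread pool. For brevity, this sketch pushes appends from the leader; in the pull model described above, each follower's fetch loop is an equally natural fit for a virtual thread.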
Consumer Groups & Offset Management
A consumer group enables parallel processing of a topic. The key invariant: each partition is consumed by exactly one consumer in a group at any point in time.
Partition Assignment
With 6 partitions and 3 consumers in a group:
- Consumer 1 → Partitions 0, 1
- Consumer 2 → Partitions 2, 3
- Consumer 3 → Partitions 4, 5
When Consumer 3 crashes, rebalancing occurs:
- Consumer 1 → Partitions 0, 1, 4
- Consumer 2 → Partitions 2, 3, 5
You cannot have more active consumers than partitions — excess consumers sit idle as standby.
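The two layouts above are consistent with a range assignment for the initial layout and a sticky strategy for the rebalance: survivors keep what they own, and only Consumer 3's partitions move. A minimal sketch, assuming one coordinator computes the whole assignment for a single topic; Assignor, range, and sticky are hypothetical names, and production assignors handle more cases (multiple topics, shedding load from overloaded members).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/// Range and sticky assignment for one topic (hypothetical sketch).
final class Assignor {
    private Assignor() {}

    /// Contiguous blocks; the first (partitions % consumers) members get one extra partition.
    static Map<String, List<Integer>> range(int partitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = partitions / consumers.size();
        int extra = partitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned); // Excess consumers get an empty list (idle)
        }
        return result;
    }

    /// Survivors keep their partitions; orphaned ones go to the least-loaded survivor.
    static Map<String, List<Integer>> sticky(Map<String, List<Integer>> previous,
                                             List<String> live, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        Set<Integer> unowned = new TreeSet<>();
        for (int p = 0; p < partitions; p++) {
            unowned.add(p);
        }
        for (String consumer : live) {
            List<Integer> kept = new ArrayList<>(previous.getOrDefault(consumer, List.of()));
            kept.removeIf(p -> p >= partitions);
            unowned.removeAll(kept);
            result.put(consumer, kept);
        }
        for (int p : unowned) {
            String target = live.get(0);
            for (String consumer : live) {
                if (result.get(consumer).size() < result.get(target).size()) {
                    target = consumer;
                }
            }
            result.get(target).add(p);
        }
        return result;
    }
}
```

Running range(6, [C1, C2, C3]) and then sticky(..., [C1, C2], 6) reproduces both layouts above: C1 ends with 0, 1, 4 and C2 with 2, 3, 5.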
Offset Tracking
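Each consumer tracks its position in each assigned partition via a committed offset. Following Kafka's convention, the committed offset is the offset of the next message to consume, one past the last message the consumer has fully processed. This makes lag a simple subtraction and lets a restarted consumer resume exactly at the committed offset.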
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/// Offset tracker managing committed positions per consumer group.
/// A committed offset is the next offset to consume (last processed offset + 1).
record ConsumerPosition(String groupId, int partitionId, long committedOffset, long currentOffset) {}

class OffsetTracker {
    // groupId:partitionId → committed offset
    private final Map<String, Long> committedOffsets = new ConcurrentHashMap<>();

    void commit(String groupId, int partitionId, long offset) {
        String key = groupId + ":" + partitionId;
        committedOffsets.put(key, offset);
        // Durably persist to an internal __consumer_offsets topic
        persistOffset(key, offset);
    }

    long getCommittedOffset(String groupId, int partitionId) {
        String key = groupId + ":" + partitionId;
        return committedOffsets.getOrDefault(key, 0L);
    }

    long computeLag(String groupId, int partitionId, long logEndOffset) {
        // Both values are "next" positions, so the difference counts unconsumed messages
        long committed = getCommittedOffset(groupId, partitionId);
        return logEndOffset - committed;
    }

    void resetToEarliest(String groupId, int partitionId, long logStartOffset) {
        // Retention deletes old segments, so the earliest available offset may be above 0
        commit(groupId, partitionId, logStartOffset);
    }

    void resetToTimestamp(String groupId, int partitionId, long timestamp) {
        // Search the partition's timestamp index for the first offset at or after timestamp
        long offset = lookupOffsetByTimestamp(partitionId, timestamp);
        commit(groupId, partitionId, offset);
    }

    private void persistOffset(String key, long offset) {
        // Write to the __consumer_offsets compacted topic (omitted)
    }

    private long lookupOffsetByTimestamp(int partitionId, long timestamp) {
        return 0L; // Placeholder: production brokers consult a per-segment timestamp index
    }
}
```
Consumer lag (the gap between the committed offset and the log end offset) is a critical operational metric. Growing lag means consumers cannot keep up with the production rate — the fix is adding more consumers (up to the partition count) or increasing partition count.
Message Delivery Guarantees
Three semantics exist, each with different trade-offs:
At-most-once: Consumer commits offset before processing. If processing fails, the message is skipped. Fast but risks data loss.
At-least-once: Consumer commits offset after processing. If the consumer crashes mid-processing, the message is re-delivered on recovery. No data loss but potential duplicates.
Exactly-once: Combines idempotent producers and transactional consumers:
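- Idempotent producers: Each producer gets a unique ID, and each message carries a sequence number that increases per partition. The broker deduplicates based on (producerId, sequenceNumber): a retried send whose sequence was already written is silently dropped.
- Transactional consumers: The output messages produced while processing and the consumer's offset commit are written in a single transaction, so either both take effect or both roll back. Downstream consumers read with read_committed isolation and never see aborted writes.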
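The broker-side deduplication for idempotent producers can be sketched as a per-producer sequence check. This is a simplified model under stated assumptions: a single partition, and only the last accepted sequence number is remembered per producer (Kafka keeps a small window of recent batches instead). IdempotentPartition and its Result values are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/// Duplicate detection for idempotent producers on one partition (simplified sketch).
final class IdempotentPartition {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final Map<Long, Integer> lastSequence = new HashMap<>(); // producerId → last accepted seq
    private long nextOffset = 0;

    Result append(long producerId, int sequence, byte[] message) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;    // A retry of something already written: drop silently
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER; // A gap means an earlier send never arrived: reject
        }
        lastSequence.put(producerId, sequence);
        nextOffset++;                   // Stand-in for appending message to the log
        return Result.APPENDED;
    }

    long logEndOffset() { return nextOffset; }
}
```

Rejecting gaps matters as much as dropping duplicates: accepting sequence 3 after 1 would let a lost message 2 silently vanish.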
Dead Letter Queue
Messages that repeatedly fail processing (poison messages) get routed to a dead letter queue (DLQ) after a configurable retry count. This prevents a single bad message from blocking the entire partition. Engineers investigate and reprocess DLQ messages manually or through automated remediation.
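A minimal sketch of the retry-then-route logic, assuming a synchronous handler that returns true on success. DlqProcessor is a hypothetical name, the DLQ is an in-memory list standing in for a DLQ topic, and retries run back to back without the backoff a real consumer would add.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/// Retry-then-dead-letter loop for one partition's messages (hypothetical sketch).
final class DlqProcessor<M> {
    private final Predicate<M> handler;
    private final int maxAttempts;
    private final List<M> deadLetters = new ArrayList<>();

    DlqProcessor(Predicate<M> handler, int maxAttempts) {
        this.handler = handler;
        this.maxAttempts = maxAttempts;
    }

    /// Processes messages in order and returns how many succeeded.
    int processAll(List<M> messages) {
        int succeeded = 0;
        for (M message : messages) {
            boolean ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
                ok = handler.test(message);
            }
            if (ok) {
                succeeded++;
            } else {
                // Park the poison message so later messages in the partition are not blocked
                deadLetters.add(message);
            }
        }
        return succeeded;
    }

    List<M> deadLetters() { return List.copyOf(deadLetters); }
}
```

A message that always fails is attempted maxAttempts times, parked, and the loop moves on to the next message.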
Compaction
Standard retention deletes entire segments older than the retention period. Log compaction offers an alternative: retain the latest value for each message key, discarding older entries.
Use case: A changelog topic where each key represents an entity and each message is the entity’s latest state. After compaction, the topic contains exactly one entry per key — a materialized snapshot of all entity states.
Compaction runs as a background process:
- Read old segments.
- Build a map of key → latest offset.
- Write a new cleaned segment containing only the latest entry per key.
- Replace old segments atomically.
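The middle two steps above reduce to a two-pass algorithm. A minimal in-memory sketch: LogRecord and Compactor are hypothetical names, the input is assumed to be in offset order, and reading segments from disk and swapping files are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/// Two-pass log compaction over in-memory records (hypothetical sketch).
record LogRecord(long offset, String key, String value) {}

final class Compactor {
    private Compactor() {}

    static List<LogRecord> compact(List<LogRecord> segment) {
        // Pass 1: key → offset of its latest record (later records overwrite earlier ones)
        Map<String, Long> latest = new HashMap<>();
        for (LogRecord r : segment) {
            latest.put(r.key(), r.offset());
        }
        // Pass 2: keep a record only if it is the latest for its key
        List<LogRecord> cleaned = new ArrayList<>();
        for (LogRecord r : segment) {
            if (latest.get(r.key()) == r.offset()) {
                cleaned.add(r);
            }
        }
        return cleaned;
    }
}
```

Surviving records keep their original offsets, so a compacted partition has gaps in its offset sequence but never reorders messages.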
Bottlenecks & Scaling
| Bottleneck | Mitigation |
|---|---|
| Hot partitions | Monitor per-partition throughput; split hot partitions or improve key distribution |
| Consumer lag | Add consumers (up to partition count); increase partition count for more parallelism |
| Disk I/O | Use SSDs for latency-sensitive workloads; leverage OS page cache for sequential reads; enable zero-copy transfer (sendfile syscall) |
| Network saturation | Enable compression (LZ4 for speed, Zstd for ratio); batch messages at the producer |
| Broker failure | ISR-based leader election with automatic failover; rack-aware replica placement |
| Rebalancing storms | Use sticky assignor to minimize partition movement; incremental cooperative rebalancing |
| Metadata bottleneck | KRaft mode eliminates ZooKeeper dependency; partition metadata cached locally on each broker |
Zero-copy transfer deserves emphasis. Without it, data flows: disk → kernel page cache → user-space buffer → socket buffer → network. With sendfile(), data goes directly: disk → kernel page cache → network. This eliminates two memory copies and two context switches per fetch request, delivering up to 10x improvement in consumer throughput.
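In Java, the standard entry point for this is FileChannel.transferTo, which the JDK can implement with sendfile() on Linux when the destination is a socket channel. A minimal sketch (ZeroCopySender is a hypothetical name); with an arbitrary channel, such as an in-memory one used for testing, the JDK falls back to copying through a buffer, so the zero-copy benefit applies only to real network sends.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/// Serve a byte range of a segment file without staging it in a user-space buffer.
final class ZeroCopySender {
    private ZeroCopySender() {}

    static long send(Path segment, long position, long count, WritableByteChannel target)
            throws IOException {
        try (FileChannel file = FileChannel.open(segment, StandardOpenOption.READ)) {
            long sent = 0;
            // transferTo may move fewer bytes than requested, so loop until done
            while (sent < count) {
                long n = file.transferTo(position + sent, count - sent, target);
                if (n <= 0) {
                    break; // Nothing left to transfer (end of file)
                }
                sent += n;
            }
            return sent;
        }
    }
}
```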
Interviewer Tips
- Start with the commit log abstraction: The append-only log is the foundational data structure. Demonstrate you understand why sequential I/O dominates random I/O and how this shapes the entire architecture.
- Partition count is a design decision, not an afterthought: Discuss how partition count affects parallelism, ordering, and rebalancing. A common mistake is suggesting “add more partitions” without acknowledging the ordering trade-offs.
- Distinguish ordering scopes clearly: Messages are ordered within a partition, not across partitions. The partition key is the mechanism for grouping related messages that need ordering.
- Quantify replication costs: With replication factor 3 and acks=all, every produce request triggers 2 additional network transfers (one per follower fetch, running in parallel) and 2 additional disk writes. Calculate the throughput impact.
- Common follow-ups:
- “How do you handle broker decommissioning?” → Controlled partition reassignment: replicate partitions to new brokers before shutting down the old one.
- “What happens if all ISR members go down?” → Either wait for an ISR member to recover (prioritizes durability) or allow unclean leader election, which promotes any available replica and may lose committed messages (prioritizes availability). This is a configurable policy (unclean.leader.election.enable in Kafka).
- “How do you handle schema evolution?” → Schema registry with compatibility checks (backward, forward, full). Producers register schemas; consumers validate against registered schemas.
- “How would you implement delayed/scheduled messages?” → Separate delay topic with a timer wheel; transfer messages to the target topic when the delay expires.
- “How do you achieve exactly-once across services?” → Transactional outbox pattern: write to database and outbox table in a local transaction; a connector tails the outbox and produces to the message queue.
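The timer-wheel part of the delayed-message answer can be sketched as a single-level hashed wheel: a circular array of slots, a cursor that advances one slot per tick, and a round counter for delays longer than one revolution. This is a hypothetical, single-threaded illustration (TimerWheel is an illustrative name); persisting the delay topic and producing due messages to the target topic are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/// Single-level hashed timer wheel for delayed messages (hypothetical sketch).
final class TimerWheel<M> {
    private static final class Entry<T> {
        final T message;
        long remainingRounds; // Full revolutions to wait before release

        Entry(T message, long remainingRounds) {
            this.message = message;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry<M>>> slots = new ArrayList<>();
    private int cursor = 0;

    TimerWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /// Release message after delayTicks ticks (delayTicks must be >= 1).
    void schedule(M message, long delayTicks) {
        int n = slots.size();
        int slot = (int) ((cursor + delayTicks) % n);
        slots.get(slot).add(new Entry<>(message, (delayTicks - 1) / n));
    }

    /// Advance one tick and return the messages now due for the target topic.
    List<M> tick() {
        cursor = (cursor + 1) % slots.size();
        List<M> due = new ArrayList<>();
        Iterator<Entry<M>> it = slots.get(cursor).iterator();
        while (it.hasNext()) {
            Entry<M> entry = it.next();
            if (entry.remainingRounds == 0) {
                due.add(entry.message);
                it.remove();
            } else {
                entry.remainingRounds--;
            }
        }
        return due;
    }
}
```

With 4 slots, a message scheduled 6 ticks out lands in slot 2 with one extra round, so it is skipped on tick 2 and released on tick 6.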