Backpressure, Consumer Lag, and Dead Letter Queues
The Symptom
The driver location pipeline processes location updates from every active driver in real time. During Friday evening surge: 22,000 active drivers, each emitting GPS coordinates every 2 seconds. That is 11,000 events per second at baseline. During a city-wide concert event with surge pricing across 40 zones: 50,000 events per second. The three consumer instances that handled baseline traffic fell behind. Consumer lag hit 2.4 million messages. The driver map in the rider app showed positions that were 4 minutes stale. Riders requested pickups from drivers who had already moved 6 blocks.
The on-call engineer scaled consumers from 3 to 6. Lag kept growing. Scaled to 9. Lag stabilized but did not decrease. The partition count was 12, and 9 consumers with concurrency 1 meant 9 active threads. Three partitions had 2x the traffic (airport, downtown, stadium zones). Those three partitions drove all the lag.
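Why did the ninth consumer stop helping? With 12 partitions and 9 single-threaded consumers, some consumers must own two partitions. A minimal sketch, assuming the RangeAssignor strategy for a single topic (other assignors distribute differently, and the consumer names are hypothetical):

```python
from typing import Dict, List


def range_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified single-topic RangeAssignor: contiguous blocks, extras go to the first members."""
    members = sorted(consumers)
    per_consumer, extra = divmod(num_partitions, len(members))
    assignment = {}
    for i, member in enumerate(members):
        start = i * per_consumer + min(i, extra)
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
    return assignment


consumers = [f"consumer-{i}" for i in range(1, 10)]  # 9 consumers, concurrency 1 each
for member, partitions in range_assign(12, consumers).items():
    print(member, partitions)
```

Three consumers own two partitions each and six own one. No assignment can split a single hot partition across threads, and consumers beyond the partition count would simply sit idle.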
The Cause
Three problems compounded:
Problem 1: Consumer lag. The consumers process each location update by writing to Redis (for the real-time driver map) and to TimescaleDB (for historical analytics). The Redis write takes 2ms. The TimescaleDB write takes 18ms. At 50,000 events/second across 12 partitions, each partition receives ~4,167 events/second. Each event takes 20ms to process. One consumer thread processes 50 events/second. A single consumer cannot keep up with a single partition.
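Problem 1 is a utilization check: the rate arriving at a partition divided by the rate one thread can serve. Above 1.0, lag grows without bound. A quick calculation with the figures above (20ms per event = 2ms Redis + 18ms TimescaleDB):

```python
def partition_load(events_per_sec: float, partitions: int, service_ms: float):
    """Return (events arriving per partition/s, events one thread serves/s, utilization)."""
    arrival = events_per_sec / partitions
    capacity = 1000 / service_ms
    return arrival, capacity, arrival / capacity


arrival, capacity, utilization = partition_load(50_000, 12, service_ms=20)
print(f"{arrival:,.0f}/s arriving vs {capacity:,.0f}/s served: utilization {utilization:.0f}")
# prints: 4,167/s arriving vs 50/s served: utilization 83
```

A utilization of about 83 means each thread would need roughly 83 times its throughput just to keep pace with one partition.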
Problem 2: Uneven partition distribution. The partition key is driverId. Drivers cluster geographically. Airport drivers, downtown drivers, and stadium drivers each dominate a few partitions. Three partitions handle 35% of total traffic. Consumers on the other nine partitions work through their share, while the three hot partitions accumulate most of the lag.
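The 35% figure translates into per-partition rates. This sketch assumes the hot share splits evenly across the three hot partitions and the remainder evenly across the other nine; real traffic is lumpier.

```python
def skewed_rates(total_rate: float, partitions: int, hot_partitions: int, hot_share: float):
    """Per-partition arrival rate for hot and cold partitions under a simple two-tier skew."""
    hot = total_rate * hot_share / hot_partitions
    cold = total_rate * (1 - hot_share) / (partitions - hot_partitions)
    return hot, cold


hot, cold = skewed_rates(50_000, partitions=12, hot_partitions=3, hot_share=0.35)
print(f"hot: {hot:,.0f}/s  cold: {cold:,.0f}/s  ratio: {hot / cold:.2f}x")
# prints: hot: 5,833/s  cold: 3,611/s  ratio: 1.62x
```

Each hot partition carries about 1.6 times a cold partition's load, so with a fixed thread budget the hot partitions are the first to fall behind and the last to drain.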
Problem 3: No backpressure signal. The producer publishes at line rate. When consumers fall behind, the producer has no feedback mechanism. Events accumulate in Kafka until the retention period expires (7 days). The system “works” in the sense that nothing crashes. But the data is stale, which is worse than crashing because nobody gets paged.
The Baseline
```python
# load-tests/driver_location_locustfile.py
import random
import time

from locust import HttpUser, LoadTestShape, between, task


class DriverLocationUser(HttpUser):
    wait_time = between(1.5, 2.5)  # Simulates the ~2s GPS interval

    def on_start(self):
        self.driver_id = f"driver-{random.randint(1, 100000)}"
        self.lat = 40.7128 + random.uniform(-0.1, 0.1)
        self.lng = -74.0060 + random.uniform(-0.1, 0.1)

    @task
    def send_location(self):
        # Simulate GPS drift
        self.lat += random.uniform(-0.001, 0.001)
        self.lng += random.uniform(-0.001, 0.001)
        self.client.post(
            "/api/drivers/location",
            json={
                "driverId": self.driver_id,
                "latitude": round(self.lat, 6),
                "longitude": round(self.lng, 6),
                "heading": random.randint(0, 359),
                "speed": round(random.uniform(0, 65), 1),
                "timestamp": int(time.time() * 1000),
            },
            name="/api/drivers/location",
        )


class DriverLocationShape(LoadTestShape):
    # "duration" is the cumulative run time (s) at which each stage ends.
    # Each user sends ~1 event per 2s, so users = 2x the target events/second.
    stages = [
        {"duration": 60, "users": 10000, "spawn_rate": 500},     # ~5,000 events/s
        {"duration": 120, "users": 30000, "spawn_rate": 1000},   # ~15,000 events/s
        {"duration": 180, "users": 60000, "spawn_rate": 1000},   # ~30,000 events/s
        {"duration": 240, "users": 100000, "spawn_rate": 2000},  # ~50,000 events/s
        {"duration": 360, "users": 100000, "spawn_rate": 2000},  # hold at peak
    ]

    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage["duration"]:
                return (stage["users"], stage["spawn_rate"])
        return None
```
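Locust's achieved request rate follows from the wait time: each simulated driver sends one update per cycle, and a cycle lasts about the mean of `between(1.5, 2.5)` plus the request's own latency. A rough estimator, assuming response time is small compared with the 2-second wait:

```python
def expected_rate(users: int, wait_min_s: float, wait_max_s: float, response_s: float = 0.0) -> float:
    """Approximate requests/s: users divided by the mean cycle time (wait + response)."""
    mean_cycle_s = (wait_min_s + wait_max_s) / 2 + response_s
    return users / mean_cycle_s


def users_needed(target_rate: float, wait_min_s: float, wait_max_s: float, response_s: float = 0.0) -> float:
    """Inverse of expected_rate: how many simulated drivers a target rate requires."""
    return target_rate * ((wait_min_s + wait_max_s) / 2 + response_s)


print(expected_rate(50_000, 1.5, 2.5))  # 25000.0
print(users_needed(50_000, 1.5, 2.5))   # 100000.0
```

At a 2-second interval, 50,000 events/second takes about 100,000 simulated drivers, the same ratio as the 22,000 production drivers producing 11,000 events/second. Nonzero response time lowers the achieved rate further.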
3 consumer instances, 12 partitions, each consumer with concurrency=1 (3 active consumer threads):
| Time (s) | Events/s | Consumer Lag | Lag Growth Rate | p99 Staleness |
|---|---|---|---|---|
| 60 | 5,000 | 0 | 0 | <1s |
| 120 | 15,000 | 180,000 | 3,000/s | 12s |
| 180 | 30,000 | 1,080,000 | 15,000/s | 72s |
| 240 | 50,000 | 2,400,000 | 22,000/s | 240s |
| 360 | 50,000 | 5,040,000 | 22,000/s | 420s |
At 50,000 events/second, the 3 consumers process ~150 events/second total. Lag grows at 22,000/second. After 6 minutes at peak rate, the driver map is 7 minutes behind reality.
The Fix
Step 1: Measure Consumer Lag with Prometheus
Before fixing anything, instrument the lag. If you cannot measure it, you cannot alert on it, and you will not know when the fix works.
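```java
// SCALED: Consumer lag gauge exposed to Prometheus
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ConsumerLagMonitor {
    private static final String GROUP = "driver-location-tracker";
    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;
    // One holder per partition; the registered gauge reads it on every scrape
    private final Map<TopicPartition, AtomicLong> lagByPartition = new ConcurrentHashMap<>();

    public ConsumerLagMonitor(AdminClient adminClient, MeterRegistry meterRegistry) {
        this.adminClient = adminClient;
        this.meterRegistry = meterRegistry;
    }

    @Scheduled(fixedRate = 5000)
    public void recordConsumerLag() throws Exception {
        Map<TopicPartition, OffsetAndMetadata> committed =
            adminClient.listConsumerGroupOffsets(GROUP)
                .partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            adminClient.listOffsets(
                committed.keySet().stream().collect(Collectors.toMap(
                    tp -> tp,
                    tp -> OffsetSpec.latest()))
            ).all().get(10, TimeUnit.SECONDS);
        committed.forEach((tp, offsetMeta) -> {
            if (offsetMeta == null) return; // partition has no committed offset yet
            long lag = endOffsets.get(tp).offset() - offsetMeta.offset();
            // Register each gauge once; Gauge has no set(), so update the holder instead
            lagByPartition.computeIfAbsent(tp, key -> {
                AtomicLong holder = new AtomicLong();
                Gauge.builder("kafka.consumer.lag", holder, AtomicLong::get)
                    .tag("topic", key.topic())
                    .tag("partition", String.valueOf(key.partition()))
                    .tag("group", GROUP)
                    .register(meterRegistry);
                return holder;
            }).set(lag);
        });
    }
}
```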
```yaml
# prometheus/alerts.yml
- alert: KafkaConsumerLagCritical
  expr: sum(kafka_consumer_lag{group="driver-location-tracker"}) > 100000
  for: 60s
  labels:
    severity: critical
  annotations:
    summary: "Driver location consumer lag is {{ $value }} messages"
    description: "Driver map staleness will exceed 30s. Scale consumers."
```
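The `for: 60s` clause keeps a brief spike from paging anyone: the expression has to be true on every rule evaluation for a full minute before the alert fires, and a single evaluation below the threshold resets the clock. A simplified model of the inactive, pending, firing progression; the 15-second evaluation interval and the lag samples are made up:

```python
def alert_states(samples, threshold, for_seconds, step_seconds):
    """Simplified alert-rule evaluation: inactive -> pending -> firing."""
    states = []
    pending_since = None
    for i, value in enumerate(samples):
        t = i * step_seconds
        if value > threshold:
            if pending_since is None:
                pending_since = t  # condition just became true
            states.append("firing" if t - pending_since >= for_seconds else "pending")
        else:
            pending_since = None  # any dip resets the hold timer
            states.append("inactive")
    return states


lag = [20_000, 120_000, 150_000, 90_000, 130_000, 160_000, 180_000, 210_000, 240_000]
print(alert_states(lag, threshold=100_000, for_seconds=60, step_seconds=15))
# prints: ['inactive', 'pending', 'pending', 'inactive', 'pending', 'pending', 'pending', 'pending', 'firing']
```

The first excursion above 100,000 never pages because it drops back after 30 seconds; the second one fires once it has held for 60 seconds.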
Step 2: Increase Partition Count
12 partitions cap parallelism at 12 consumer threads. At 50,000 events/second with 20ms processing per event, you need at least 1,000 consumer threads. That is impractical. The fix is twofold: increase partitions and reduce per-event processing time.
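The 1,000-thread figure is Little's law: the number of events in service at once equals the arrival rate times the time each event holds a thread. The same formula shows what taking the 18ms TimescaleDB write off the hot path buys.

```python
import math


def threads_needed(events_per_sec: float, service_ms: float) -> int:
    """Little's law: busy threads = arrival rate x service time (rounded up)."""
    return math.ceil(events_per_sec * service_ms / 1000)


print(threads_needed(50_000, 20))  # 1000: Redis + TimescaleDB in one consumer
print(threads_needed(50_000, 2))   # 100: Redis-only fast path
```

These are floors at 100% utilization; a real deployment needs headroom above them.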
First, separate the processing. Redis write (2ms) and TimescaleDB write (18ms) do not need to happen in the same consumer. Split them into two consumer groups.
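```java
// SCALED: Redis consumer - fast path for real-time driver map
import java.time.Duration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class DriverLocationRedisConsumer {
    private final RedisTemplate<String, DriverLocation> redisTemplate;

    public DriverLocationRedisConsumer(RedisTemplate<String, DriverLocation> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @KafkaListener(
        topics = "driver-location-updates",
        groupId = "driver-location-redis",
        concurrency = "12"
    )
    public void onLocationUpdate(@Payload DriverLocationEvent event) {
        // SCALED: 2ms per event - SET with a 30s expiry (same effect as SETEX)
        redisTemplate.opsForValue().set(
            "driver:loc:" + event.driverId(),
            new DriverLocation(event.latitude(), event.longitude(),
                event.heading(), event.speed()),
            Duration.ofSeconds(30)
        );
    }
}
```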
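The 30-second expiry does double duty. Each update, roughly every 2 seconds, overwrites the key and restarts its clock, so an active driver never expires; a driver who stops reporting drops off the map after about 15 missed updates. A toy in-memory model of that behavior (`TtlLocationStore` is a hypothetical name, not part of any Redis client):

```python
class TtlLocationStore:
    """Toy model of SET key value EX ttl semantics (hypothetical class, not a Redis client)."""

    def __init__(self, ttl_seconds: float = 30):
        self.ttl = ttl_seconds
        self._data = {}  # key -> (location, expires_at)

    def set(self, driver_id: str, location, now: float) -> None:
        # Every write replaces the value and restarts the expiry clock
        self._data[f"driver:loc:{driver_id}"] = (location, now + self.ttl)

    def get(self, driver_id: str, now: float):
        entry = self._data.get(f"driver:loc:{driver_id}")
        if entry is None or now >= entry[1]:
            return None  # never written, or expired: the driver is off the map
        return entry[0]


store = TtlLocationStore()
store.set("driver-42", (40.7128, -74.0060), now=0)
print(store.get("driver-42", now=10))  # (40.7128, -74.006)
print(store.get("driver-42", now=31))  # None
```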
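```java
// SCALED: TimescaleDB consumer - slow path for analytics, can lag
import java.sql.Timestamp;
import java.util.List;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class DriverLocationAnalyticsConsumer {
    private final JdbcTemplate jdbcTemplate;

    public DriverLocationAnalyticsConsumer(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @KafkaListener(
        topics = "driver-location-updates",
        groupId = "driver-location-analytics",
        concurrency = "6",
        batch = "true", // deliver each poll as a List instead of one record at a time
        properties = {
            "max.poll.records=500",
            "fetch.min.bytes=50000"
        }
    )
    public void onLocationUpdate(@Payload List<DriverLocationEvent> events) {
        // SCALED: Batch insert - up to 500 rows sent as one JDBC batch
        jdbcTemplate.batchUpdate(
            "INSERT INTO driver_locations (driver_id, lat, lng, heading, speed, ts) VALUES (?, ?, ?, ?, ?, ?)",
            events.stream().map(e -> new Object[]{
                e.driverId(), e.latitude(), e.longitude(),
                e.heading(), e.speed(), Timestamp.from(e.timestamp())
            }).toList()
        );
    }
}
```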
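Splitting works because Kafka tracks committed offsets per consumer group: each group reads every event on the topic, and one group's position has no effect on another's. A toy single-partition model of that bookkeeping (the `ToyTopic` class is illustrative, not a Kafka API):

```python
class ToyTopic:
    """Single-partition append-only log with committed offsets tracked per consumer group."""

    def __init__(self):
        self.log = []
        self.committed = {}  # group id -> next offset to read

    def produce(self, event) -> None:
        self.log.append(event)

    def poll(self, group: str, max_records: int) -> list:
        start = self.committed.get(group, 0)
        batch = self.log[start:start + max_records]
        self.committed[group] = start + len(batch)  # commit once the batch is handled
        return batch

    def lag(self, group: str) -> int:
        return len(self.log) - self.committed.get(group, 0)


topic = ToyTopic()
for i in range(1_000):
    topic.produce({"driverId": f"driver-{i % 50}", "seq": i})

topic.poll("driver-location-redis", 1_000)    # fast path keeps up
topic.poll("driver-location-analytics", 500)  # slow path is half the log behind
print(topic.lag("driver-location-redis"), topic.lag("driver-location-analytics"))  # 0 500
```

The analytics group can fall far behind, up to the topic's retention, without delaying the Redis group by a single record.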
The Redis consumer processes each event in 2ms. With 12 partitions and 12 consumer threads, that is 6,000 events/second. Still short of 50,000. Scale partitions.
```bash
# Scale from 12 to 48 partitions
kafka-topics.sh --bootstrap-server kafka-1:9092 \
  --alter --topic driver-location-updates \
  --partitions 48
```
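48 partitions with 12 consumer threads: each thread handles 4 partitions. At 2ms per event, each thread still processes 500 events/second, so 12 threads still top out at 6,000/second. More partitions raise the ceiling on parallelism, but they add no throughput until there are threads to use them. Scale the consumer deployment.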
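The two limits combine as a minimum: a consumer group uses at most one thread per partition, and each thread serves 1,000 / (ms per event) events per second. A sketch of that ceiling with the 2ms Redis write, assuming one-at-a-time processing:

```python
def max_throughput(partitions: int, threads: int, service_ms: float) -> float:
    """Events/s a consumer group can sustain when each record is processed one at a time."""
    active_threads = min(partitions, threads)  # threads beyond the partition count sit idle
    return active_threads * 1000 / service_ms


for partitions, threads in [(12, 12), (48, 12), (48, 48)]:
    print(partitions, threads, max_throughput(partitions, threads, service_ms=2))
# prints: 12 12 6000.0 / 48 12 6000.0 / 48 48 24000.0 (one per line)
```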
Step 3: Scale Consumer Instances
```yaml
# k8s/driver-location-redis-consumer.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: driver-location-redis-consumer
spec:
  replicas: 4
  selector:
    matchLabels:
      app: driver-location-redis-consumer
  template:
    metadata:
      labels:
        app: driver-location-redis-consumer
    spec:
      containers:
        - name: consumer
          image: ridehail/driver-location-consumer:latest
          env:
            - name: SPRING_KAFKA_LISTENER_CONCURRENCY
              value: "12"
          resources:
            requests:
              cpu: "1000m"
              memory: "768Mi"
            limits:
              cpu: "2000m"
              memory: "1536Mi"
```
4 replicas, each with concurrency 12: 48 consumer threads, one per partition. Each partition now receives ~1,042 events/second (50,000 / 48), but at 2ms per event a thread can process only 500 events/second, less than half of that. 48 threads at 500/second is 24,000/second: half of peak.
The remaining gap is closed by batch processing. Instead of one-at-a-time, poll 200 records per batch:
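```java
// SCALED: Batch processing with pipelined Redis writes
// (replaces onLocationUpdate in DriverLocationRedisConsumer; extra imports:
// StandardCharsets, ArrayList, RedisCallback, BatchListenerFailedException)
@KafkaListener(
    id = "driver-location-redis",                 // container id used for pause/resume
    topics = "driver-location-updates",
    groupId = "driver-location-redis",
    containerFactory = "locationListenerFactory", // batch factory with DLT routing
    concurrency = "12",
    batch = "true",
    properties = {
        "max.poll.records=200",
        "fetch.min.bytes=10000"
    }
)
public void onLocationBatch(@Payload List<DriverLocationEvent> events) {
    // Serialize first, so a malformed event is found before anything is written
    List<byte[][]> writes = new ArrayList<>(events.size());
    int failedIndex = -1;
    RuntimeException failure = null;
    for (int i = 0; i < events.size() && failure == null; i++) {
        DriverLocationEvent event = events.get(i);
        try {
            byte[] key = ("driver:loc:" + event.driverId()).getBytes(StandardCharsets.UTF_8);
            writes.add(new byte[][] {key, serialize(event)}); // serialize(): app-specific helper
        } catch (RuntimeException ex) {
            failedIndex = i;
            failure = ex;
        }
    }
    // SCALED: Pipeline up to 200 Redis writes in one round-trip
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (byte[][] kv : writes) {
            connection.stringCommands().setEx(kv[0], 30, kv[1]);
        }
        return null;
    });
    if (failure != null) {
        // Earlier records are written and their offsets committed; only the failed
        // record is retried, then handed to the DLT recoverer
        throw new BatchListenerFailedException("Malformed location event", failure, failedIndex);
    }
}
```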
200 pipelined Redis writes complete in ~4ms (one network round-trip instead of 200). One consumer thread processes 200 events in 4ms. That is 50,000 events/second per thread. With 48 threads, capacity is 2.4 million events/second. The bottleneck moves from consumers to the Kafka broker network.
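The 4ms-per-200 figure fits a simple two-term cost model: a fixed round-trip cost paid once per pipeline plus a small per-command cost. This sketch fits that model to the two timings quoted in this section (2ms for one write, 4ms for 200 pipelined writes); the linear model is an assumption, not a measurement, and real Redis latency also depends on payload size and server load.

```python
def fit_cost_model(t_single_ms: float, batch_size: int, t_batch_ms: float):
    """Fit t(b) = fixed_ms + b * per_record_ms to a single-write and a batch timing."""
    per_record_ms = (t_batch_ms - t_single_ms) / (batch_size - 1)
    fixed_ms = t_single_ms - per_record_ms
    return fixed_ms, per_record_ms


def throughput(batch_size: int, fixed_ms: float, per_record_ms: float) -> float:
    """Events/s for one thread that writes batches of batch_size back to back."""
    return batch_size * 1000 / (fixed_ms + batch_size * per_record_ms)


fixed, per_record = fit_cost_model(2.0, 200, 4.0)
for b in (1, 10, 50, 200, 500):
    print(b, round(throughput(b, fixed, per_record)))
```

Under this model a 200-command pipeline spends about half its time on the fixed round-trip and half on per-command work, so larger batches still help but with shrinking returns; the model's ceiling is 1,000 / per_record_ms, roughly 99,500 events/second per thread.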
Step 4: Dead Letter Topics for Poisoned Events
A malformed driver location event with latitude: null caused a NullPointerException. The consumer retried 3 times, failed 3 times, and blocked the partition. All subsequent events on that partition stalled. 4% of drivers disappeared from the map.
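```java
// SCALED: DLT routing for events that fail after retries
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DriverLocationConsumerConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DriverLocationEvent>
            locationListenerFactory(
                ConsumerFactory<String, DriverLocationEvent> cf,
                KafkaTemplate<String, Object> kafkaTemplate) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, DriverLocationEvent>();
        factory.setConsumerFactory(cf);
        factory.setBatchListener(true);
        // SCALED: After 3 retries (500ms apart), send to DLT and move on.
        // The DLT record keeps the source record's partition number.
        var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));
        var errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(500L, 3));
        // SCALED: Skip deserialization errors immediately - no retry
        errorHandler.addNotRetryableExceptions(
            DeserializationException.class,
            ClassCastException.class
        );
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```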
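The DeadLetterPublishingRecoverer publishes failed records to driver-location-updates.DLT. The original key and value are republished unchanged, and the exception message, stack trace, and original topic/partition/offset are added as headers. Because the destination resolver keeps the source partition number, the DLT needs at least as many partitions as the source topic (48 after Step 2). With a batch listener, the listener must throw BatchListenerFailedException carrying the index of the failing record; otherwise DefaultErrorHandler retries the whole batch and then sends every record in it to the DLT. An operations team reviews DLT events daily.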
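For poison pills (events that fail on every attempt regardless of retry), the addNotRetryableExceptions call skips retries entirely. A DeserializationException means the bytes cannot become a DriverLocationEvent, and retrying will not fix corrupted bytes, so the record goes to the DLT immediately and the consumer continues with the next one. This only works if the consumer's value deserializer is wrapped in ErrorHandlingDeserializer; otherwise the failure happens inside poll(), before the error handler ever sees the record.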
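The retry-then-DLT policy can be modeled in a few lines. This is a toy model of the semantics described above, not Spring Kafka's implementation: a retryable failure gets `retries` more attempts with a fixed backoff, a not-retryable one goes straight to the DLT, and in both cases the next record is processed. `NotRetryable`, `consume`, and `handle` are hypothetical names, and a null latitude is modeled as a `ValueError` standing in for the NullPointerException.

```python
class NotRetryable(Exception):
    """Stand-in for DeserializationException / ClassCastException in this toy model."""


def consume(records, handle, retries=3, backoff_ms=500, not_retryable=(NotRetryable,)):
    """Toy retry-then-DLT loop: returns (processed, dead_lettered, total backoff in ms)."""
    processed, dead_letters, backoff_total = [], [], 0
    for record in records:
        attempt = 0
        while True:
            try:
                handle(record)
                processed.append(record)
                break
            except not_retryable as ex:
                dead_letters.append((record, type(ex).__name__))  # no retries at all
                break
            except Exception as ex:
                if attempt == retries:
                    dead_letters.append((record, type(ex).__name__))  # retries exhausted
                    break
                attempt += 1
                backoff_total += backoff_ms  # the real error handler waits here


    return processed, dead_letters, backoff_total


def handle(event):
    if event.get("corrupt"):
        raise NotRetryable("bytes cannot become a DriverLocationEvent")
    if event["latitude"] is None:
        raise ValueError("latitude is null")
    # a real handler would write to Redis here


events = [{"latitude": 40.71}, {"latitude": None}, {"corrupt": True}, {"latitude": 40.72}]
processed, dead_letters, waited = consume(events, handle)
print(len(processed), [name for _, name in dead_letters], waited)
# prints: 2 ['ValueError', 'NotRetryable'] 1500
```

The null-latitude event costs 1.5 seconds of backoff (four attempts, three waits) before it reaches the DLT, while the corrupted one costs none, which is the argument for classifying failures that can never succeed as not retryable.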
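```java
// SCALED: DLT consumer for monitoring and alerting
import io.micrometer.core.instrument.MeterRegistry;
import java.nio.ByteBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class DriverLocationDltConsumer {
    private static final Logger log = LoggerFactory.getLogger(DriverLocationDltConsumer.class);
    private final MeterRegistry meterRegistry;

    public DriverLocationDltConsumer(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @KafkaListener(
        topics = "driver-location-updates.DLT",
        groupId = "driver-location-dlt-monitor"
    )
    public void onDeadLetter(
            ConsumerRecord<String, byte[]> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            // The original-offset header is an 8-byte big-endian long
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) byte[] originalOffset) {
        meterRegistry.counter("kafka.dlt.events",
            "topic", originalTopic).increment();
        log.error("DLT event from {} offset {}: {}",
            originalTopic, ByteBuffer.wrap(originalOffset).getLong(), errorMessage);
    }
}
```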
```yaml
# prometheus/alerts.yml
- alert: KafkaDLTEventsHigh
  expr: rate(kafka_dlt_events_total{topic="driver-location-updates"}[5m]) > 10
  for: 30s
  labels:
    severity: warning
  annotations:
    summary: "Driver location DLT receiving {{ $value }} events/s"
    description: "Possible poison pill or schema mismatch in producer."
```
Step 5: Backpressure via Consumer Pause/Resume
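When the Redis consumer's downstream is in trouble (a Redis latency spike or a network partition), the consumer can pause its listener container. A paused container keeps polling, so the consumer stays in the group and keeps its partitions, but poll() returns no records; Kafka holds the events durably until the container resumes. The pause decision has to be driven by a downstream health signal such as Redis round-trip time, not by consumer lag: lag only grows while the consumer is paused, so a lag-based resume threshold would never be reached. The container is looked up by the @KafkaListener id attribute, not its groupId, and the latency thresholds below are illustrative.
```java
// SCALED: Pause consumption while Redis is unhealthy; resume once it recovers
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class BackpressureManager {
    private static final Logger log = LoggerFactory.getLogger(BackpressureManager.class);
    private static final long PAUSE_LATENCY_MS = 50, RESUME_LATENCY_MS = 10; // illustrative
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final KafkaListenerEndpointRegistry registry; // injected: @Scheduled methods take no args
    private final RedisConnectionFactory redis;
    public BackpressureManager(KafkaListenerEndpointRegistry registry, RedisConnectionFactory redis) {
        this.registry = registry;
        this.redis = redis;
    }
    @Scheduled(fixedRate = 5000)
    public void evaluateBackpressure() {
        MessageListenerContainer container = registry.getListenerContainer("driver-location-redis");
        if (container == null) return;
        long latencyMs = redisPingMillis();
        if (latencyMs > PAUSE_LATENCY_MS && paused.compareAndSet(false, true)) {
            container.pause();
            log.warn("Paused driver-location-redis consumer. Redis PING: {}ms", latencyMs);
        } else if (latencyMs < RESUME_LATENCY_MS && paused.compareAndSet(true, false)) {
            container.resume();
            log.info("Resumed driver-location-redis consumer. Redis PING: {}ms", latencyMs);
        }
    }
    // A PING probe keeps working while the consumer is paused, unlike write-latency metrics
    private long redisPingMillis() {
        long start = System.nanoTime();
        try (RedisConnection connection = redis.getConnection()) {
            connection.ping();
        } catch (RuntimeException e) {
            return Long.MAX_VALUE; // unreachable Redis counts as unhealthy
        }
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```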
Pausing is a last resort. The primary defense is scaling consumers and partitions. But during cascading failures (Redis latency spike, network partition), pausing prevents the consumer from accumulating a processing backlog that takes minutes to drain after the failure resolves.
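Whatever signal drives the decision, the pair of thresholds forms a hysteresis band: pause above the upper one, resume only below the lower one. Without the gap, a signal hovering near a single cutoff makes the consumer flap between paused and running on every evaluation. A minimal sketch with made-up signal values:

```python
def backpressure(signal, pause_above, resume_below):
    """Hysteresis controller: pause above one threshold, resume only below a lower one."""
    paused, states = False, []
    for value in signal:
        if not paused and value > pause_above:
            paused = True
        elif paused and value < resume_below:
            paused = False
        states.append("paused" if paused else "running")
    return states


hovering = [49, 51, 49, 51]
print(backpressure(hovering, pause_above=50, resume_below=50))  # single cutoff: flaps
# prints: ['running', 'paused', 'running', 'paused']
print(backpressure(hovering, pause_above=50, resume_below=10))  # band: stays paused
# prints: ['running', 'paused', 'paused', 'paused']
```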
The Proof
Same Locust test, now with 48 partitions, 4 consumer replicas (12 threads each = 48 threads), batch processing with pipelined Redis writes, and DLT for poison pills:
3 Consumers (Original)
| Time (s) | Events/s | Consumer Lag | Lag Growth Rate | p99 Staleness |
|---|---|---|---|---|
| 60 | 5,000 | 0 | 0 | <1s |
| 120 | 15,000 | 180,000 | 3,000/s | 12s |
| 180 | 30,000 | 1,080,000 | 15,000/s | 72s |
| 240 | 50,000 | 2,400,000 | 22,000/s | 240s |
| 360 | 50,000 | 5,040,000 | 22,000/s | 420s |
4 Consumer Replicas, 48 Threads (48 Partitions, Batch + Pipeline)
| Time (s) | Events/s | Consumer Lag | Lag Growth Rate | p99 Staleness |
|---|---|---|---|---|
| 60 | 5,000 | 0 | 0 | <1s |
| 120 | 15,000 | 0 | 0 | <1s |
| 180 | 30,000 | 0 | 0 | <1s |
| 240 | 50,000 | 850 | ~0 | <2s |
| 360 | 50,000 | 1,200 | ~0 | <2s |
| Metric | 3 Consumers (12 partitions) | 4 Replicas, 48 Threads (48 partitions) | Improvement |
|---|---|---|---|
| Max events/s at near-zero lag | ~5,000 | 50,000+ | 10x |
| Consumer lag at 50k events/s | 2,400,000 | 1,200 | 2,000x reduction |
| p99 staleness at 50k events/s | 240s | <2s | 120x |
| DLT events captured | 0 (blocked partition) | 47 (routed to DLT) | No stuck partitions |
| Recovery time after spike | 45 minutes | <10s | 270x |
The driver map now shows positions with p99 staleness under 2 seconds during the worst surge. The 47 DLT events in the test were intentionally malformed records. Without the DLT, each would have stalled the partition it landed on, blocking up to 47 of the 48 partitions. With the DLT, they were routed, logged, and the consumer moved on in under 1ms per poison pill.
The cost: 4 consumer pods requesting 1 CPU / 768Mi each. Total: 4 vCPUs and 3Gi memory for the driver location Redis pipeline. The alternative was 240 seconds of driver map staleness: riders requesting pickups from phantom drivers, and 3x as many support tickets during surge events.