
Dead Letter Queues and Poison Message Handling

4 min read Chapter 54 of 60


The Symptom

A document with a 15 MB body field arrives in the CDC topic. OpenSearch rejects the bulk request that carries it because the HTTP request exceeds the cluster's configured http.max_content_length. The consumer retries. And retries. It is stuck on this single message, unable to process the 10,000 events behind it. The search index falls further behind the database while the consumer keeps retrying a message that can never succeed.

The Internals

A poison message is a CDC event that can never be successfully processed, regardless of how many times it is retried. Common causes:

  • Document exceeds OpenSearch’s maximum field length or HTTP content length
  • Document contains invalid UTF-8 sequences that break the JSON serializer
  • Document references a tenant whose index does not exist and cannot be auto-created
  • Mapping conflict: the event contains a field value incompatible with the existing mapping (e.g., string value for an integer field)
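
Two of these causes, oversized payloads and invalid UTF-8, can be detected before a document ever reaches OpenSearch. The following is a minimal sketch of such a pre-flight check, assuming the consumer can see the raw message bytes (for example through a byte-array deserializer) and that tombstones (null values) are handled before it is called. The names PayloadPreflight and rejectionReason and the 5 MiB budget are hypothetical and not part of the pipeline described here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical pre-flight check; the class name and the limit are illustrative. */
final class PayloadPreflight {

    // Assumed per-document budget, not an OpenSearch default. Keep it well below
    // the cluster's http.max_content_length so a full bulk request still fits.
    static final int MAX_DOCUMENT_BYTES = 5 * 1024 * 1024;

    /**
     * Returns the reason a payload can never be indexed, or empty if it passes.
     * The payload must be non-null.
     */
    static Optional<String> rejectionReason(byte[] payload) {
        if (payload.length > MAX_DOCUMENT_BYTES) {
            return Optional.of("payload is " + payload.length
                + " bytes, limit is " + MAX_DOCUMENT_BYTES);
        }
        try {
            // A strict decoder throws on malformed input instead of silently
            // substituting the U+FFFD replacement character.
            StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(payload));
        } catch (CharacterCodingException e) {
            return Optional.of("invalid UTF-8 sequence");
        }
        return Optional.empty();
    }
}
```

A payload that fails this check is a candidate for immediate routing to the DLQ, since no number of retries will shrink it or repair its encoding.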

A dead letter queue (DLQ) isolates poison messages from the main pipeline. Failed messages are moved to a separate topic after a configurable number of retries. The main pipeline continues processing the remaining events.
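
The routing rule can be illustrated without Kafka. The sketch below models it in memory: each message gets a bounded number of attempts, and a message that exhausts them is parked in a dead-letter list while the loop moves on to the next one. DlqRouter and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory model of retry-then-DLQ routing; no Kafka involved. */
final class DlqRouter<T> {

    private final int maxAttempts;
    private final Consumer<T> handler;
    private final List<T> deadLetters = new ArrayList<>();

    DlqRouter(int maxAttempts, Consumer<T> handler) {
        this.maxAttempts = maxAttempts;
        this.handler = handler;
    }

    /** Processes messages in order; one that fails maxAttempts times is parked. */
    void process(List<T> messages) {
        for (T message : messages) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    handler.accept(message);
                    done = true;
                } catch (RuntimeException e) {
                    // Swallow and retry; a real consumer would log and back off here.
                }
            }
            if (!done) {
                deadLetters.add(message); // isolate it so later messages still run
            }
        }
    }

    List<T> deadLetters() {
        return List.copyOf(deadLetters);
    }
}
```

With a handler that always throws for one message, that message ends up in deadLetters() and every other message is still handled, which is the property the main pipeline needs.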

The Implementation

DLQ-Aware Consumer

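import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.Refresh;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// CdcEvent, DocumentTransformer, parseEvent, addToBulk, and indexSingleDocument
// are application code that is not shown in this listing.
@Component
public class ResilientDocumentConsumer {

    private static final int MAX_RETRIES = 3;
    private static final String DLQ_TOPIC = "cdc.public.documents.dlq";

    private final OpenSearchClient client;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final DocumentTransformer transformer;

    public ResilientDocumentConsumer(OpenSearchClient client,
            KafkaTemplate<String, String> kafkaTemplate,
            DocumentTransformer transformer) {
        this.client = client;
        this.kafkaTemplate = kafkaTemplate;
        this.transformer = transformer;
    }

    @KafkaListener(
        topics = "cdc.public.documents",
        groupId = "opensearch-indexer",
        containerFactory = "batchListenerFactory"
    )
    public void consume(List<ConsumerRecord<String, String>> records,
            Acknowledgment ack) {

        // Records that made it into the bulk request, in request order.
        // Bulk response item i corresponds to queued.get(i), not records.get(i),
        // because records that fail to parse are skipped.
        List<ConsumerRecord<String, String>> queued = new ArrayList<>();
        List<ConsumerRecord<String, String>> retryable = new ArrayList<>();

        // First pass: attempt to index all records
        BulkRequest.Builder bulk = new BulkRequest.Builder()
            .refresh(Refresh.False);

        for (var record : records) {
            try {
                CdcEvent event = parseEvent(record.value());
                addToBulk(bulk, event);
                queued.add(record);
            } catch (Exception e) {
                // Parse failure: immediate DLQ (no retry will fix bad JSON)
                sendToDlq(record, e.getMessage());
            }
        }

        // Skip the request when every record already went to the DLQ.
        if (!queued.isEmpty()) {
            try {
                BulkResponse response = client.bulk(bulk.build());

                if (response.errors()) {
                    for (int i = 0; i < response.items().size(); i++) {
                        var item = response.items().get(i);
                        if (item.error() != null) {
                            var record = queued.get(i);
                            String errorType = item.error().type();

                            if (isRetryable(errorType)) {
                                retryable.add(record);
                            } else {
                                sendToDlq(record, item.error().reason());
                            }
                        }
                    }
                }
            } catch (IOException e) {
                // Cluster-level failure: do not acknowledge, redeliver all
                throw new RuntimeException("Bulk index failed", e);
            }
        }

        // Retry retryable failures with exponential backoff
        for (var record : retryable) {
            retryWithBackoff(record);
        }

        ack.acknowledge();
    }

    private boolean isRetryable(String errorType) {
        return errorType != null && (
               errorType.contains("rejected_execution") ||
               errorType.contains("unavailable_shards") ||
               errorType.contains("timeout"));
    }

    private void sendToDlq(ConsumerRecord<String, String> record,
            String reason) {
        // Exception messages and error reasons can be null.
        String safeReason = reason != null ? reason : "unknown";

        var headers = new RecordHeaders(record.headers());
        headers.add("dlq.reason", safeReason.getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.timestamp",
            Instant.now().toString().getBytes(StandardCharsets.UTF_8));
        headers.add("dlq.original.topic",
            record.topic().getBytes(StandardCharsets.UTF_8));

        // The headers must travel with the record, so build a ProducerRecord
        // instead of calling send(topic, key, value), which would drop them.
        // send() is asynchronous; production code should confirm the write
        // succeeded before the batch is acknowledged.
        kafkaTemplate.send(new ProducerRecord<>(
            DLQ_TOPIC, null, record.key(), record.value(), headers));
    }

    private void retryWithBackoff(ConsumerRecord<String, String> record) {
        Exception lastError = null;

        // Up to MAX_RETRIES attempts with 1s, 2s, 4s delays. The sleeps run on
        // the listener thread, so the worst case for a batch must stay below
        // the consumer's max.poll.interval.ms.
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
                Thread.sleep((1L << attempt) * 1000);
                CdcEvent event = parseEvent(record.value());
                indexSingleDocument(event);
                return;
            } catch (InterruptedException e) {
                // Shutdown in progress: restore the flag and leave the batch
                // unacknowledged so it is redelivered.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted during retry", e);
            } catch (Exception e) {
                lastError = e;
            }
        }

        sendToDlq(record, "Exceeded max retries (" + MAX_RETRIES + "): "
            + lastError.getMessage());
    }
}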

DLQ Monitoring and Recovery

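import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

public class DlqMonitor {

    private static final String DLQ_TOPIC = "cdc.public.documents.dlq";

    private final KafkaConsumer<String, String> dlqConsumer;

    public DlqMonitor(KafkaConsumer<String, String> dlqConsumer) {
        this.dlqConsumer = dlqConsumer;
    }

    public record DlqStats(
        long totalMessages,
        Map<String, Long> reasonCounts,
        Instant oldestMessage,
        Instant newestMessage
    ) {}

    public DlqStats getDlqStats() {
        List<TopicPartition> partitions = dlqConsumer.partitionsFor(DLQ_TOPIC).stream()
            .map(p -> new TopicPartition(DLQ_TOPIC, p.partition()))
            .toList();
        dlqConsumer.assign(partitions);
        dlqConsumer.seekToBeginning(partitions);

        // Snapshot the end offsets so the scan terminates even if new
        // messages keep arriving while it runs.
        Map<TopicPartition, Long> endOffsets = dlqConsumer.endOffsets(partitions);

        long total = 0;
        Map<String, Long> reasons = new LinkedHashMap<>();
        Instant oldest = null;
        Instant newest = null;

        // A single poll returns only one batch, so keep polling until every
        // partition has been read up to its snapshot end offset.
        while (!caughtUp(endOffsets)) {
            ConsumerRecords<String, String> records =
                dlqConsumer.poll(Duration.ofSeconds(5));

            for (var record : records) {
                total++;
                Header header = record.headers().lastHeader("dlq.reason");
                String reason = header == null
                    ? "unknown"
                    : new String(header.value(), StandardCharsets.UTF_8);
                reasons.merge(reason, 1L, Long::sum);

                // Kafka record timestamp, i.e. when the message entered the DLQ
                Instant ts = Instant.ofEpochMilli(record.timestamp());
                if (oldest == null || ts.isBefore(oldest)) oldest = ts;
                if (newest == null || ts.isAfter(newest)) newest = ts;
            }
        }

        return new DlqStats(total, reasons, oldest, newest);
    }

    private boolean caughtUp(Map<TopicPartition, Long> endOffsets) {
        return endOffsets.entrySet().stream()
            .allMatch(e -> dlqConsumer.position(e.getKey()) >= e.getValue());
    }
}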

The Measurement

DLQ impact on pipeline throughput:

| Scenario | Without DLQ | With DLQ |
|---|---|---|
| Poison message (1 in 10,000) | Pipeline stuck indefinitely | 3s delay, continues |
| Transient failure (5% rate) | 5% events lost or stuck | Retried and indexed |
| Mapping conflict batch | Entire batch rejected | 1 event to DLQ, 999 indexed |
| DLQ backlog after 30 days | N/A | ~50 messages (manual review) |

The Decision Rule

Route a message to the DLQ once it has failed 3 retry attempts. A message that fails 3 times with the same error is unlikely to succeed on the 4th attempt. The DLQ preserves the message for manual investigation without blocking the pipeline.

Classify errors as retryable (cluster overload, timeout, unavailable shards) or permanent (mapping conflict, oversized document, malformed data). Retryable errors get exponential backoff. Permanent errors go directly to the DLQ.
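
The classification and the backoff schedule can be kept together as one small policy object. The sketch below is one way to express it, assuming error types are matched by substring as in isRetryable above. FailurePolicy, its methods, and the 30-second cap are hypothetical choices made for this illustration.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical policy: classify a bulk item error and compute the retry delay. */
final class FailurePolicy {

    enum Kind { RETRYABLE, PERMANENT }

    // Substrings matched against the error type reported for a bulk item.
    private static final List<String> RETRYABLE_MARKERS =
        List.of("rejected_execution", "unavailable_shards", "timeout");

    /** Anything not recognized as transient is treated as permanent. */
    static Kind classify(String errorType) {
        if (errorType == null) {
            return Kind.PERMANENT;
        }
        for (String marker : RETRYABLE_MARKERS) {
            if (errorType.contains(marker)) {
                return Kind.RETRYABLE;
            }
        }
        return Kind.PERMANENT;
    }

    /** Delay before 0-based retry `attempt`: 1s, 2s, 4s, ... capped at 30s (assumed cap). */
    static Duration backoff(int attempt) {
        int exponent = Math.max(0, Math.min(attempt, 5)); // bound the shift
        long millis = 1000L << exponent;
        return Duration.ofMillis(Math.min(millis, 30_000L));
    }
}
```

Defaulting unknown error types to PERMANENT keeps the pipeline moving at the cost of more DLQ entries. Defaulting to RETRYABLE would be the more cautious choice for a cluster whose error types are not yet well understood.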

Monitor DLQ depth daily. A growing DLQ indicates a systemic problem (mapping mismatch, schema evolution error) rather than isolated poison messages. DLQ depth should be stable or decreasing after manual remediation.
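
A daily depth check can be reduced to a simple trend test. The sketch below assumes one depth sample is recorded per day, for example the totalMessages value from DlqStats. DlqDepthTrend and isGrowing are hypothetical names, and "grew on each of the last N days" is only one possible definition of a growing queue.

```java
import java.util.List;

/** Hypothetical trend check over daily DLQ depth samples, oldest first. */
final class DlqDepthTrend {

    /** True when depth rose in each of the last `days` day-over-day comparisons. */
    static boolean isGrowing(List<Long> dailyDepths, int days) {
        // N comparisons need N + 1 samples; with fewer, report no trend.
        if (days < 1 || dailyDepths.size() < days + 1) {
            return false;
        }
        int last = dailyDepths.size() - 1;
        for (int i = last; i > last - days; i--) {
            if (dailyDepths.get(i) <= dailyDepths.get(i - 1)) {
                return false; // flat or shrinking on this day
            }
        }
        return true;
    }
}
```

A result of true for several consecutive days is the signal to look for a systemic cause rather than to review messages one by one.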