Change Data Capture: Keeping OpenSearch in Sync with Kafka

4 min read Chapter 52 of 60

The documentation platform stores documents in PostgreSQL as the system of record. OpenSearch is a derived view optimized for search. The two must stay synchronized. The initial implementation uses a dual-write pattern: the application writes to PostgreSQL and then to OpenSearch in the same request. When OpenSearch is temporarily unavailable, documents are saved to PostgreSQL but never indexed. The search index silently drifts from the database.

Why CDC Over Dual-Write

Dual-write has three failure modes:

  1. Partial failure. The PostgreSQL write succeeds but the OpenSearch write fails. The document exists in the database but is not searchable.
  2. Ordering violation. Two concurrent updates to the same document arrive at OpenSearch in reverse order. The older version overwrites the newer one.
  3. Performance coupling. OpenSearch latency directly affects application response time. A slow bulk index degrades the API that saves documents.

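Change Data Capture (CDC) addresses all three by reading the database's write-ahead log (WAL) and publishing change events to Kafka. A separate consumer reads from Kafka and indexes into OpenSearch, so the database write no longer waits on OpenSearch, and an OpenSearch outage delays indexing instead of losing it. Kafka preserves order within a partition, so changes to the same document reach the consumer in commit order as long as events are keyed by the row's primary key, which is Debezium's default.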

PostgreSQL WAL → Debezium → Kafka Topic → Consumer → OpenSearch
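
The difference between the two designs can be illustrated with a small in-memory simulation. This is a sketch under stated assumptions, not the platform's code: `DriftDemo` is a hypothetical class, a `List` stands in for the Kafka topic, and a `Map` stands in for the OpenSearch index.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model: a List stands in for the Kafka topic,
// a Map for the OpenSearch index. No real clients are involved.
class DriftDemo {
    final List<String> changeLog = new ArrayList<>();   // durable record of saved slugs
    final Map<String, String> index = new HashMap<>();  // slug -> indexed content
    boolean indexAvailable = true;
    int committedOffset = 0;                             // consumer position in the log

    // Dual-write: the index write is attempted once, inside the request.
    void saveDualWrite(String slug) {
        changeLog.add(slug);                // database write succeeds
        if (indexAvailable) {
            index.put(slug, "v1");
        }                                   // otherwise the document is silently skipped
    }

    // CDC: the request only writes to the database; the log records the change.
    void saveWithCdc(String slug) {
        changeLog.add(slug);
    }

    // Consumer: resumes from the committed offset, so an outage only delays indexing.
    void drain() {
        while (indexAvailable && committedOffset < changeLog.size()) {
            index.put(changeLog.get(committedOffset), "v1");
            committedOffset++;              // advance only after the index write
        }
    }

    // Number of saved documents that are missing from the index.
    int drift() {
        int missing = 0;
        for (String slug : changeLog) {
            if (!index.containsKey(slug)) missing++;
        }
        return missing;
    }

    static int dualWriteDriftAfterOutage() {
        DriftDemo d = new DriftDemo();
        d.indexAvailable = false;
        d.saveDualWrite("install-guide");
        d.indexAvailable = true;            // outage ends, but nothing retries the write
        return d.drift();
    }

    static int cdcDriftAfterOutage() {
        DriftDemo d = new DriftDemo();
        d.indexAvailable = false;
        d.saveWithCdc("install-guide");
        d.drain();                          // no-op while the index is down
        d.indexAvailable = true;
        d.drain();                          // catches up from the committed offset
        return d.drift();
    }
}
```

In this model the dual-write path leaves one document permanently unindexed after the outage, while the log-based path reaches zero drift once the consumer runs again.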

The Implementation

Kafka Consumer for OpenSearch Indexing

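import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.Refresh;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class DocumentChangeConsumer {

    private final OpenSearchClient openSearchClient;
    private final DocumentTransformer transformer;
    // Intended batch size; enforce it through the consumer's max.poll.records setting
    private static final int BATCH_SIZE = 100;

    public DocumentChangeConsumer(OpenSearchClient openSearchClient,
            DocumentTransformer transformer) {
        this.openSearchClient = openSearchClient;
        this.transformer = transformer;
    }

    // The container factory must be a batch listener with manual ack mode
    @KafkaListener(
        topics = "cdc.public.documents",
        groupId = "opensearch-indexer",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void consume(List<ConsumerRecord<String, String>> records,
            Acknowledgment ack) {

        try {
            BulkRequest.Builder bulk = new BulkRequest.Builder()
                .refresh(Refresh.False);
            int operations = 0;

            for (ConsumerRecord<String, String> record : records) {
                // Debezium follows each delete with a tombstone (null value) by default
                if (record.value() == null) {
                    continue;
                }
                CdcEvent event = parseEvent(record.value());

                switch (event.operation()) {
                    case "c", "r", "u" -> {
                        // Create, Read (snapshot), Update → Index
                        DocPage doc = transformer.transform(event.after());
                        bulk.operations(op -> op
                            .index(idx -> idx
                                .index("docs-v1")
                                .id(doc.slug())
                                .routing(doc.tenantId())
                                .document(doc)
                            )
                        );
                        operations++;
                    }
                    case "d" -> {
                        // Delete → Remove from index
                        String slug = event.before().get("slug").asText();
                        String tenantId = event.before().get("tenant_id").asText();
                        bulk.operations(op -> op
                            .delete(d -> d
                                .index("docs-v1")
                                .id(slug)
                                .routing(tenantId)
                            )
                        );
                        operations++;
                    }
                    default -> {
                        // Other operation codes (for example truncate) are not indexed
                    }
                }
            }

            // A bulk request with no operations is invalid, so skip the call
            if (operations > 0) {
                BulkResponse response = openSearchClient.bulk(bulk.build());

                if (response.errors()) {
                    // Must throw when any item has to be retried, so the
                    // acknowledgment below is skipped for this batch
                    handleBulkErrors(response, records);
                }
            }

            // Commit offset only after successful indexing
            ack.acknowledge();

        } catch (Exception e) {
            // Do not acknowledge → the container's error handler retries the batch
            throw new RuntimeException("Failed to process CDC batch", e);
        }
    }

    // parseEvent(...) and handleBulkErrors(...) are omitted here

    private record CdcEvent(
        String operation,  // c=create, u=update, d=delete, r=read(snapshot)
        JsonNode before,
        JsonNode after
    ) {}
}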
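
The listener delegates to `handleBulkErrors`, which is not shown. One possible policy is sketched below, assuming the decision is driven by the HTTP status of each bulk item. `BulkErrorPolicy`, `ItemResult`, and `Action` are hypothetical names introduced for illustration; they are not OpenSearch client types, and the actual method may behave differently.

```java
import java.util.List;

// Hypothetical, dependency-free sketch of an error policy for bulk responses.
// ItemResult stands in for one entry of the bulk response (document ID plus
// the per-item HTTP status); it is not an OpenSearch client type.
class BulkErrorPolicy {
    enum Action { OK, RETRY_BATCH, DEAD_LETTER }

    record ItemResult(String id, int status) {}

    // Classify a single bulk item by its HTTP status.
    static Action classify(int status) {
        if (status >= 200 && status < 300) return Action.OK;
        if (status == 409) return Action.OK;   // version conflict: stale or repeated event
        if (status == 404) return Action.OK;   // delete of a document that is already gone
        if (status == 429 || status >= 500) {
            return Action.RETRY_BATCH;         // overload or node failure: transient
        }
        return Action.DEAD_LETTER;             // e.g. 400 mapping error: retrying cannot help
    }

    // A transient failure takes precedence, because the whole batch is
    // redelivered anyway; otherwise any permanent failure marks the batch.
    static Action decide(List<ItemResult> items) {
        Action worst = Action.OK;
        for (ItemResult item : items) {
            Action action = classify(item.status());
            if (action == Action.RETRY_BATCH) return Action.RETRY_BATCH;
            if (action == Action.DEAD_LETTER) worst = Action.DEAD_LETTER;
        }
        return worst;
    }
}
```

Under this policy the handler would throw on `RETRY_BATCH` so the offset is not committed, and would route `DEAD_LETTER` items elsewhere so that a single malformed document cannot block its partition indefinitely.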

Idempotent Indexing

// HARDENED: Idempotent indexing using the document slug as the natural key.
// Kafka delivery is at-least-once, so messages may be redelivered. With the
// slug as the OpenSearch document ID, a redelivered message overwrites the
// same document with identical content instead of creating a duplicate.

bulk.operations(op -> op
    .index(idx -> idx
        .index("docs-v1")
        .id(doc.slug())              // Natural key → idempotent
        .routing(doc.tenantId())
        .document(doc)
    )
);

// FRAGILE: Auto-generated IDs create duplicates on redelivery.
// Each redelivered message creates a new document with a random ID.

bulk.operations(op -> op
    .index(idx -> idx
        .index("docs-v1")
        // No .id() → auto-generated UUID → duplicate on redelivery
        .document(doc)
    )
);
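
The effect of redelivery on the two variants can be reproduced without a cluster. The sketch below uses a `Map` as a stand-in for the index; `IdempotencyDemo` and its methods are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for an index: document ID -> content.
class IdempotencyDemo {
    // Explicit ID: a repeated delivery overwrites the same entry.
    static void indexWithNaturalKey(Map<String, String> index, String slug, String body) {
        index.put(slug, body);
    }

    // No ID supplied: every delivery is stored under a fresh random ID.
    static void indexWithAutoId(Map<String, String> index, String body) {
        index.put(UUID.randomUUID().toString(), body);
    }

    // Deliver the same event `deliveries` times and return the document count.
    static int countAfterRedelivery(boolean naturalKey, int deliveries) {
        Map<String, String> index = new HashMap<>();
        for (int i = 0; i < deliveries; i++) {
            if (naturalKey) {
                indexWithNaturalKey(index, "install-guide", "How to install");
            } else {
                indexWithAutoId(index, "How to install");
            }
        }
        return index.size();
    }
}
```

Delivering one event three times leaves a single document when the slug is the ID and three documents when the ID is generated.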

Handling Out-of-Order Events

// HARDENED: Use external versioning to reject stale updates
// The CDC event includes the database's update timestamp.
// With version_type=external, OpenSearch rejects an index operation
// unless its version is strictly greater than the stored version.

bulk.operations(op -> op
    .index(idx -> idx
        .index("docs-v1")
        .id(doc.slug())
        .routing(doc.tenantId())
        .versionType(VersionType.External)
        .version(doc.updatedAtEpochMs())  // Database timestamp as version
        .document(doc)
    )
);

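External versioning ensures that if events arrive out of order (update B before update A, where B is newer), the older event A is rejected with a version conflict, so the index converges on the latest database state. A redelivered event carries a version equal to the stored one and is rejected the same way, so the consumer should treat version conflicts (HTTP 409 on the bulk item) as success rather than as failures that block the offset commit. A millisecond timestamp is only as precise as its resolution: two updates to the same row within one millisecond share a version, and the second is rejected.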
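
The acceptance rule behind external versioning can be modelled in a few lines. With `version_type=external`, a write is applied only when its version is strictly greater than the stored one. The sketch below mirrors that rule in memory; `ExternalVersionModel` is a hypothetical class for illustration, not the OpenSearch implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the "external" version check.
// It mirrors the acceptance rule only; it is not the OpenSearch implementation.
class ExternalVersionModel {
    record Stored(long version, String body) {}

    private final Map<String, Stored> docs = new HashMap<>();

    // Returns true if the write is applied, false on a version conflict.
    // A write is accepted only when its version is strictly greater than
    // the stored one, so replays of the same event are rejected as well.
    boolean index(String id, long version, String body) {
        Stored current = docs.get(id);
        if (current != null && version <= current.version()) {
            return false;
        }
        docs.put(id, new Stored(version, body));
        return true;
    }

    String body(String id) {
        Stored stored = docs.get(id);
        return stored == null ? null : stored.body();
    }

    // Update B (newer) arrives before update A (older), then B is redelivered.
    static String replayOutOfOrder() {
        ExternalVersionModel model = new ExternalVersionModel();
        boolean b = model.index("install-guide", 2_000L, "B");       // applied
        boolean a = model.index("install-guide", 1_000L, "A");       // conflict: stale
        boolean bAgain = model.index("install-guide", 2_000L, "B");  // conflict: replay
        return b + "," + a + "," + bAgain + "," + model.body("install-guide");
    }
}
```

`replayOutOfOrder()` returns `true,false,false,B`: only the first delivery of B is applied, and both the stale update and the replay are rejected while the stored content stays at B.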

Figure: CDC pipeline architecture showing PostgreSQL WAL, Debezium, Kafka topic, consumer group, and OpenSearch indexing.

Consumer Lag Monitoring

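// `consumer` must be a dedicated KafkaConsumer owned by this monitor: created
// with group.id "opensearch-indexer", given the topic's partitions via assign(),
// and never shared with the listener, because KafkaConsumer is not thread-safe.
@Scheduled(fixedRate = 60_000)
public void monitorConsumerLag() {
    Set<TopicPartition> partitions = consumer.assignment();
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
    Map<TopicPartition, Long> beginningOffsets =
        consumer.beginningOffsets(partitions);
    Map<TopicPartition, OffsetAndMetadata> committed =
        consumer.committed(partitions);

    for (TopicPartition partition : partitions) {
        // committed() maps a partition to null until the group commits once.
        // Falling back to the beginning offset assumes auto.offset.reset=earliest.
        OffsetAndMetadata meta = committed.get(partition);
        long committedOffset = meta != null
            ? meta.offset()
            : beginningOffsets.get(partition);
        long lag = Math.max(0, endOffsets.get(partition) - committedOffset);

        emitMetric("cdc.consumer.lag", lag, partition.toString());

        if (lag > 10_000) {
            emitAlert("CDC consumer lag exceeded 10,000 for partition "
                + partition);
        }
    }
}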

The Decision Rule

Use CDC over dual-write for any system where OpenSearch is a derived view of a relational database. Running Kafka, Debezium, and a consumer adds operational complexity, but that cost is lower than the cost of the data consistency bugs that dual-write produces.

Always use explicit document IDs derived from the natural key (slug, primary key) to achieve idempotent indexing. At-least-once delivery from Kafka means messages will be redelivered after consumer failures. Idempotent indexing makes redelivery harmless.

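Monitor consumer lag as the primary health metric for the CDC pipeline. Lag is counted in events per partition, so what it means in wall-clock time depends on write volume: treat 10,000 events as the point where the search index is meaningfully behind the database and 100,000 as the point where users are searching stale data, then tune both thresholds to the platform's throughput.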
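
To relate an event count to time, the lag can be divided by the net rate at which the consumer gains on the producer. The helper below is a rough sketch under that assumption; `LagEstimate`, `secondsToDrain`, and both rate parameters are hypothetical and would have to come from the platform's own metrics.

```java
// Hypothetical helper: estimates how long the consumer needs to clear its
// backlog, given measured consume and produce rates in events per second.
class LagEstimate {
    static double secondsToDrain(long lagEvents,
                                 double consumedPerSecond,
                                 double producedPerSecond) {
        double netRate = consumedPerSecond - producedPerSecond;
        if (netRate <= 0) {
            // The consumer is not gaining on the producer: lag never drains
            return Double.POSITIVE_INFINITY;
        }
        return lagEvents / netRate;
    }
}
```

For example, a lag of 10,000 events with 700 events per second consumed and 200 produced drains in 20 seconds, while the same lag with a consumer that only matches the write rate never drains.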