
Schema Evolution and Consumer Compatibility

Chapter 53 of 60


The Symptom

The database team adds a content_rating column to the documents table. Debezium picks up the schema change and publishes events with the new field. The OpenSearch consumer deserializes the event, encounters an unknown field, and throws an UnrecognizedPropertyException (Jackson's JsonMappingException subtype for unknown properties). The consumer stops processing. 50,000 CDC events accumulate in Kafka, and the search index falls 8 hours behind the database before anyone notices.

The Internals

Schema evolution in a CDC pipeline touches three independent schemas:

  1. Database schema. The PostgreSQL table definition. Changes via ALTER TABLE.
  2. CDC event schema. The Kafka message schema (Avro, JSON Schema, or plain JSON). Changes when the database schema changes.
  3. OpenSearch mapping. The index mapping. Must accommodate new fields from the CDC events, either through dynamic mapping or an explicit mapping update.

Each schema is owned and changed separately, but changes flow in one direction. A column added to PostgreSQL appears in the CDC event schema, must be accepted by the consumer, and only then can be mapped in OpenSearch. If any layer rejects the change, the pipeline breaks.
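One way to catch a layer that will reject a change is to compare the field sets of the layers before a migration ships. The sketch below is hypothetical: SchemaDriftCheck and its inputs are illustrative names, not part of any library, and it assumes field names are identical across layers, which holds when Debezium's event schema mirrors the table's column names (so the database column set stands in for layers 1 and 2).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical pre-deploy check: reports, per downstream layer, the fields
 * that the layer above it has but it does not. Assumes identical field names
 * across layers (Debezium's event schema mirrors the table's columns).
 */
public final class SchemaDriftCheck {

    private SchemaDriftCheck() {}

    public static Map<String, Set<String>> drift(Set<String> dbColumns,
                                                 Set<String> consumerFields,
                                                 Set<String> mappingFields) {
        Map<String, Set<String>> report = new LinkedHashMap<>();
        // Columns the consumer does not know: ignored by a lenient consumer,
        // a crash for a strict one.
        report.put("consumer", missing(dbColumns, consumerFields));
        // Fields the consumer indexes that the mapping lacks: added by dynamic
        // mapping (dynamic: true), not searchable (false), or rejected ("strict").
        report.put("mapping", missing(consumerFields, mappingFields));
        return report;
    }

    private static Set<String> missing(Set<String> upstream, Set<String> downstream) {
        Set<String> result = new TreeSet<>(upstream); // sorted for stable reports
        result.removeAll(downstream);
        return result;
    }
}
```

An empty set for both layers means the change can flow end to end; a non-empty "consumer" set is exactly the situation from The Symptom.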

The Implementation

Resilient Event Deserialization

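// HARDENED: Lenient deserialization that ignores unknown fields.
// New fields in the CDC event do not crash the consumer.

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import java.time.Instant;

// Explicit @JsonProperty names match Debezium's snake_case column names.
// Without them (and without a global naming strategy), ignoreUnknown = true
// would silently leave tenantId, contentType, and updatedAt null.
// Deserializing Instant requires JavaTimeModule (jackson-datatype-jsr310).
@JsonIgnoreProperties(ignoreUnknown = true)
public record DocumentEvent(
    String slug,
    @JsonProperty("tenant_id") String tenantId,
    String title,
    String body,
    @JsonProperty("content_type") String contentType,
    String version,
    @JsonProperty("updated_at") Instant updatedAt,
    // New fields are added here with @JsonProperty and a default value
    @JsonProperty("content_rating")
    @JsonSetter(nulls = Nulls.SKIP)
    String contentRating
) {
    public DocumentEvent {
        // Compact constructor: covers both an absent field (older event)
        // and an explicit null, so contentRating is never null.
        if (contentRating == null) {
            contentRating = "unrated";
        }
    }
}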
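
// FRAGILE: Strict deserialization that fails on unknown fields.
// Any new column in the database crashes the consumer.
// FAIL_ON_UNKNOWN_PROPERTIES is already Jackson's default, so a plain
// new ObjectMapper() behaves this way for any class that is not annotated
// with @JsonIgnoreProperties(ignoreUnknown = true).

ObjectMapper strict = new ObjectMapper()
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);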

Mapping Evolution Strategy

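// HARDENED: Check for new fields and update the mapping before indexing

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.opensearch.client.opensearch.OpenSearchClient;

public class SchemaEvolutionHandler {

    private final OpenSearchClient client;
    // Keyed by index AND field: the same field can already be mapped in one
    // index (e.g. last month's) and still be missing in another.
    private final Set<String> knownFields = ConcurrentHashMap.newKeySet();

    public SchemaEvolutionHandler(OpenSearchClient client) {
        this.client = client;
    }

    public void ensureFieldMapped(String indexName, String fieldName,
            String fieldType) throws IOException {

        String cacheKey = indexName + "/" + fieldName;
        if (knownFields.contains(cacheKey)) {
            return; // Already verified; skip the round trip to the cluster
        }

        // Check if field already exists in the mapping. The response is keyed
        // by concrete index name, so indexName must not be an alias here.
        var mapping = client.indices().getMapping(gm -> gm.index(indexName));
        var properties = mapping.get(indexName).mappings().properties();

        if (!properties.containsKey(fieldName)) {
            // Add the new field. putMapping is additive: existing fields are
            // untouched, and re-sending an identical definition is harmless.
            client.indices().putMapping(pm -> pm
                .index(indexName)
                .properties(fieldName, p -> {
                    return switch (fieldType) {
                        case "keyword" -> p.keyword(k -> k);
                        case "text" -> p.text(t -> t);
                        case "integer" -> p.integer(i -> i);
                        case "date" -> p.date(d -> d);
                        default -> p.keyword(k -> k);
                    };
                })
            );
        }

        knownFields.add(cacheKey);
    }
}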
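ensureFieldMapped takes fieldType as a parameter, but something has to choose it. A minimal sketch, assuming the consumer reads the Kafka Connect schema that Debezium's JSON converter attaches to each field (a "type" such as int32 or string, plus an optional semantic "name"). FieldTypeResolver is a hypothetical helper, and the type policy is an assumption to adapt, not a rule from this chapter.

```java
/**
 * Hypothetical helper: picks one of the field types that
 * SchemaEvolutionHandler.ensureFieldMapped accepts ("keyword", "text",
 * "integer", "date") from a field's Kafka Connect schema.
 */
public final class FieldTypeResolver {

    // Debezium's semantic type for timestamptz: an ISO-8601 string, which
    // OpenSearch's default date format parses directly.
    private static final String ZONED_TIMESTAMP = "io.debezium.time.ZonedTimestamp";

    private FieldTypeResolver() {}

    public static String resolve(String connectType, String semanticName) {
        if (ZONED_TIMESTAMP.equals(semanticName)) {
            return "date";
        }
        if (semanticName != null) {
            // Other semantic types (e.g. io.debezium.time.Date, days since
            // epoch) need a deliberate mapping; use the handler's default.
            return "keyword";
        }
        return switch (connectType) {
            case "int8", "int16", "int32" -> "integer";
            // Strings default to keyword; choosing "text" for free-text
            // columns is a per-column decision this sketch does not make.
            // int64 also lands here because the handler has no "long" case.
            default -> "keyword";
        };
    }
}
```

For the content_rating column from The Symptom, a string field with no semantic name resolves to keyword, which suits a short categorical value used in filters.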

Consumer Version Compatibility Matrix

// Version compatibility: the consumer handles events from multiple
// database schema versions simultaneously during rolling deployments.

import com.fasterxml.jackson.databind.JsonNode;

public class DocumentTransformer {

    // DocPage is the index document type, defined elsewhere.
    public DocPage transform(JsonNode after) {
        // V1 fields (always present). required() fails fast with a clear
        // message instead of an NPE if that assumption is ever violated.
        String slug = after.required("slug").asText();
        String tenantId = after.required("tenant_id").asText();
        String title = after.required("title").asText();
        String body = after.required("body").asText();

        // V2 field (added 2024-03, may be absent in older events).
        // hasNonNull, not has: Debezium sends NULL columns as JSON null,
        // and NullNode.asText() would return the string "null".
        String contentType = after.hasNonNull("content_type")
            ? after.get("content_type").asText()
            : "unknown";

        // V3 field (added 2024-06, may be absent)
        String contentRating = after.hasNonNull("content_rating")
            ? after.get("content_rating").asText()
            : "unrated";

        // V4 field (renamed from "category" to "doc_category" in 2024-09):
        // new name first, then old name, then default
        String docCategory;
        if (after.hasNonNull("doc_category")) {
            docCategory = after.get("doc_category").asText();
        } else if (after.hasNonNull("category")) {
            docCategory = after.get("category").asText();
        } else {
            docCategory = "uncategorized";
        }

        return new DocPage(slug, tenantId, title, body,
            contentType, contentRating, docCategory);
    }
}

The Measurement

Schema evolution impact on pipeline availability:

| Evolution type | Strict consumer | Lenient consumer | Downtime (lenient vs strict) |
|---|---|---|---|
| Add column | Crash (unknown field) | Transparent | 0 vs hours |
| Remove column | Crash (missing field) | Default value | 0 vs hours |
| Rename column | Crash (old name gone) | Fallback chain | 0 vs hours |
| Type change | Deserialization error | Requires update | Planned vs unplanned |

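The lenient consumer survives 3 of 4 schema evolution types without an outage. Added and removed columns pass through without a code change (an added column is simply ignored until the consumer is updated to index it); a rename needs the fallback chain in place before the rename ships. Only type changes (e.g., integer to string) require a consumer update and a planned reindex, because OpenSearch cannot change the type of an existing field mapping in place.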
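Whether a migration is one of the safe three or a type change can be checked mechanically before it ships. The classifier below is a hypothetical sketch: it takes column-name-to-type maps for the old and new table versions (how you obtain them, e.g. from information_schema, is left open) and labels each difference with a row of the table above. A rename cannot be told apart from a drop plus an add by structure alone, so it surfaces as one REMOVE and one ADD.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical classifier for the evolution types in the table above. */
public final class SchemaChangeClassifier {

    public enum Kind { ADD, REMOVE, TYPE_CHANGE }

    public record Change(Kind kind, String column) {}

    private SchemaChangeClassifier() {}

    /** Compares column -> type maps for the old and new table versions. */
    public static List<Change> diff(Map<String, String> oldCols,
                                    Map<String, String> newCols) {
        List<Change> changes = new ArrayList<>();
        for (var e : newCols.entrySet()) {
            String oldType = oldCols.get(e.getKey());
            if (oldType == null) {
                changes.add(new Change(Kind.ADD, e.getKey()));
            } else if (!oldType.equals(e.getValue())) {
                changes.add(new Change(Kind.TYPE_CHANGE, e.getKey()));
            }
        }
        for (String col : oldCols.keySet()) {
            if (!newCols.containsKey(col)) {
                changes.add(new Change(Kind.REMOVE, col));
            }
        }
        return changes;
    }

    /** Only type changes need a coordinated consumer update and reindex. */
    public static boolean needsPlannedReindex(List<Change> changes) {
        return changes.stream().anyMatch(c -> c.kind() == Kind.TYPE_CHANGE);
    }
}
```

Run against a migration that renames category and widens a numeric column to text, it reports an ADD, a REMOVE, and a TYPE_CHANGE, and flags the migration for a planned reindex.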

The Decision Rule

Configure all CDC event deserializers with ignoreUnknown = true (or disable DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES on the shared ObjectMapper). A new database column should never crash the indexing pipeline.

Provide default values for all optional fields in the consumer. When a field is absent from a CDC event (because the event was produced before the column was added), the consumer should index a sensible default, not null.
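A field can be missing in two ways: absent from the event (it predates the column) or present with a JSON null (the column exists but holds NULL). A minimal sketch that gives both the same default, assuming the after-image has been parsed into a Map; that representation and the CdcFields name are illustrative. With Jackson's JsonNode, the equivalent test is JsonNode.hasNonNull(field).

```java
import java.util.Map;

/** Hypothetical helper for reading optional fields from a CDC after-image. */
public final class CdcFields {

    private CdcFields() {}

    /**
     * Returns the field's value as text, or defaultValue when the field is
     * absent or explicitly null. Map.get returns null in both cases.
     */
    public static String textOrDefault(Map<String, ?> after, String field,
                                       String defaultValue) {
        Object value = after.get(field);
        return value == null ? defaultValue : value.toString();
    }
}
```

Usage: CdcFields.textOrDefault(after, "content_rating", "unrated") returns "unrated" for an event produced before the column existed and for a row whose content_rating is NULL.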

Treat field renames as “add new + keep old” transitions. The consumer checks for the new field name first, falls back to the old field name, and defaults if neither exists. This handles the transition period when events from both schemas coexist in Kafka.
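The fallback chain generalizes to any number of historical names, which matters if a column is renamed more than once while old events are still in Kafka. A minimal sketch, assuming a Map-based after-image; RenameChain and firstPresent are hypothetical names. Names are checked newest first, and the first non-null value wins.

```java
import java.util.Map;

/** Hypothetical "add new + keep old" lookup over a CDC after-image. */
public final class RenameChain {

    private RenameChain() {}

    /**
     * Returns the value under the first name (newest first) that is present
     * and non-null, or defaultValue if none is.
     */
    public static String firstPresent(Map<String, ?> after, String defaultValue,
                                      String... namesNewestFirst) {
        for (String name : namesNewestFirst) {
            Object value = after.get(name);
            if (value != null) {
                return value.toString();
            }
        }
        return defaultValue;
    }
}
```

Usage: RenameChain.firstPresent(after, "uncategorized", "doc_category", "category") reproduces the category rename handling in DocumentTransformer. Once no events carrying the old name remain in Kafka, the old name can be dropped from the call.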