Schema Evolution and Consumer Compatibility
The Symptom
The database team adds a content_rating column to the documents table. Debezium picks up the schema change and publishes events with the new field. The OpenSearch consumer deserializes the event, encounters an unknown field, and throws a JsonMappingException. The consumer stops processing. 50,000 CDC events accumulate in Kafka. The search index falls behind the database by 8 hours before anyone notices.
The Internals
Schema evolution in a CDC pipeline touches three independent schemas:
- Database schema. The PostgreSQL table definition. Changes via ALTER TABLE.
- CDC event schema. The Kafka message schema (Avro, JSON Schema, or JSON). Changes when the database schema changes.
- OpenSearch mapping. The index mapping. Must accommodate new fields from the CDC events.
Each schema is defined and versioned in a different place, but a change in one ripples into the next. A column added to PostgreSQL propagates through the CDC event schema and must be handled by the consumer before it can be mapped in OpenSearch. If any layer rejects the change, the pipeline breaks.
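One way to make this drift visible before it breaks anything is to compare the fields arriving in an event with the fields the index already maps. The following is a minimal sketch under stated assumptions: `SchemaDrift` and `unmappedFields` are hypothetical names, and the event's "after" image is modeled as a plain `Map` rather than a real Debezium payload.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: reports event fields that the index mapping does not know yet. */
final class SchemaDrift {
    private SchemaDrift() {}

    /**
     * Returns the field names present in the CDC "after" image but absent
     * from the set of fields currently mapped in the index.
     */
    static Set<String> unmappedFields(Map<String, Object> after, Set<String> mappedFields) {
        Set<String> drift = new TreeSet<>(after.keySet()); // sorted copy, input is untouched
        drift.removeAll(mappedFields);
        return drift;
    }

    public static void main(String[] args) {
        Map<String, Object> after = Map.of(
                "slug", "intro", "title", "Intro", "content_rating", "general");
        Set<String> mapped = Set.of("slug", "title", "body");
        System.out.println(unmappedFields(after, mapped)); // prints [content_rating]
    }
}
```

A non-empty result is a signal to update the mapping (or at least log a warning) rather than a reason to stop consuming.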
The Implementation
Resilient Event Deserialization
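// HARDENED: Lenient deserialization that ignores unknown fields.
// New fields in the CDC event do not crash the consumer.
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import java.time.Instant;

@JsonIgnoreProperties(ignoreUnknown = true)
public record DocumentEvent(
    String slug,
    // Event fields carry the column names (snake_case). Unless the ObjectMapper
    // applies a snake_case naming strategy, they need explicit names; otherwise
    // ignoreUnknown silently drops them and the component stays null.
    @JsonProperty("tenant_id") String tenantId,
    String title,
    String body,
    @JsonProperty("content_type") String contentType,
    String version,
    // Column name "updated_at" is assumed here; adjust to the actual column.
    @JsonProperty("updated_at") Instant updatedAt,
    // New fields are added here with @JsonProperty and a default in the constructor
    @JsonProperty("content_rating")
    @JsonSetter(nulls = Nulls.SKIP)
    String contentRating
) {
    public DocumentEvent {
        // Absent or null in the event: fall back to a default
        if (contentRating == null) {
            contentRating = "unrated";
        }
    }
}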
// FRAGILE: Strict deserialization that fails on unknown fields.
// Any new column in the database crashes the consumer.
// This is Jackson's default: DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
// is enabled unless the ObjectMapper or the target type opts out.
Mapping Evolution Strategy
// HARDENED: Check for new fields and update the mapping before indexing
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.opensearch.client.opensearch.OpenSearchClient;

public class SchemaEvolutionHandler {
    private final OpenSearchClient client;
    // Cache of "index/field" pairs already verified. The key includes the index
    // name so a field mapped in one index is not assumed mapped in another.
    private final Set<String> knownFields = ConcurrentHashMap.newKeySet();

    public SchemaEvolutionHandler(OpenSearchClient client) {
        this.client = client;
    }

    public void ensureFieldMapped(String indexName, String fieldName,
                                  String fieldType) throws IOException {
        String cacheKey = indexName + "/" + fieldName;
        if (knownFields.contains(cacheKey)) {
            return;
        }
        // Check if field already exists in the mapping
        var mapping = client.indices().getMapping(gm -> gm.index(indexName));
        var properties = mapping.get(indexName).mappings().properties();
        if (!properties.containsKey(fieldName)) {
            // Add the new field to the mapping
            client.indices().putMapping(pm -> pm
                .index(indexName)
                .properties(fieldName, p -> {
                    return switch (fieldType) {
                        case "keyword" -> p.keyword(k -> k);
                        case "text" -> p.text(t -> t);
                        case "integer" -> p.integer(i -> i);
                        case "date" -> p.date(d -> d);
                        // Unrecognized types fall back to keyword
                        default -> p.keyword(k -> k);
                    };
                })
            );
        }
        knownFields.add(cacheKey);
    }
}
Consumer Version Compatibility Matrix
// Version compatibility: consumer handles events from multiple
// database schema versions simultaneously during rolling deployments
import com.fasterxml.jackson.databind.JsonNode;

public class DocumentTransformer {
    public DocPage transform(JsonNode after) {
        // V1 fields (always present)
        String slug = after.get("slug").asText();
        String tenantId = after.get("tenant_id").asText();
        String title = after.get("title").asText();
        String body = after.get("body").asText();

        // hasNonNull (rather than has) also covers an explicit JSON null, which
        // asText() would otherwise turn into the literal string "null".

        // V2 field (added 2024-03, may be absent in older events)
        String contentType = after.hasNonNull("content_type")
            ? after.get("content_type").asText()
            : "unknown";

        // V3 field (added 2024-06, may be absent)
        String contentRating = after.hasNonNull("content_rating")
            ? after.get("content_rating").asText()
            : "unrated";

        // V4 field (renamed from "category" to "doc_category" in 2024-09)
        String docCategory;
        if (after.hasNonNull("doc_category")) {
            docCategory = after.get("doc_category").asText();
        } else if (after.hasNonNull("category")) {
            docCategory = after.get("category").asText();
        } else {
            docCategory = "uncategorized";
        }

        return new DocPage(slug, tenantId, title, body,
            contentType, contentRating, docCategory);
    }
}
The Measurement
Schema evolution impact on pipeline availability:
| Evolution Type | Strict Consumer | Lenient Consumer | Downtime (strict vs. lenient) |
|---|---|---|---|
| Add column | Crash (unknown field) | Transparent | Hours vs. 0 |
| Remove column | Crash (missing field) | Default value | Hours vs. 0 |
| Rename column | Crash (old name gone) | Fallback chain | Hours vs. 0 |
| Type change | Deserialization error | Requires update | Unplanned vs. planned |
The lenient consumer absorbs 3 of the 4 schema evolution types without downtime, provided its defaults and rename fallbacks are deployed before the database change. Only type changes (e.g., integer to string) require a consumer update and a planned reindex.
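The strict and lenient columns can be contrasted in a few lines. This is an illustrative sketch only, not the pipeline's deserializer: `BindingModes`, `Doc`, and `bind` are hypothetical names, and the event is modeled as a plain `Map` so the example runs without Jackson.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical model of strict vs. lenient binding of an event to a typed value. */
final class BindingModes {
    private BindingModes() {}

    record Doc(String slug, String contentRating) {}

    // Fields this consumer version knows about.
    private static final Set<String> KNOWN = Set.of("slug", "content_rating");

    static Doc bind(Map<String, Object> event, boolean strict) {
        if (strict) {
            // Strict mode: any field outside the known set is an error.
            for (String field : event.keySet()) {
                if (!KNOWN.contains(field)) {
                    throw new IllegalArgumentException("Unknown field: " + field);
                }
            }
        }
        // Lenient mode skips the check above; both modes default a missing rating.
        Object slug = event.get("slug");
        Object rating = event.get("content_rating");
        return new Doc(
                slug == null ? null : slug.toString(),
                rating == null ? "unrated" : rating.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> event = Map.of("slug", "intro", "doc_category", "guides");
        System.out.println(bind(event, false)); // lenient: extra field ignored
        try {
            bind(event, true);
        } catch (IllegalArgumentException e) {
            System.out.println("strict: " + e.getMessage());
        }
    }
}
```

In the strict branch a single added column is enough to stop the consumer, which is the "Add column" row of the table; the lenient branch keeps indexing and fills the missing rating with a default.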
The Decision Rule
Configure all CDC event deserializers with ignoreUnknown = true. A new database column should never crash the indexing pipeline.
Provide default values for all optional fields in the consumer. When a field is absent from a CDC event (because the event was produced before the column was added), the consumer should index a sensible default, not null.
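A small helper keeps this rule in one place. The sketch below assumes the event's "after" image is available as a `Map`; `FieldDefaults` and `textOrDefault` are hypothetical names. It treats an absent key and an explicit null the same way, since a newly added nullable column typically arrives as null for existing rows.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: read an optional field, falling back to a default. */
final class FieldDefaults {
    private FieldDefaults() {}

    /** Returns the field as text, or the fallback if the field is absent or null. */
    static String textOrDefault(Map<String, Object> after, String field, String fallback) {
        Object value = after.get(field);
        return value == null ? fallback : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("slug", "intro");
        after.put("content_rating", null); // column exists, row has no value yet
        System.out.println(textOrDefault(after, "content_rating", "unrated")); // prints unrated
        System.out.println(textOrDefault(after, "content_type", "unknown"));   // prints unknown
    }
}
```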
Treat field renames as “add new + keep old” transitions. The consumer checks for the new field name first, falls back to the old field name, and defaults if neither exists. This handles the transition period when events from both schemas coexist in Kafka.
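The fallback chain generalizes to any number of historical names. The following is a minimal sketch, again modeling the event as a `Map`; `RenameFallback` and `firstPresent` are hypothetical names, and the caller lists candidate names from newest to oldest.

```java
import java.util.Map;

/** Hypothetical helper: resolve a renamed field by trying names newest-first. */
final class RenameFallback {
    private RenameFallback() {}

    /** Returns the first non-null value among the given names, or the fallback. */
    static String firstPresent(Map<String, Object> after, String fallback, String... names) {
        for (String name : names) {
            Object value = after.get(name);
            if (value != null) {
                return value.toString();
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        Map<String, Object> oldEvent = Map.of("slug", "intro", "category", "guides");
        Map<String, Object> newEvent = Map.of("slug", "intro", "doc_category", "tutorials");
        // Events from both schema versions resolve to a category.
        System.out.println(firstPresent(oldEvent, "uncategorized", "doc_category", "category")); // prints guides
        System.out.println(firstPresent(newEvent, "uncategorized", "doc_category", "category")); // prints tutorials
    }
}
```

Once no events with the old name remain in the topic's retention window, the old name can be dropped from the list.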