Ingest Pipelines and Document Enrichment
The Symptom
The documentation platform needs to extract the first 200 characters of each document body as a summary field, normalize the content_type field to lowercase, and stamp each document with an indexed_at timestamp. The developer implements all three in an ingest pipeline, using a Painless script for the summary extraction and a second script for a word count. Indexing throughput drops 15% because every document now runs through two Painless scripts on the ingest node.
The Internals
Ingest pipelines run on ingest nodes (by default, every data node is also an ingest node). When a document is sent to the bulk API with a pipeline parameter, the coordinating node routes the document to an ingest node, which executes each processor in sequence, then forwards the enriched document to the data node for indexing.
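The routing described above is triggered by nothing more than a query parameter on the bulk request. The following is a minimal sketch of how the endpoint and NDJSON body could be assembled; the class name BulkPipelineRequest, its methods, and the index name used in the usage note are hypothetical, and the documents are assumed to be already serialized as single-line JSON.

```java
import java.util.List;

// Sketch: builds the endpoint and NDJSON body for a bulk request whose
// documents all pass through one ingest pipeline before indexing.
final class BulkPipelineRequest {

    // Bulk endpoint with the pipeline query parameter applied to every action.
    static String endpoint(String index, String pipelineId) {
        return "/" + index + "/_bulk?pipeline=" + pipelineId;
    }

    // Emits one action line and one source line per document.
    // Every line, including the last, must end with a newline.
    static String ndjsonBody(List<String> jsonDocs) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append("{\"index\":{}}\n");
            body.append(doc).append('\n');
        }
        return body.toString();
    }
}
```

For example, endpoint("docs", "doc-enrichment") yields /docs/_bulk?pipeline=doc-enrichment, which can be sent as a POST with the generated body through the same low-level client used in the listings in this section.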
Each processor adds latency to the indexing path, and the cost varies by orders of magnitude. Simple processors (set, rename, lowercase) typically add microseconds per document. Script processors (Painless) add milliseconds. Processors that perform lookups or call out to another service add tens of milliseconds.
Ingest pipelines do not have a dedicated thread pool. Processors execute on the ingest node's write thread pool, the same pool that handles indexing, so pipeline saturation appears as a growing queue and rejections on the write thread pool of the ingest nodes.
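Thread pool rejection counters are cumulative since node start, so saturation shows up as growth between two polls, not as an absolute value. The following is a minimal sketch of that bookkeeping, assuming the rows are fetched from the _cat/thread_pool API and parsed elsewhere; the class RejectionTracker and its methods are hypothetical, and the pool name is left as a parameter.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: turns cumulative per-node rejection counters into per-poll deltas.
final class RejectionTracker {

    // Last cumulative "rejected" value seen for each node.
    private final Map<String, Long> lastRejected = new HashMap<>();

    // Path for the CAT thread pool API, limited to the columns needed here.
    static String catPath(String poolName) {
        return "/_cat/thread_pool/" + poolName
                + "?format=json&h=node_name,name,active,queue,rejected";
    }

    // Returns the rejections added since the previous poll for this node.
    // The first sample, or a counter that went backwards (node restart),
    // establishes a new baseline and returns 0.
    long newRejections(String nodeName, long rejectedTotal) {
        Long previous = lastRejected.put(nodeName, rejectedTotal);
        if (previous == null || rejectedTotal < previous) {
            return 0;
        }
        return rejectedTotal - previous;
    }
}
```

A nonzero delta during a bulk import is the signal to look at pipeline cost before adding indexing capacity.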
The Implementation
Documentation Enrichment Pipeline
public void createDocumentEnrichmentPipeline() throws IOException {
    Request request = new Request("PUT", "/_ingest/pipeline/doc-enrichment");
    request.setJsonEntity("""
        {
          "description": "Enriches documentation pages at index time",
          "processors": [
            {
              "set": {
                "field": "indexed_at",
                "value": "{{_ingest.timestamp}}"
              }
            },
            {
              "lowercase": {
                "field": "content_type"
              }
            },
            {
              "script": {
                "lang": "painless",
                "source": "if (ctx.body != null && ctx.body.length() > 200) { ctx.summary = ctx.body.substring(0, 200) + '...'; } else { ctx.summary = ctx.body; }"
              }
            },
            {
              "set": {
                "field": "word_count",
                "value": 0
              }
            },
            {
              "script": {
                "lang": "painless",
                "source": "if (ctx.body != null) { ctx.word_count = ctx.body.splitOnToken(' ').length; }"
              }
            }
          ]
        }
        """);
    restClient.performRequest(request);
}
// FRAGILE: Two Painless scripts in the pipeline.
// Painless compiles each script once and caches it, but both scripts
// still execute for every document. For 2 million documents during a
// bulk import, this adds 4 million script executions to the indexing path.
// HARDENED: Move script-based enrichment to the application layer.
// Only the set processor for indexed_at stays in the pipeline.
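// Requires java.time.Instant and java.util.Locale.
public DocPage enrichAtApplicationLayer(DocPage page) {
    String body = page.body();
    String summary = body != null && body.length() > 200
            ? body.substring(0, 200) + "..."
            : body;
    // Count whitespace-separated tokens; a null or blank body has zero words.
    // Trimming first avoids an empty leading token from split().
    int wordCount = body == null || body.isBlank()
            ? 0
            : body.trim().split("\\s+").length;
    // Locale.ROOT keeps lowercasing independent of the server's default locale.
    String contentType = page.contentType() != null
            ? page.contentType().toLowerCase(Locale.ROOT)
            : null;
    return new DocPage(
            page.slug(),
            page.tenantId(),
            page.title(),
            body,
            contentType,
            summary,
            wordCount,
            Instant.now()
    );
}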
// Reduced pipeline: only processors that need OpenSearch context
public void createMinimalPipeline() throws IOException {
    Request request = new Request("PUT", "/_ingest/pipeline/doc-minimal");
    request.setJsonEntity("""
        {
          "description": "Minimal enrichment - only operations needing cluster context",
          "processors": [
            {
              "set": {
                "field": "indexed_at",
                "value": "{{_ingest.timestamp}}"
              }
            }
          ]
        }
        """);
    restClient.performRequest(request);
}
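A pipeline only applies when the client names it in the request. If indexed_at must be stamped no matter which client writes to the index, one option is the index.default_pipeline setting. The sketch below builds the settings update for that; the class name DefaultPipelineSetting and the index name in the usage note are hypothetical, and the pipeline ID is assumed to contain no characters that need JSON escaping.

```java
// Sketch: builds a settings update that makes a pipeline the default
// for an index, so requests without a pipeline parameter still use it.
final class DefaultPipelineSetting {

    // Index settings endpoint; send the body below with PUT.
    static String endpoint(String index) {
        return "/" + index + "/_settings";
    }

    // JSON body that sets index.default_pipeline to the given pipeline ID.
    static String body(String pipelineId) {
        return "{\"index\":{\"default_pipeline\":\"" + pipelineId + "\"}}";
    }
}
```

For example, a PUT to endpoint("docs") with body("doc-minimal") would make doc-minimal run for every document indexed into docs unless a request overrides it with its own pipeline parameter.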
Pipeline Performance Testing
// HARDENED: Simulate the pipeline on a single document to measure overhead.
// The result includes the HTTP round trip, so treat it as an upper bound
// on processor time, not as processor time alone.
public long measurePipelineLatency(String pipelineId, DocPage testDoc)
        throws IOException {
    Request request = new Request("POST", "/_ingest/pipeline/" +
            pipelineId + "/_simulate");
    request.setJsonEntity("""
        {
          "docs": [
            {
              "_source": %s
            }
          ]
        }
        """.formatted(objectMapper.writeValueAsString(testDoc)));
    // Start the clock after serialization so only the request itself is timed.
    long start = System.nanoTime();
    restClient.performRequest(request);
    return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
}
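A single timed call is noisy: JIT warm-up, connection setup, and garbage collection pauses can dominate the reading. The following sketch takes the median over repeated calls after a warm-up phase. The class LatencySampler is hypothetical, and it assumes the caller wraps the simulate request in a Runnable, converting the checked IOException to an UncheckedIOException.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

// Sketch: median latency of a repeated call, in microseconds.
final class LatencySampler {

    // Runs the call `warmup` times untimed, then `samples` times timed,
    // and returns the median of the timed runs.
    static long medianMicros(Runnable call, int warmup, int samples) {
        if (samples <= 0) {
            throw new IllegalArgumentException("samples must be positive");
        }
        for (int i = 0; i < warmup; i++) {
            call.run();
        }
        long[] nanos = new long[samples];
        for (int i = 0; i < samples; i++) {
            long start = System.nanoTime();
            call.run();
            nanos[i] = System.nanoTime() - start;
        }
        return TimeUnit.NANOSECONDS.toMicros(median(nanos));
    }

    // Median of a non-empty array; averages the two middle values
    // when the length is even. The input array is not modified.
    static long median(long[] values) {
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2;
    }
}
```

Running the same sampler against two pipelines and comparing the medians cancels most of the shared HTTP cost, which makes the difference a better estimate of processor overhead than either number on its own.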
The Measurement
Indexing throughput with different pipeline configurations (bulk import, 500-doc batches):
| Configuration | Throughput | Pipeline Overhead | Where Enrichment Runs |
|---|---|---|---|
| No pipeline | 24,000 docs/s | 0% | — |
| Set + lowercase only | 23,200 docs/s | 3% | Ingest node |
| Set + lowercase + 2 scripts | 20,400 docs/s | 15% | Ingest node |
| Application-layer enrichment + set only | 23,600 docs/s | 2% | App server + ingest node |
Moving the Painless scripts from the ingest pipeline to the application layer raises throughput from 20,400 to 23,600 docs/s, recovering about 13 percentage points of the 15% loss. The application server's CPU cycles are cheaper than those of the OpenSearch ingest node, which by default is also a data node serving search queries.
The Decision Rule
Use ingest pipelines for operations that require OpenSearch context: _ingest.timestamp, lookups against data held in the cluster, and field mutations that must be applied consistently regardless of which client sends the document.
Move enrichments that are pure transformations of the document (string manipulation, field computation, normalization) to the application layer. The application server is horizontally scalable and its CPU does not compete with search queries.
Measure pipeline overhead before deploying to production: use _simulate to check per-document behavior and latency, and a representative bulk load to measure throughput. If the pipeline reduces bulk indexing throughput by more than 5%, evaluate which processors can move to the application layer.
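The 5% rule reduces to a small calculation over two throughput measurements. The following is a minimal sketch; the class PipelineOverhead and its method names are hypothetical, and the figures in the usage note are the ones from the measurement table.

```java
// Sketch: pipeline overhead as a percentage of baseline throughput,
// and a check against an overhead budget.
final class PipelineOverhead {

    // Percentage of baseline throughput lost with the pipeline enabled.
    static double overheadPercent(double baselineDocsPerSec, double pipelineDocsPerSec) {
        if (baselineDocsPerSec <= 0) {
            throw new IllegalArgumentException("baseline must be positive");
        }
        return (baselineDocsPerSec - pipelineDocsPerSec) / baselineDocsPerSec * 100.0;
    }

    // True when the measured overhead is strictly above the budget.
    static boolean exceedsBudget(double baselineDocsPerSec,
                                 double pipelineDocsPerSec,
                                 double budgetPercent) {
        return overheadPercent(baselineDocsPerSec, pipelineDocsPerSec) > budgetPercent;
    }
}
```

With the measured numbers, the two-script pipeline (24,000 versus 20,400 docs/s) comes out at roughly 15% and exceeds a 5% budget, while application-layer enrichment plus the set processor (24,000 versus 23,600 docs/s) comes out under 2% and passes.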