High-Write Workloads: Bulk Indexing and Merge Pressure
The documentation platform onboards a new enterprise tenant with 2 million existing documents. The initial import must complete within 4 hours to meet the SLA. Steady-state writes after onboarding average 500 documents per minute. These two modes require fundamentally different indexing configurations.
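The gap between the two modes can be put in numbers. The sketch below is plain arithmetic on the figures above; the class and method names are illustrative and not part of the platform's code.

```java
// Illustrative arithmetic only: names are hypothetical, figures come from the scenario above.
final class ImportRateMath {
    /** Minimum sustained rate (documents per minute) needed to finish within the deadline. */
    static double requiredDocsPerMinute(long totalDocs, double deadlineHours) {
        return totalDocs / (deadlineHours * 60.0);
    }

    public static void main(String[] args) {
        double required = requiredDocsPerMinute(2_000_000L, 4.0);
        double steadyState = 500.0; // documents per minute after onboarding
        System.out.printf("required: %.0f docs/min (%.1fx steady state)%n",
                required, required / steadyState);
    }
}
```

The import needs roughly 8,333 documents per minute sustained for four hours, about 17 times the steady-state rate, which is why a configuration tuned for one mode fits the other poorly.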
Bulk API Mechanics
The bulk API accepts multiple operations in a single HTTP request. Each operation starts with an action line (index, create, update, or delete). Index, create, and update are followed by a source line carrying the document body or partial update; delete has no source line. The coordinating node parses the request, groups operations by target shard, forwards each group to the shard's primary, and returns per-operation results.
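As a sketch of the wire format, the snippet below assembles a bulk body by hand as newline-delimited JSON. The index name, document IDs, and bodies are made up for illustration; in practice the Java client builds this payload itself.

```java
import java.util.List;

// Sketch of the bulk wire format. The index name, IDs, and document bodies are made up.
final class BulkBodyExample {
    /** One bulk operation: an action line plus an optional source line (null for delete). */
    record Op(String actionJson, String sourceJson) {}

    /** Joins operations into newline-delimited JSON; every line, including the last, ends with '\n'. */
    static String toNdjson(List<Op> ops) {
        StringBuilder body = new StringBuilder();
        for (Op op : ops) {
            body.append(op.actionJson()).append('\n');
            if (op.sourceJson() != null) {
                body.append(op.sourceJson()).append('\n');
            }
        }
        return body.toString();
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(
            new Op("{\"index\":{\"_index\":\"docs\",\"_id\":\"getting-started\"}}",
                   "{\"title\":\"Getting started\"}"),
            new Op("{\"delete\":{\"_index\":\"docs\",\"_id\":\"old-page\"}}", null));
        System.out.print(toNdjson(ops));
    }
}
```

The index operation contributes two lines and the delete contributes one, so this two-operation request is three lines long.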
The optimal bulk size balances three factors:
- Network overhead. Small batches waste round-trip time. Each bulk request incurs HTTP overhead, routing computation, and translog fsync.
- Memory pressure. Large batches consume heap memory on the coordinating node for the duration of the request. A 100MB bulk request holds at least 100MB of parsed JSON in memory until the response is returned.
- Failure blast radius. If a bulk request fails as a whole (node crash, timeout), the client cannot tell which operations were applied, so every operation in the batch must be retried.
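// HARDENED: Adaptive bulk indexer with back-pressure handling
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.Refresh;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;

public class AdaptiveBulkIndexer {
    private static final int INITIAL_BATCH_SIZE = 500;
    private static final int MAX_BATCH_SIZE = 2000;
    private static final int MIN_BATCH_SIZE = 50;

    private final OpenSearchClient client;
    private int currentBatchSize = INITIAL_BATCH_SIZE;

    public AdaptiveBulkIndexer(OpenSearchClient client) {
        this.client = client;
    }

    public void indexDocuments(String index, List<DocPage> documents)
            throws IOException, InterruptedException {
        int offset = 0;
        while (offset < documents.size()) {
            // Slice with the current size on every pass so adjustments take effect immediately
            int end = Math.min(offset + currentBatchSize, documents.size());
            List<DocPage> batch = documents.subList(offset, end);
            offset = end;

            BulkResponse response = indexBatch(index, batch);
            if (!response.errors()) {
                // No errors: gradually increase batch size
                currentBatchSize = Math.min(MAX_BATCH_SIZE, currentBatchSize + 100);
                continue;
            }
            // Non-rejection errors (for example mapping failures) are not retried here
            List<DocPage> rejected = extractRejected(batch, response);
            if (!rejected.isEmpty()) {
                // Back-pressure: reduce batch size and retry rejected items
                currentBatchSize = Math.max(MIN_BATCH_SIZE, currentBatchSize / 2);
                Thread.sleep(2000); // Brief pause for queue drain
                if (indexBatch(index, rejected).errors()) {
                    throw new IOException("Bulk retry still failing for index " + index);
                }
            }
        }
    }

    // Bulk response items are returned in request order, so item i maps to batch.get(i)
    private List<DocPage> extractRejected(List<DocPage> batch, BulkResponse response) {
        List<DocPage> rejected = new ArrayList<>();
        List<BulkResponseItem> items = response.items();
        for (int i = 0; i < items.size(); i++) {
            var error = items.get(i).error();
            if (error != null && error.type() != null && error.type().contains("rejected")) {
                rejected.add(batch.get(i));
            }
        }
        return rejected;
    }

    private BulkResponse indexBatch(String index, List<DocPage> batch)
            throws IOException {
        BulkRequest.Builder builder = new BulkRequest.Builder()
            .index(index)
            .refresh(Refresh.False);
        for (DocPage doc : batch) {
            builder.operations(op -> op
                .index(idx -> idx
                    .id(doc.slug())
                    .routing(doc.tenantId())
                    .document(doc)
                )
            );
        }
        return client.bulk(builder.build());
    }
}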
Merge Pressure
Every refresh that finds newly indexed documents in the buffer writes a new segment. Segments accumulate until the merge policy selects groups of them for merging. During high-write workloads, segments are created faster than they can be merged, and the resulting backlog is merge pressure.
Symptoms of merge pressure:
- indices.merges.current stays at or near max_merge_count (per shard; the default is max_thread_count + 5, which is 6 when the scheduler runs a single merge thread)
- thread_pool.write.rejected increases (write queue full because merge I/O saturates disk)
- Search latency increases (more segments means more per-segment search overhead)
Initial Load Configuration
// Configuration for initial bulk import (2 million documents)
public void configureForBulkImport(String indexName) throws IOException {
    client.indices().putSettings(ps -> ps
        .index(indexName)
        .settings(s -> s
            .refreshInterval(t -> t.time("-1")) // Disable refresh entirely
            .numberOfReplicas("0")              // No replicas during import
            .putAll(Map.of(
                "index.translog.durability", JsonData.of("async"),
                "index.translog.flush_threshold_size", JsonData.of("1gb")
            ))
        )
    );
}

// After import completes: restore production settings
public void configureForProduction(String indexName) throws IOException {
    // Force refresh to make all documents searchable
    client.indices().refresh(r -> r.index(indexName));

    // Force merge to reduce segment count
    client.indices().forcemerge(fm -> fm
        .index(indexName)
        .maxNumSegments(5)
    );

    // Restore production settings
    client.indices().putSettings(ps -> ps
        .index(indexName)
        .settings(s -> s
            .refreshInterval(t -> t.time("5s"))
            .numberOfReplicas("1")
            .putAll(Map.of(
                "index.translog.durability", JsonData.of("request"),
                "index.translog.flush_threshold_size", JsonData.of("512mb")
            ))
        )
    );
}
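Disabling refresh during the initial load avoids creating thousands of tiny segments; with refresh off, segments are written only when the indexing buffer fills or the translog flushes. The single force merge after the load compacts everything into a manageable number of segments. Dropping from one replica to zero roughly halves the indexing work, since each write is otherwise applied on both the primary and the replica. Running the force merge before replicas are restored also means the merge happens once, on the primaries, and the replicas then copy the already-merged segments.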
The diagram shows the write path during a high-volume import. Documents arrive via the bulk API, pass through the translog, accumulate in the indexing buffer, and create segments on refresh. The merge policy selects segments for merging, but when the merge rate falls behind the refresh rate, the merge backlog grows and the write thread pool begins rejecting writes.
Write Rejection and Thread Pool Monitoring
// HARDENED: Monitor write thread pool for rejection signals
// long fields accept both int-typed and long-typed getters on the client's thread pool stats
public record WritePoolStatus(
    long active,
    long queue,
    long completed,
    long rejected
) {}

public WritePoolStatus getWritePoolStatus(String nodeId) throws IOException {
    var stats = client.nodes().stats(ns -> ns
        .nodeId(nodeId)
        .metric("thread_pool")
    );
    // Fail with a clear message instead of NoSuchElementException when no node matches
    var node = stats.nodes().values().stream()
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("Unknown node: " + nodeId));
    var writePool = node.threadPool().get("write");
    return new WritePoolStatus(
        writePool.active(),
        writePool.queue(),
        writePool.completed(),
        writePool.rejected()
    );
}
When the write pool rejected count increases during bulk import, the cluster is signaling that it cannot keep up with the write rate. The correct response is to reduce the client-side write concurrency or batch size, not to increase the write queue size (which defers the problem and risks out-of-memory errors).
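The rejected counter is cumulative since node start, so a nonzero value on its own does not mean the cluster is overloaded right now; what matters is whether it grows between two samples. The helper below is a hypothetical sketch of that comparison (the class, record, and method names are not part of any client library).

```java
// Hypothetical helper: compares two samples of the cumulative write-pool "rejected" counter.
final class RejectionTrend {
    /** The write pool's cumulative rejected count observed at a point in time. */
    record Sample(long rejected, long timestampMillis) {}

    /** Rejections per second between two samples; 0 if the counter did not grow or time did not advance. */
    static double rejectionsPerSecond(Sample earlier, Sample later) {
        long deltaCount = later.rejected() - earlier.rejected();
        long deltaMillis = later.timestampMillis() - earlier.timestampMillis();
        if (deltaCount <= 0 || deltaMillis <= 0) {
            return 0.0; // no new rejections, or the counter was reset by a node restart
        }
        return deltaCount * 1000.0 / deltaMillis;
    }

    /** The client should slow down whenever any new rejections appeared in the window. */
    static boolean shouldThrottle(Sample earlier, Sample later) {
        return rejectionsPerSecond(earlier, later) > 0.0;
    }
}
```

A bulk loader could sample the counter every few seconds and lower its concurrency or batch size whenever shouldThrottle returns true.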
The Measurement
Bulk import throughput at different configurations (2 million documents, 50GB):
| Configuration | Throughput | Duration | Write Rejections |
|---|---|---|---|
| Default settings, batch 500 | 3,200 docs/min | 10.4 hrs | 12,400 |
| Refresh -1, 0 replicas, batch 1000 | 18,000 docs/min | 1.8 hrs | 0 |
| Refresh -1, 0 replicas, batch 2000, async translog | 24,000 docs/min | 1.4 hrs | 0 |
The optimized configuration (refresh disabled, zero replicas, async translog, large batches) achieves 7.5x the throughput of the default settings and finishes in 1.4 hours, well inside the 4-hour SLA. The default configuration, at 10.4 hours, misses it.
The Decision Rule
Separate initial load configuration from steady-state configuration. Initial loads should disable refresh, set zero replicas, use async translog durability, and run force merge after completion. Steady-state writes need the production settings (periodic refresh, at least one replica, request-level translog durability) for data safety and search freshness.
Size bulk batches between 5MB and 15MB of JSON payload. This range balances network efficiency, memory pressure, and failure blast radius. Measure in bytes, not document count, because document sizes vary.
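One way to batch by bytes rather than by count is sketched below. It assumes documents are already serialized to JSON strings, ignores the small per-operation action line, and uses 10 MB as an assumed midpoint of the range rather than a measured optimum; the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch: groups pre-serialized JSON documents into batches capped by payload bytes.
// The 10 MB target is an assumed midpoint of the 5-15 MB range, not a measured optimum.
final class ByteSizedBatcher {
    static final long TARGET_BATCH_BYTES = 10L * 1024 * 1024;

    static List<List<String>> partitionByBytes(List<String> jsonDocs, long maxBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;
        for (String json : jsonDocs) {
            long size = json.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the newline
            // Close the batch before it would exceed the cap.
            // A single document larger than the cap still gets a batch of its own.
            if (!current.isEmpty() && currentBytes + size > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(json);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Sizes are taken from the UTF-8 encoding because that is what travels over the wire; String.length() would undercount documents containing non-ASCII text.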
Implement client-side back-pressure. When the bulk response contains rejected_execution_exception errors, reduce concurrency and add a brief pause before retrying. Never increase server-side queue sizes to mask back-pressure signals.
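A bounded retry loop with exponential backoff is one common way to implement the pause. The sketch below is illustrative: the names and delay values are assumptions, and the attempt callback stands in for "resend the rejected operations and report whether they all succeeded".

```java
import java.util.function.BooleanSupplier;

// Sketch of a bounded retry loop with exponential backoff. Names and delays are illustrative.
final class BackoffRetry {
    /** Abstraction over Thread.sleep so the loop can be exercised without real waiting. */
    interface Sleeper { void sleep(long millis) throws InterruptedException; }

    /** Delay before retry number `retry` (1-based): base * 2^(retry-1), capped at maxDelayMillis. */
    static long delayMillis(int retry, long baseDelayMillis, long maxDelayMillis) {
        long delay = baseDelayMillis << Math.min(retry - 1, 20); // cap the shift to avoid overflow
        return Math.min(delay, maxDelayMillis);
    }

    /** Runs `attempt` until it returns true or maxAttempts is reached; returns whether it succeeded. */
    static boolean retry(BooleanSupplier attempt, int maxAttempts,
                         long baseDelayMillis, long maxDelayMillis, Sleeper sleeper)
            throws InterruptedException {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i < maxAttempts) {
                sleeper.sleep(delayMillis(i, baseDelayMillis, maxDelayMillis));
            }
        }
        return false;
    }
}
```

In production the sleeper would be Thread::sleep. Capping the number of attempts matters as much as the delay: a loader that retries forever hides a cluster that is genuinely undersized.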