search at depth

High-Write Workloads: Bulk Indexing and Merge Pressure

Chapter 37 of 60

The documentation platform onboards a new enterprise tenant with 2 million existing documents. The initial import must complete within 4 hours to meet the SLA. Steady-state writes after onboarding average 500 documents per minute. These two modes require fundamentally different indexing configurations.
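To make the gap between the two modes concrete, the following sketch works out the sustained rate the import needs. The class and method names are hypothetical and exist only for this illustration; the figures are the ones from the scenario above.

```java
// Back-of-envelope SLA arithmetic; names are hypothetical, not part of any client library.
final class ImportSla {

    /** Minimum sustained throughput (documents per minute) needed to finish in the window. */
    static double requiredDocsPerMinute(long totalDocs, double windowHours) {
        if (totalDocs < 0 || windowHours <= 0) {
            throw new IllegalArgumentException("totalDocs must be >= 0 and windowHours > 0");
        }
        return totalDocs / (windowHours * 60.0);
    }

    /** How many times faster than the steady-state rate the import must run. */
    static double ratioToSteadyState(long totalDocs, double windowHours, double steadyDocsPerMinute) {
        return requiredDocsPerMinute(totalDocs, windowHours) / steadyDocsPerMinute;
    }
}
```

For 2 million documents in 4 hours this gives roughly 8,333 documents per minute, about 17 times the steady-state rate of 500 per minute, with no allowance for retries or the post-import force merge.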

Bulk API Mechanics

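The bulk API accepts multiple operations in a single HTTP request, encoded as newline-delimited JSON. Each operation starts with an action line (index, create, update, or delete). Index, create, and update operations are followed by a source line carrying the document body or partial update; delete operations have no source line. The coordinating node parses the request, groups the operations by target shard, forwards each group to the node holding that shard, and returns per-operation results.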
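As an illustration of the wire format, here is a minimal sketch that assembles a bulk request body by hand. `BulkBody` and `Op` are hypothetical names, and the sketch assumes index names and IDs contain no characters that need JSON escaping; a real client library handles serialization for you.

```java
import java.util.List;

// Hand-rolled newline-delimited JSON for the _bulk endpoint (illustration only).
final class BulkBody {

    /** One operation: action name, document ID, and the JSON source (null for delete). */
    record Op(String action, String id, String sourceJson) {}

    static String build(String index, List<Op> ops) {
        StringBuilder sb = new StringBuilder();
        for (Op op : ops) {
            // Action line: which operation, on which index and document.
            sb.append("{\"").append(op.action()).append("\":{\"_index\":\"")
              .append(index).append("\",\"_id\":\"").append(op.id()).append("\"}}\n");
            // Source line: every action except delete carries one.
            if (!"delete".equals(op.action())) {
                sb.append(op.sourceJson()).append('\n');
            }
        }
        return sb.toString(); // the body must end with a newline
    }
}
```

An index operation followed by a delete therefore produces three lines, not four.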

The optimal bulk size balances three factors:

  • Network overhead. Small batches waste round-trip time. Each bulk request incurs HTTP overhead, routing computation, and translog fsync.
  • Memory pressure. Large batches consume heap memory on the coordinating node for the duration of the request. A 100MB bulk request holds 100MB of parsed JSON in memory.
  • Failure blast radius. If a bulk request fails (node crash, timeout), all operations in the batch must be retried.
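
// HARDENED: Adaptive bulk indexer with back-pressure handling

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.Refresh;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;

public class AdaptiveBulkIndexer {

    private final OpenSearchClient client;
    private static final int INITIAL_BATCH_SIZE = 500;
    private static final int MAX_BATCH_SIZE = 2000;
    private static final int MIN_BATCH_SIZE = 50;
    private static final int MAX_RETRIES = 5;

    private int currentBatchSize = INITIAL_BATCH_SIZE;

    public AdaptiveBulkIndexer(OpenSearchClient client) {
        this.client = client;
    }

    public void indexDocuments(String index, List<DocPage> documents)
            throws IOException, InterruptedException {

        // Slice each batch just before sending it, so that a batch-size
        // change takes effect on the very next request.
        int offset = 0;
        while (offset < documents.size()) {
            int end = Math.min(documents.size(), offset + currentBatchSize);
            List<DocPage> batch = documents.subList(offset, end);
            offset = end;

            BulkResponse response = indexBatch(index, batch);

            if (response.errors()) {
                // Non-rejection failures (for example mapping errors) are not
                // retried here; report them through your own error handling.
                List<DocPage> rejected = extractRejected(batch, response);

                if (!rejected.isEmpty()) {
                    // Back-pressure: reduce batch size and retry rejected items
                    currentBatchSize = Math.max(MIN_BATCH_SIZE,
                        currentBatchSize / 2);
                    retryRejected(index, rejected);
                }
            } else {
                // No errors: gradually increase batch size
                currentBatchSize = Math.min(MAX_BATCH_SIZE,
                    currentBatchSize + 100);
            }
        }
    }

    // Retries rejected documents with a growing pause so the write queue can drain.
    private void retryRejected(String index, List<DocPage> rejected)
            throws IOException, InterruptedException {
        List<DocPage> pending = rejected;
        for (int attempt = 1; attempt <= MAX_RETRIES && !pending.isEmpty(); attempt++) {
            Thread.sleep(2000L * attempt);
            pending = extractRejected(pending, indexBatch(index, pending));
        }
        if (!pending.isEmpty()) {
            throw new IOException(pending.size()
                + " documents still rejected after " + MAX_RETRIES + " retries");
        }
    }

    // Bulk response items come back in request order, so item i maps to batch.get(i).
    private List<DocPage> extractRejected(List<DocPage> batch, BulkResponse response) {
        List<DocPage> rejected = new ArrayList<>();
        List<BulkResponseItem> items = response.items();
        for (int i = 0; i < items.size(); i++) {
            BulkResponseItem item = items.get(i);
            if (item.error() != null && item.error().type() != null
                    && item.error().type().contains("rejected")) {
                rejected.add(batch.get(i));
            }
        }
        return rejected;
    }

    private BulkResponse indexBatch(String index, List<DocPage> batch)
            throws IOException {
        BulkRequest.Builder builder = new BulkRequest.Builder()
            .index(index)
            .refresh(Refresh.False);

        for (DocPage doc : batch) {
            builder.operations(op -> op
                .index(idx -> idx
                    .id(doc.slug())
                    .routing(doc.tenantId())
                    .document(doc)
                )
            );
        }

        return client.bulk(builder.build());
    }
}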

Merge Pressure

Every refresh that finds newly indexed documents writes a new segment. Segments accumulate until the merge policy selects groups of them for merging. During high-write workloads, segments can be created faster than they are merged. This is merge pressure.

Symptoms of merge pressure:

  • indices.merges.current stays at or near index.merge.scheduler.max_merge_count (default: max_thread_count + 5 per shard, which is 6 with a single merge thread)
  • thread_pool.write.rejected increases (write queue full because merge I/O saturates disk)
  • Search latency increases (more segments means more per-segment search overhead)
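
The symptoms above can be folded into a simple check over two consecutive stats samples. This is a minimal sketch, not a tuned detector: `MergePressure`, `Sample`, and the "within one of the limit" threshold are assumptions made for illustration, and the values are assumed to have been collected per shard from the stats APIs beforehand.

```java
// Heuristic merge-pressure check over two samples (hypothetical names and thresholds).
final class MergePressure {

    /** One observation of the metrics listed above. */
    record Sample(int currentMerges, int maxMergeCount, long writeRejected, int segmentCount) {}

    static boolean underPressure(Sample before, Sample after) {
        // Merge scheduler is at, or within one of, its concurrent merge limit.
        boolean mergesSaturated = after.currentMerges() >= after.maxMergeCount() - 1;
        // The cumulative rejection counter moved between the two samples.
        boolean rejectionsRising = after.writeRejected() > before.writeRejected();
        // Segments are piling up faster than merges remove them.
        boolean segmentsGrowing = after.segmentCount() > before.segmentCount();
        return mergesSaturated && (rejectionsRising || segmentsGrowing);
    }
}
```

Requiring saturation plus at least one secondary signal avoids flagging a short burst of merges that the cluster is absorbing without trouble.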

Initial Load Configuration

// Configuration for initial bulk import (2 million documents)

public void configureForBulkImport(String indexName) throws IOException {
    client.indices().putSettings(ps -> ps
        .index(indexName)
        .settings(s -> s
            .refreshInterval(t -> t.time("-1"))  // Disable refresh entirely
            .numberOfReplicas("0")                // No replicas during import
            .putAll(Map.of(
                "index.translog.durability", JsonData.of("async"),
                "index.translog.flush_threshold_size", JsonData.of("1gb")
            ))
        )
    );
}

// After import completes: restore production settings
public void configureForProduction(String indexName) throws IOException {
    // Force refresh to make all documents searchable
    client.indices().refresh(r -> r.index(indexName));

    // Force merge to reduce segment count
    client.indices().forcemerge(fm -> fm
        .index(indexName)
        .maxNumSegments(5)
    );

    // Restore production settings
    client.indices().putSettings(ps -> ps
        .index(indexName)
        .settings(s -> s
            .refreshInterval(t -> t.time("5s"))
            .numberOfReplicas("1")
            .putAll(Map.of(
                "index.translog.durability", JsonData.of("request"),
                "index.translog.flush_threshold_size", JsonData.of("512mb")
            ))
        )
    );
}

Disabling refresh during the initial load avoids creating thousands of tiny segments. The single force merge after the load compacts everything into a manageable number of segments. With one replica configured, removing it for the duration of the import halves the indexing work, since every write is otherwise applied on both the primary and the replica.

Figure: Write path under bulk load, showing segment creation, merge queue saturation, and write rejection.

The diagram shows the write path during a high-volume import. Documents arrive via the bulk API, pass through the translog, accumulate in the indexing buffer, and create segments on refresh. The merge policy selects segments for merging, but when the merge rate falls behind the refresh rate, the merge queue fills and the write thread pool begins rejecting requests.

Write Rejection and Thread Pool Monitoring

// HARDENED: Monitor write thread pool for rejection signals

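// Counters are cumulative since node start, so compare successive samples.
public record WritePoolStatus(
    long active,
    long queue,
    long completed,
    long rejected
) {}

public WritePoolStatus getWritePoolStatus(String nodeId) throws IOException {
    var stats = client.nodes().stats(ns -> ns
        .nodeId(nodeId)
        .metric("thread_pool")
    );

    // Fail with a clear message if the node ID matched nothing.
    var node = stats.nodes().values().stream()
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException(
            "No stats returned for node " + nodeId));
    var writePool = node.threadPool().get("write");
    if (writePool == null) {
        throw new IllegalStateException(
            "Node " + nodeId + " reported no write thread pool");
    }

    return new WritePoolStatus(
        writePool.active(),
        writePool.queue(),
        writePool.completed(),
        writePool.rejected()
    );
}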

When the write pool rejected count increases during bulk import, the cluster is signaling that it cannot keep up with the write rate. The correct response is to reduce the client-side write concurrency or batch size, not to increase the write queue size (which defers the problem and risks out-of-memory errors).
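
One way to act on that signal is to adjust the number of concurrent bulk workers from the change in the rejected counter between samples. The sketch below is an assumption-laden illustration: `ConcurrencyGovernor` is a hypothetical class, and the halve-on-rejection, add-one-when-quiet policy is a common additive-increase, multiplicative-decrease scheme rather than anything prescribed by OpenSearch.

```java
// Adjusts client-side bulk concurrency from successive samples of the
// cumulative write-pool rejected counter (hypothetical helper).
final class ConcurrencyGovernor {

    private final int minWorkers;
    private final int maxWorkers;
    private int workers;
    private long lastRejected = -1; // -1 means no baseline sample yet

    ConcurrencyGovernor(int minWorkers, int maxWorkers, int initialWorkers) {
        if (minWorkers < 1 || maxWorkers < minWorkers
                || initialWorkers < minWorkers || initialWorkers > maxWorkers) {
            throw new IllegalArgumentException("invalid worker bounds");
        }
        this.minWorkers = minWorkers;
        this.maxWorkers = maxWorkers;
        this.workers = initialWorkers;
    }

    /** Feed the latest cumulative rejected count; returns the worker count to use next. */
    int onSample(long rejectedTotal) {
        if (lastRejected >= 0) {
            if (rejectedTotal > lastRejected) {
                workers = Math.max(minWorkers, workers / 2); // back off quickly
            } else {
                workers = Math.min(maxWorkers, workers + 1); // recover slowly
            }
        }
        lastRejected = rejectedTotal;
        return workers;
    }
}
```

The first sample only establishes a baseline, because the counter is cumulative and its absolute value says nothing about the current import.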

The Measurement

Bulk import throughput at different configurations (2 million documents, 50GB):

| Configuration | Throughput | Duration | Merge Rejections |
|---|---|---|---|
| Default settings, batch 500 | 3,200 docs/min | 10.4 hrs | 12,400 |
| Refresh -1, 0 replicas, batch 1000 | 18,000 docs/min | 1.8 hrs | 0 |
| Refresh -1, 0 replicas, batch 2000, async translog | 24,000 docs/min | 1.4 hrs | 0 |

The optimized configuration (refresh disabled, zero replicas, async translog, large batches) achieves 7.5x the throughput of default settings and completes within the 4-hour SLA.

The Decision Rule

Separate initial load configuration from steady-state configuration. Initial loads should disable refresh, set zero replicas, use async translog durability, and run force merge after completion. Steady-state writes need periodic refresh, at least one replica, and request-level translog durability for data safety and search freshness.

Size bulk batches between 5MB and 15MB of JSON payload. This range balances network efficiency, memory pressure, and failure blast radius. Measure in bytes, not document count, because document sizes vary.
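
A minimal sketch of byte-based batching follows. It assumes each document is already serialized to a JSON string; `ByteSizedBatcher` is a hypothetical helper, and it counts only the source lines, so the real request is slightly larger once action lines are added.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Groups serialized documents into batches capped by UTF-8 payload size.
final class ByteSizedBatcher {

    static List<List<String>> partition(List<String> jsonDocs, long maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;

        for (String doc : jsonDocs) {
            // +1 for the newline that terminates each line in the bulk body.
            long size = doc.getBytes(StandardCharsets.UTF_8).length + 1;
            // Close the current batch if this document would push it over the cap.
            if (!current.isEmpty() && currentBytes + size > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(doc);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

A document larger than the cap still gets sent, alone in its own batch, rather than being dropped. With the range above, `maxBatchBytes` would sit somewhere between `5L * 1024 * 1024` and `15L * 1024 * 1024`.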

Implement client-side back-pressure. When the bulk response contains rejected_execution_exception errors, reduce concurrency and add a brief pause before retrying. Never increase server-side queue sizes to mask back-pressure signals.
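
The "brief pause" can be a fixed delay, but a common refinement is exponential backoff with jitter, so that parallel workers do not all retry at the same instant. The sketch below is one possible shape, not the chapter's prescribed method: `Backoff` and its parameters are hypothetical, and the half-to-full jitter range is an arbitrary choice.

```java
import java.util.concurrent.ThreadLocalRandom;

// Exponential backoff with jitter for retrying rejected bulk items (hypothetical helper).
final class Backoff {

    /**
     * Returns a randomized delay for the given retry attempt (0-based).
     * The ceiling doubles per attempt up to capMillis; the delay is drawn
     * uniformly from the upper half of that ceiling.
     */
    static long delayMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 0 || baseMillis <= 0 || capMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        // Clamp the exponent so the shift cannot overflow for realistic base values.
        long ceiling = Math.min(capMillis, baseMillis * (1L << Math.min(attempt, 20)));
        return ThreadLocalRandom.current().nextLong(ceiling / 2, ceiling + 1);
    }
}
```

A retry loop would call `Thread.sleep(Backoff.delayMillis(attempt, 500, 8000))` before resending only the rejected items, and give up after a bounded number of attempts.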