Shard Recovery Mechanics and Peer Recovery Optimization
The Symptom
A data node restarts after a rolling upgrade. The cluster goes yellow as it recovers 28 shards back onto the restarted node. Recovery takes 45 minutes. During recovery, search latency on the affected indices is 5x higher than normal because the nodes copying segment files and replaying the translog are also serving search requests.
The Internals
Shard recovery happens in five phases:
- INIT. The recovery target (the node receiving the shard) sends a recovery request to the source (the node holding the primary shard or the snapshot repository).
- INDEX. The source sends Lucene segment files to the target. For peer recovery, this is a file-based copy. Segments that already exist on the target (from a previous allocation) are skipped.
- VERIFY_INDEX. The target verifies the integrity of received segment files using checksums.
- TRANSLOG. The source sends translog operations that occurred during the INDEX phase. The target replays these operations to catch up to the current state.
- FINALIZE. The target opens the shard for search. The recovery is complete.
The total recovery time is dominated by the INDEX phase (copying segments) and the TRANSLOG phase (replaying operations). If the node was down briefly, most segments already exist on the target and the INDEX phase is fast. If the node was replaced, all segments must be copied.
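The segment-skipping behavior of the INDEX phase can be pictured as a comparison of file metadata between source and target. The sketch below is a simplified model and not Elasticsearch's actual implementation: the `SegmentDiff` class, the `FileMeta` record, and the idea of comparing length plus checksum per file name are assumptions made for illustration.

```java
import java.util.Map;

/** Hypothetical model of deciding which segment files a recovery target still needs. */
final class SegmentDiff {

    /** Illustrative file metadata: length in bytes plus a checksum string. */
    record FileMeta(long lengthBytes, String checksum) {}

    /**
     * Sums the bytes of every source file that is missing on the target
     * or whose metadata differs from the target's copy.
     */
    static long bytesToCopy(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        long bytes = 0;
        for (Map.Entry<String, FileMeta> entry : source.entrySet()) {
            FileMeta onTarget = target.get(entry.getKey()); // null when the file is absent
            if (!entry.getValue().equals(onTarget)) {
                bytes += entry.getValue().lengthBytes();
            }
        }
        return bytes;
    }
}
```

With an empty target map the result equals the full shard size, which corresponds to the replaced-node case. With a target that already holds most files, only the differing files count, which corresponds to the brief-restart case.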
The Implementation
Recovery Monitoring
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Flattened view of one shard's recovery progress
public record ShardRecovery(
    String index,
    int shardId,
    String stage,
    String sourceNode,
    String targetNode,
    long bytesRecovered,
    long bytesTotal,
    double percentComplete,
    long translogOpsRecovered,
    long translogOpsTotal,
    long elapsedMs
) {}

// Assumes `client` is an ElasticsearchClient field on the enclosing class
public List<ShardRecovery> getActiveRecoveries() throws IOException {
    // Ask only for recoveries that are still in flight
    var recoveries = client.indices().recovery(r -> r
        .activeOnly(true));
    List<ShardRecovery> results = new ArrayList<>();
    for (var indexEntry : recoveries.result().entrySet()) {
        for (var shard : indexEntry.getValue().shards()) {
            var index = shard.index();
            var translog = shard.translog();
            // Guard against division by zero for empty shards
            double percent = index.size().totalInBytes() > 0
                ? (double) index.size().recoveredInBytes() /
                    index.size().totalInBytes() * 100
                : 0;
            results.add(new ShardRecovery(
                indexEntry.getKey(),
                // The client may report the shard id as a long; narrow it safely
                Math.toIntExact(shard.id()),
                shard.stage(),
                shard.source().name(),
                shard.target().name(),
                index.size().recoveredInBytes(),
                index.size().totalInBytes(),
                percent,
                translog.recovered(),
                translog.total(),
                shard.totalTimeInMillis()
            ));
        }
    }
    return results;
}
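The byte counters and elapsed time collected above are enough for a rough estimate of how long the INDEX phase still has to run. The following is a minimal sketch under the assumption that the average throughput so far continues. The `RecoveryEta` class and its method name are hypothetical, and the estimate ignores the TRANSLOG phase entirely.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper that extrapolates remaining file-copy time from progress so far. */
final class RecoveryEta {

    /**
     * Returns the estimated remaining time for the segment copy, or empty
     * when there is no usable throughput sample yet.
     */
    static Optional<Duration> estimateRemaining(long bytesRecovered, long bytesTotal, long elapsedMs) {
        if (bytesRecovered <= 0 || elapsedMs <= 0 || bytesTotal < bytesRecovered) {
            return Optional.empty(); // nothing copied yet, or inconsistent counters
        }
        double bytesPerMs = (double) bytesRecovered / elapsedMs;
        long remainingMs = Math.round((bytesTotal - bytesRecovered) / bytesPerMs);
        return Optional.of(Duration.ofMillis(remainingMs));
    }
}
```

A caller could pass `bytesRecovered()`, `bytesTotal()`, and `elapsedMs()` from each `ShardRecovery` record. Because elapsed time also includes phases other than the file copy, the result is best treated as an upper-level indication and not a precise forecast.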
Recovery Throttling Configuration
// Increase recovery speed at the cost of more network/disk I/O
// on source and target nodes
public void optimizeRecoverySettings() throws IOException {
    client.cluster().putSettings(ps -> ps
        // persistent(...) accepts the settings map directly
        .persistent(Map.of(
            // Max bytes per second of recovery traffic per node (default 40mb)
            "indices.recovery.max_bytes_per_sec",
            JsonData.of("200mb"),
            // Shortcut that sets both the incoming and outgoing limits
            "cluster.routing.allocation.node_concurrent_recoveries",
            JsonData.of(4),
            // Max concurrent incoming recoveries per node
            "cluster.routing.allocation.node_concurrent_incoming_recoveries",
            JsonData.of(4),
            // Max concurrent outgoing recoveries per node
            "cluster.routing.allocation.node_concurrent_outgoing_recoveries",
            JsonData.of(4)
        ))
    );
}
// HARDENED: Delay shard allocation after node departure to avoid
// unnecessary data movement for transient node failures
public void configureAllocationDelay() throws IOException {
    client.indices().putSettings(ps -> ps
        .index("docs-*")
        .settings(s -> s
            // Assumes a client version whose IndexSettings builder exposes
            // otherSettings(...) for settings that have no typed method
            .otherSettings(
                "index.unassigned.node_left.delayed_timeout",
                JsonData.of("5m")
            )
        )
    );
}
The delayed_timeout setting (index.unassigned.node_left.delayed_timeout, default 1m) prevents the cluster from immediately reallocating shards when a node leaves. If the node returns within 5 minutes (a typical restart duration during rolling upgrades), the shards are recovered in place with only a translog replay, so no segment files need to be copied. This reduces recovery time from 45 minutes (full segment copy) to 2 minutes (translog replay only).
The Measurement
Recovery time for a 50GB shard under different scenarios:
| Scenario | INDEX Phase | TRANSLOG Phase | Total | Network Transfer |
|---|---|---|---|---|
| Node restart (< 5min, delayed allocation) | Skipped | 30s | 45s | Minimal |
| Node restart (> 5min, shards reallocated) | 12 min | 1 min | 14 min | 50GB |
| New node (empty, all segments copied) | 22 min | 2 min | 25 min | 50GB |
| New node, recovery throttle at 40mb/s (default) | 22 min | 2 min | 25 min | 50GB |
| New node, recovery throttle at 200mb/s | 5 min | 2 min | 8 min | 50GB |
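The INDEX phase figures for the throttled rows follow from simple division of shard size by the throttle rate. The sketch below reproduces that arithmetic as a lower bound. The `ThrottleMath` class is hypothetical, and the calculation assumes the throttle is the only bottleneck and that sizes use 1024-based units.

```java
/** Hypothetical helper for the lower-bound transfer time of a throttled file copy. */
final class ThrottleMath {

    static final long MIB = 1024L * 1024;
    static final long GIB = 1024L * MIB;

    /** Seconds needed to move shardBytes at bytesPerSec, ignoring all other overhead. */
    static double transferSeconds(long shardBytes, long bytesPerSec) {
        if (bytesPerSec <= 0) {
            throw new IllegalArgumentException("bytesPerSec must be positive");
        }
        return (double) shardBytes / bytesPerSec;
    }

    public static void main(String[] args) {
        long shard = 50 * GIB;
        // Print the lower bound in minutes for the default and the raised throttle
        System.out.printf("40mb/s:  %.1f min%n", transferSeconds(shard, 40 * MIB) / 60);
        System.out.printf("200mb/s: %.1f min%n", transferSeconds(shard, 200 * MIB) / 60);
    }
}
```

For a 50GB shard this gives 1280 seconds (about 21 minutes) at 40mb/s and 256 seconds (about 4 minutes) at 200mb/s, in line with the 22 min and 5 min INDEX phases in the table once per-file overhead is added.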
The Decision Rule
Set delayed_timeout to match the maximum expected node restart duration. For rolling upgrades, 5 minutes is typical. This single setting reduces recovery time by 10-20x for planned restarts by avoiding unnecessary segment file copies.
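One way to turn "maximum expected restart duration" into a concrete value is to take the longest restart observed in past upgrades and add headroom. This is only a sketch: the `DelayedTimeoutChooser` class, the safety factor, and the rounding up to whole minutes are assumptions for illustration, not an Elasticsearch recommendation.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical helper that derives a delayed_timeout value from observed restart times. */
final class DelayedTimeoutChooser {

    /**
     * Takes the longest observed restart, multiplies it by safetyFactor,
     * rounds up to whole minutes, and formats it as a time value such as "5m".
     */
    static String suggest(List<Duration> observedRestarts, double safetyFactor) {
        Duration longest = observedRestarts.stream()
            .max(Duration::compareTo)
            .orElseThrow(() -> new IllegalArgumentException("no restart samples"));
        long seconds = (long) Math.ceil(longest.toSeconds() * safetyFactor);
        long minutes = (seconds + 59) / 60; // round up to the next whole minute
        return minutes + "m";
    }
}
```

For example, restarts of 150 and 240 seconds with a safety factor of 1.25 yield "5m", which matches the value used for rolling upgrades above.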
Increase max_bytes_per_sec to 200mb during maintenance windows when recovery speed matters more than search performance. Reset to the default (40mb) after recovery completes to avoid impacting search I/O.
Monitor the TRANSLOG phase duration. A long translog replay indicates the primary shard received many writes during recovery. For write-heavy indices, consider temporarily pausing writes during recovery to shorten the translog phase.