Nested Aggregations and Composite Pagination
The Symptom
The documentation platform adds a “Browse by Version” page that shows, for each documentation version, the count of documents per content type. The developer writes a terms aggregation on version with a sub-aggregation on content_type. Response time grows linearly with the number of versions: at 200 versions across all tenants, the aggregation takes 3 seconds, and the coordinating node holds all 200 × 5 = 1,000 buckets (200 versions × 5 content types) in memory during reduction.
The Internals
Sub-aggregations (sometimes called “nested aggregations” because they nest inside another aggregation in the request body; not to be confused with the nested aggregation type used for fields of the nested type) run inside each bucket of the parent aggregation. A terms aggregation on version with a sub-aggregation on content_type selects the top N version buckets (N is the terms size, 10 by default) and computes a separate content type distribution within each of them.
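To make the bucket-inside-a-bucket structure concrete, here is a small in-memory model of a terms aggregation on version with a terms sub-aggregation on content_type. It is a hypothetical illustration (the class and method names are invented), not OpenSearch's shard-level implementation: it picks the top versions by document count, then counts content types separately inside each selected version.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical model of terms(version) > terms(content_type); not OpenSearch code.
final class NestedTermsModel {
    record Doc(String version, String contentType) {}

    // Returns up to `size` version buckets ordered by doc count (desc, ties by key),
    // each holding its own content_type -> count map.
    static LinkedHashMap<String, Map<String, Long>> versionThenContentType(List<Doc> docs, int size) {
        // Parent level: count documents per version.
        Map<String, Long> versionCounts = docs.stream()
            .collect(Collectors.groupingBy(Doc::version, Collectors.counting()));
        List<String> topVersions = versionCounts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(size)
            .map(Map.Entry::getKey)
            .toList();

        // Child level: inside each selected version bucket, count content types.
        LinkedHashMap<String, Map<String, Long>> result = new LinkedHashMap<>();
        for (String v : topVersions) {
            Map<String, Long> perType = docs.stream()
                .filter(d -> d.version().equals(v))
                .collect(Collectors.groupingBy(Doc::contentType, TreeMap::new, Collectors.counting()));
            result.put(v, perType);
        }
        return result;
    }
}
```

Every parent bucket carries a full child map, which is why memory grows with versions × content types.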
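Shard-level work is proportional to unique_versions × unique_content_types × docs_per_bucket, which is simply the number of matching documents: each document's doc values for both fields are read once to place it in its bucket. Memory, however, is proportional to the number of bucket combinations, unique_versions × unique_content_types, and the coordinating node must merge up to that many buckets from every shard the query reaches.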
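The bucket counts in this paragraph reduce to simple products. The sketch below computes them as worst-case upper bounds (every combination present on every shard). The shard count is a hypothetical input, since the text does not state how many shards docs-v1 has, and a routed query like the one in the Implementation reaches only one shard.

```java
// Back-of-envelope bucket arithmetic for a two-level terms aggregation.
// Names are hypothetical; shardCount is an assumed input, not a documented value.
final class BucketMath {
    // Worst case: every (version, content_type) pair occurs on the shard.
    static long bucketsPerShard(long uniqueVersions, long uniqueContentTypes) {
        return Math.multiplyExact(uniqueVersions, uniqueContentTypes);
    }

    // Upper bound on buckets the coordinating node merges during the reduce phase.
    static long bucketsToReduce(long uniqueVersions, long uniqueContentTypes, int shardCount) {
        return Math.multiplyExact(bucketsPerShard(uniqueVersions, uniqueContentTypes), (long) shardCount);
    }
}
```

With the Symptom's numbers, bucketsPerShard(200, 5) is 1,000, and an unrouted query over three shards would hand the coordinating node up to 3,000 buckets.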
Composite aggregations solve the memory problem by paginating through every combination of values that actually occurs across the source fields (a subset of their Cartesian product). Instead of materializing all combinations in memory at once, a composite aggregation returns one page of buckets sorted by the composite key, plus an after_key that the client sends back to fetch the next page.
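The paging contract can be modeled in a few lines. This hypothetical in-memory sketch (not OpenSearch's implementation) keeps one bucket per combination that occurs, sorts keys by version descending and then content_type ascending (the source order used in the Implementation), and returns at most size buckets strictly after the supplied key. As a modeling assumption, an empty page reports a null after key, which ends the client loop.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of composite pagination over (version, content_type).
final class CompositePager {
    record Key(String version, String contentType) {}
    record Page(List<Map.Entry<Key, Long>> buckets, Key afterKey) {}

    // Composite key order: version descending, then content_type ascending.
    static final Comparator<Key> ORDER = Comparator
        .comparing(Key::version, Comparator.<String>reverseOrder())
        .thenComparing(Key::contentType);

    // Only combinations that actually occur in the documents get a bucket.
    private final TreeMap<Key, Long> counts = new TreeMap<>(ORDER);

    CompositePager(List<Key> docs) {
        for (Key k : docs) counts.merge(k, 1L, Long::sum);
    }

    // Returns at most `size` buckets strictly after `after` (null means first page).
    Page page(Key after, int size) {
        NavigableMap<Key, Long> tail = (after == null) ? counts : counts.tailMap(after, false);
        List<Map.Entry<Key, Long>> buckets = new ArrayList<>();
        for (Map.Entry<Key, Long> e : tail.entrySet()) {
            if (buckets.size() == size) break;
            buckets.add(Map.entry(e.getKey(), e.getValue()));
        }
        // The last key on the page becomes the cursor for the next request.
        Key next = buckets.isEmpty() ? null : buckets.get(buckets.size() - 1).getKey();
        return new Page(List.copyOf(buckets), next);
    }
}
```

The model also surfaces a practical wrinkle: keyword values sort lexicographically, so in descending order "10.0" lands between "2.0" and "1.0". If versions need semantic ordering, a sortable companion field would be required (a hypothetical schema change, not part of the original design).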
The Implementation
Multi-Level Faceted Navigation
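import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.aggregations.CompositeAggregation;
import org.opensearch.client.opensearch._types.aggregations.CompositeAggregationSource;
import org.opensearch.client.opensearch.core.SearchResponse;

// HARDENED: Version → Content Type faceted navigation
// Uses a composite aggregation so each response carries at most one page of buckets.
public class VersionBrowser {
    private final OpenSearchClient client;

    public VersionBrowser(OpenSearchClient client) {
        this.client = client;
    }

    public record VersionFacet(
        String version,
        Map<String, Long> contentTypeCounts
    ) {}

    public List<VersionFacet> browseByVersion(String tenantId) throws IOException {
        // Accumulate across ALL pages: the content types of one version can be split
        // between the end of one page and the start of the next.
        Map<String, Map<String, Long>> grouped = new LinkedHashMap<>();
        Map<String, JsonData> afterKey = null;
        do {
            // Lambdas may only capture effectively final locals, so snapshot afterKey.
            final Map<String, JsonData> pageAfter = afterKey;
            CompositeAggregation composite = CompositeAggregation.of(c -> {
                c.size(50);
                // Sources are an ordered list of single-entry maps: "version" is the
                // primary key, "content_type" the secondary. A single two-entry Map.of
                // would not guarantee that order.
                c.sources(List.of(
                    Map.of("version", CompositeAggregationSource.of(cs -> cs
                        .terms(t -> t.field("version").order(SortOrder.Desc)))),
                    Map.of("content_type", CompositeAggregationSource.of(cs -> cs
                        .terms(t -> t.field("content_type"))))
                ));
                if (pageAfter != null) {
                    // after() parameter types vary across client versions; convert
                    // the returned after_key if yours expects a different value type.
                    c.after(pageAfter);
                }
                return c;
            });

            SearchResponse<Void> response = client.search(s -> s
                .index("docs-v1")
                .routing(tenantId)
                .size(0)
                .query(q -> q.term(t -> t.field("tenant_id").value(tenantId)))
                .aggregations("version_content", a -> a.composite(composite)),
                Void.class
            );

            var page = response.aggregations().get("version_content").composite();
            // Fold the flat (version, content_type) buckets into the hierarchy.
            for (var bucket : page.buckets().array()) {
                String version = bucket.key().get("version").to(String.class);
                String contentType = bucket.key().get("content_type").to(String.class);
                grouped.computeIfAbsent(version, k -> new LinkedHashMap<>())
                    .put(contentType, bucket.docCount());
            }
            afterKey = page.afterKey();
        } while (afterKey != null && !afterKey.isEmpty());

        List<VersionFacet> allFacets = new ArrayList<>();
        grouped.forEach((version, counts) -> allFacets.add(new VersionFacet(version, counts)));
        return allFacets;
    }
}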
Date Histogram for Changelog Timeline
// Monthly changelog timeline: changelog entries per calendar month, plus the
// three most recently published entries in each month. Assumes tenantId is in scope.
SearchRequest changelogTimeline = SearchRequest.of(s -> s
    .index("docs-v1")
    .routing(tenantId)
    .size(0)
    .query(q -> q
        .bool(b -> b
            .filter(f -> f.term(t -> t.field("tenant_id").value(tenantId)))
            .filter(f -> f.term(t -> t.field("content_type").value("changelog")))
        )
    )
    .aggregations("monthly", a -> a
        .dateHistogram(dh -> dh
            .field("published_date")
            .calendarInterval(CalendarInterval.Month)
            .minDocCount(1) // omit months with no changelog entries
            .format("yyyy-MM")
        )
        // Per-month sub-aggregation: newest three entries, returning only a few fields
        .aggregations("top_entries", sub -> sub
            .topHits(th -> th
                .size(3)
                .source(src -> src
                    .filter(f -> f.includes("title", "slug", "published_date"))
                )
                .sort(so -> so.field(fs -> fs
                    .field("published_date")
                    .order(SortOrder.Desc)
                ))
            )
        )
    )
);
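What this request computes can be restated as a small in-memory model: group entries by calendar month, skip empty months (the effect of minDocCount(1)), count each month, and keep the newest three. The names are hypothetical, and the model uses LocalDate, whereas OpenSearch buckets timestamps in UTC unless a time zone is set on the histogram.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical model of date_histogram(month, minDocCount 1) + top_hits(size N, newest first).
final class ChangelogTimeline {
    record Entry(String title, LocalDate publishedDate) {}
    record MonthBucket(String key, long docCount, List<Entry> topEntries) {}

    static List<MonthBucket> monthly(List<Entry> entries, int topN) {
        // Group by calendar month; TreeMap keeps months ascending. Months with no
        // entries never get a key, mirroring minDocCount(1).
        TreeMap<YearMonth, List<Entry>> byMonth = entries.stream()
            .collect(Collectors.groupingBy(
                (Entry e) -> YearMonth.from(e.publishedDate()),
                TreeMap::new,
                Collectors.toList()));

        List<MonthBucket> out = new ArrayList<>();
        for (var month : byMonth.entrySet()) {
            // top_hits equivalent: newest first, at most topN entries.
            List<Entry> top = month.getValue().stream()
                .sorted(Comparator.comparing(Entry::publishedDate).reversed())
                .limit(topN)
                .toList();
            // YearMonth.toString() renders as yyyy-MM, like format("yyyy-MM").
            out.add(new MonthBucket(month.getKey().toString(), month.getValue().size(), top));
        }
        return out;
    }
}
```

The doc count covers every entry in the month, while top_hits returns only a preview, so a month with 40 changelog entries still reports 40 even though three are shown.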
Aggregation Performance Optimization
// FRAGILE: Aggregation on a text field
// Text fields have no doc values. By default OpenSearch rejects this request
// because fielddata is disabled on text fields. If a mapping enables
// "fielddata": true, every unique token is loaded into JVM heap, which can trip
// the fielddata circuit breaker or exhaust the heap.
builder.aggregations("body_terms", a -> a
    .terms(t -> t.field("body")) // text field → fielddata → heap pressure
);

// HARDENED: Aggregate only on keyword, numeric, or date fields.
// These types use doc values (column-oriented on-disk storage), not heap.
builder.aggregations("content_type_filter", a -> a
    .terms(t -> t
        .field("content_type") // keyword field → doc values → bounded heap use
        .size(20)
    )
);
The Measurement
Aggregation performance by strategy:
| Approach | Memory (per shard) | Latency (200 versions) | Scalability |
|---|---|---|---|
| Terms + sub-agg (all in memory) | ~15MB | 3,200ms | Degrades linearly |
| Composite (page size 50) | ~2MB | 850ms (total, 4 pages) | Constant per page |
| Terms + shard_size hint | ~8MB | 1,800ms | Degrades linearly |
For this workload the composite aggregation uses about 7x less memory per shard (~2MB vs ~15MB) and completes about 3.8x faster in total (850ms vs 3,200ms). The advantage grows with cardinality: at 1,000 versions, the terms aggregation degrades to 15 seconds while the composite approach stays under 2 seconds per page.
The Decision Rule
Use composite aggregations when the parent aggregation has more than 100 unique values or when the Cartesian product of multi-level aggregations exceeds 500 combinations. The pagination overhead is negligible compared to the memory savings.
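The rule's two thresholds can be encoded directly. This is a hypothetical helper (names invented here): the first argument is the parent field's cardinality and any further arguments are the nested levels, whose product is the number of combinations.

```java
// Hypothetical encoding of the decision rule: >100 parent values or >500 combinations.
final class AggregationStrategy {
    static final long MAX_PARENT_CARDINALITY = 100;
    static final long MAX_COMBINATIONS = 500;

    enum Choice { TERMS_WITH_SUB_AGGS, COMPOSITE }

    // levelCardinalities[0] is the parent field; later entries are nested levels.
    static Choice choose(long... levelCardinalities) {
        if (levelCardinalities.length == 0) {
            throw new IllegalArgumentException("at least one level is required");
        }
        long combinations = 1;
        for (long c : levelCardinalities) {
            combinations = Math.multiplyExact(combinations, c);
        }
        boolean tooManyParents = levelCardinalities[0] > MAX_PARENT_CARDINALITY;
        boolean tooManyCombinations = combinations > MAX_COMBINATIONS;
        return (tooManyParents || tooManyCombinations) ? Choice.COMPOSITE : Choice.TERMS_WITH_SUB_AGGS;
    }
}
```

For the Browse by Version page, choose(200, 5) trips both thresholds. Cardinalities can be estimated up front with a cardinality aggregation, which returns an approximate distinct count.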
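Never aggregate on text fields. Text fields have no doc values, so by default OpenSearch rejects the request; if fielddata is enabled on the field, OpenSearch loads every unique token into heap memory to service it, and the resulting buckets are individual analyzed tokens rather than whole values. If you need to aggregate on a text-like value, add a .keyword sub-field in the mapping (for example, title.keyword) and aggregate on that.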
Pre-compute aggregation results for dashboards with stable data. Store the aggregation output in a separate index or cache. Re-aggregating the same data on every dashboard load wastes cluster resources when the underlying data changes infrequently.
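For the in-process cache variant of this advice, a minimal sketch is a time-to-live map in front of the aggregation call. Everything here is an assumption for illustration: the class name, the TTL policy, and the injectable clock (System::nanoTime in production, a fake clock in tests). Storing results in a separate index, the other option above, is not shown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical TTL cache for aggregation results, keyed e.g. by tenant + page name.
final class AggregationCache<V> {
    private record Cached<T>(T value, long storedAtNanos) {}

    private final Map<String, Cached<V>> entries = new ConcurrentHashMap<>();
    private final long ttlNanos;
    private final LongSupplier clock; // System::nanoTime in production

    AggregationCache(long ttlNanos, LongSupplier clock) {
        this.ttlNanos = ttlNanos;
        this.clock = clock;
    }

    // Returns the cached result while fresh; otherwise runs the aggregation and stores it.
    V get(String key, Supplier<V> runAggregation) {
        long now = clock.getAsLong();
        Cached<V> hit = entries.get(key);
        if (hit != null && now - hit.storedAtNanos() < ttlNanos) {
            return hit.value();
        }
        V fresh = runAggregation.get();
        entries.put(key, new Cached<>(fresh, now));
        return fresh;
    }
}
```

The check-then-store sequence is not atomic, so two concurrent misses for the same key may both run the aggregation; for dashboards that is usually tolerable, and the cache key should include the tenant id so results never leak across tenants.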