unbound mongodb at scale

Aggregation Pipelines at Scale

2 min read Chapter 34 of 72

MongoDB’s aggregation framework processes documents through a sequence of stages. Each stage transforms the document stream: filtering, grouping, reshaping, joining. The order of stages determines how much data flows through the pipeline and how much memory is consumed at each step.

The optimizer reorders some stages automatically. It moves a $match ahead of $project and $addFields when the filter does not depend on fields those stages compute. It generally does not move filters across $unwind, $lookup, or $group boundaries, so the developer is responsible for placing expensive stages after the stages that reduce the document count.

Figure: Aggregation pipeline stage reordering. Left: an unoptimized pipeline ($project, $unwind, $match, $group) processes 500K documents through all stages. Right: an optimized pipeline ($match first, then $project, $unwind, $group) reduces the stream to 12K documents at the first stage. Memory usage is compared at each stage.
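The effect of stage order can be sketched without a database. The following is a toy model in plain Java, not driver code: the Reading class, the three-element tags array, and the sample sizes are all assumptions made for illustration. It counts how many documents an $unwind-like expansion emits when it runs before versus after a $match-like filter.

```java
import java.util.ArrayList;
import java.util.List;

final class StageOrderSketch {
    /** Toy document (hypothetical shape): a recency flag plus an array field. */
    static final class Reading {
        final boolean recent;
        final List<String> tags;
        Reading(boolean recent, List<String> tags) {
            this.recent = recent;
            this.tags = tags;
        }
    }

    /** Builds n readings with three tags each; one in every keepEvery is "recent". */
    static List<Reading> sample(int n, int keepEvery) {
        List<Reading> docs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            docs.add(new Reading(i % keepEvery == 0, List.of("a", "b", "c")));
        }
        return docs;
    }

    /** $unwind before $match: every reading is expanded, and the filter sees all of the output. */
    static long unwindThenMatch(List<Reading> docs) {
        long unwound = 0;
        for (Reading r : docs) {
            unwound += r.tags.size(); // one output document per array element
        }
        return unwound;
    }

    /** $match before $unwind: only readings that pass the filter are expanded. */
    static long matchThenUnwind(List<Reading> docs) {
        long unwound = 0;
        for (Reading r : docs) {
            if (r.recent) {
                unwound += r.tags.size();
            }
        }
        return unwound;
    }

    public static void main(String[] args) {
        List<Reading> docs = sample(1000, 10);
        System.out.println("unwind first: " + unwindThenMatch(docs));
        System.out.println("match first:  " + matchThenUnwind(docs));
    }
}
```

With 1,000 readings of which 100 pass the filter, the unwind-first ordering emits 3,000 documents and the match-first ordering emits 300. Both orderings produce the same final result; only the intermediate volume differs.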

The Execution Model

An aggregation pipeline is not a set of queries. It is a single execution plan with streaming and blocking stages:

Streaming stages process documents one at a time and pass them to the next stage immediately: $match, $project, $addFields, $unset, $replaceRoot, $limit.

Blocking stages must consume all input before producing output: $group, $sort, $bucket, $facet. These stages accumulate results in memory (or on disk if allowDiskUse is enabled).
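The difference between the two kinds of stage can be illustrated with plain Java iterators. This is a simplified model under stated assumptions, not how the server is implemented: CountingSource, match, limit, and sortDescending are hypothetical helpers, and documents are reduced to integers. The sketch counts how many input documents get pulled from the source when a limit follows a streaming filter versus a blocking sort.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntPredicate;

final class StreamingVsBlockingSketch {
    /** Source of documents 0..size-1 that records how many were pulled. */
    static final class CountingSource implements Iterator<Integer> {
        private final int size;
        int pulled = 0;
        CountingSource(int size) { this.size = size; }
        public boolean hasNext() { return pulled < size; }
        public Integer next() {
            if (!hasNext()) throw new NoSuchElementException();
            return pulled++;
        }
    }

    /** Streaming stage: pulls input only until it finds the next match. */
    static Iterator<Integer> match(Iterator<Integer> in, IntPredicate keep) {
        return new Iterator<>() {
            private Integer buffered;
            public boolean hasNext() {
                while (buffered == null && in.hasNext()) {
                    Integer candidate = in.next();
                    if (keep.test(candidate)) buffered = candidate;
                }
                return buffered != null;
            }
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                Integer out = buffered;
                buffered = null;
                return out;
            }
        };
    }

    /** Streaming stage: stops asking upstream once n documents were emitted. */
    static Iterator<Integer> limit(Iterator<Integer> in, int n) {
        return new Iterator<>() {
            private int emitted = 0;
            public boolean hasNext() { return emitted < n && in.hasNext(); }
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                emitted++;
                return in.next();
            }
        };
    }

    /** Blocking stage: drains the whole input into a buffer before emitting anything. */
    static Iterator<Integer> sortDescending(Iterator<Integer> in) {
        List<Integer> buffer = new ArrayList<>();
        in.forEachRemaining(buffer::add);
        buffer.sort(Comparator.reverseOrder());
        return buffer.iterator();
    }

    /** Documents pulled from the source by match -> limit(3). */
    static int pulledByMatchThenLimit(int size) {
        CountingSource source = new CountingSource(size);
        Iterator<Integer> out = limit(match(source, v -> v % 2 == 0), 3);
        while (out.hasNext()) out.next();
        return source.pulled;
    }

    /** Documents pulled from the source by sort -> limit(3). */
    static int pulledBySortThenLimit(int size) {
        CountingSource source = new CountingSource(size);
        Iterator<Integer> out = limit(sortDescending(source), 3);
        while (out.hasNext()) out.next();
        return source.pulled;
    }

    public static void main(String[] args) {
        System.out.println("match then limit pulled: " + pulledByMatchThenLimit(1000));
        System.out.println("sort then limit pulled:  " + pulledBySortThenLimit(1000));
    }
}
```

In this model the filter followed by a limit of 3 pulls only 5 of the 1,000 source documents, while the sort has to buffer all 1,000 before the limit sees the first one. That buffer is what the memory limit on blocking stages applies to.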

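Each blocking stage has a memory limit of 100 MB. If a $sort or $group exceeds that limit without allowDiskUse, the aggregation fails (error code 16819 in the case of $sort; the exact code depends on the stage and the server version). With allowDiskUse, the stage spills to temporary files on disk instead of failing, at the cost of much slower execution.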
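A back-of-the-envelope check can indicate whether a $group is likely to approach the 100 MB limit. The sketch below assumes a fixed number of bytes of accumulator state per distinct group key, which is a simplification: real per-group overhead depends on the accumulators and key sizes, and the 200-byte figure in main is purely hypothetical.

```java
final class GroupMemoryEstimate {
    /** 100 MB expressed in bytes (104,857,600). */
    static final long STAGE_LIMIT_BYTES = 100L * 1024 * 1024;

    /** Rough estimate: one block of accumulator state per distinct group key. */
    static long estimateBytes(long distinctGroups, long bytesPerGroup) {
        // multiplyExact throws on overflow instead of silently wrapping around
        return Math.multiplyExact(distinctGroups, bytesPerGroup);
    }

    /** True when the estimate exceeds the limit, so the stage would need to spill. */
    static boolean needsDiskUse(long distinctGroups, long bytesPerGroup) {
        return estimateBytes(distinctGroups, bytesPerGroup) > STAGE_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        // Assumed 200 bytes of state per group; measure real documents before relying on this.
        System.out.println("10k groups need disk: " + needsDiskUse(10_000, 200));
        System.out.println("2M groups need disk:  " + needsDiskUse(2_000_000, 200));
    }
}
```

Under that assumption, 10,000 sensors fit comfortably (about 2 MB), while 2 million groups would need roughly 400 MB and would have to spill. With the Java driver, the opt-in is set on the iterable, for example collection.aggregate(pipeline).allowDiskUse(true).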

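// Aggregation with explain
import com.mongodb.ExplainVerbosity;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Date;
import java.util.List;

// Build the pipeline as a plain list so the same stages can be run or explained.
// `collection` is a MongoCollection<Document>; `dayStart` is a java.time.Instant.
List<Bson> pipeline = List.of(
    // Filter first so $group only sees readings from dayStart onward
    Aggregates.match(Filters.gte("ts", Date.from(dayStart))),
    Aggregates.group("$sensorId",
        Accumulators.avg("avgTemp", "$temperature"),
        Accumulators.min("minTemp", "$temperature"),
        Accumulators.max("maxTemp", "$temperature"),
        Accumulators.sum("count", 1)
    ),
    Aggregates.sort(Sorts.descending("avgTemp")),
    Aggregates.limit(100)
);

// Get explain output (AggregateIterable has no getPipeline(); pass the list itself)
Document explainResult = collection.aggregate(pipeline)
    .explain(ExplainVerbosity.EXECUTION_STATS);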

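With EXECUTION_STATS verbosity, the explain output reports, for each stage, an estimated execution time and the number of documents returned. Recent server versions also report memory usage for blocking stages such as $group and $sort.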