unbound mongodb at scale

Stage Ordering and the $match-First Principle

Chapter 35 of 72

The Symptom

The daily analytics report computes the average temperature per sensor for the last 24 hours, enriched with sensor metadata via $lookup. The pipeline takes 12 seconds and uses 2.8 GB of temporary disk space (allowDiskUse is enabled). The collection has 200 million documents, but only 2.5 million are from the last 24 hours.

The Cause

As written, the pipeline requests the stages in this order:

// SLOW: $lookup before $match on time range
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000);  // start of the 24-hour window
db.readings.aggregate([
  { $lookup: {
      from: "sensors",
      localField: "sensorId",
      foreignField: "_id",
      as: "sensor"
  }},
  { $unwind: "$sensor" },
  { $match: { ts: { $gte: yesterday } } },       // Filters AFTER $lookup
  { $group: {
      _id: { sensorId: "$sensorId", building: "$sensor.building" },
      avgTemp: { $avg: "$temperature" },
      count: { $sum: 1 }
  }},
  { $sort: { avgTemp: -1 } }
], { allowDiskUse: true })

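Written this way, the $lookup stage runs against all 200 million documents before the $match narrows them to the last 24 hours: each of those 200 million documents triggers a lookup against the sensors collection, and the joined documents then pass through $unwind only for most of them to be discarded. Do not rely on the optimizer to fix the ordering for you. Whether it can move a $match ahead of $lookup and $unwind depends on the server version and on which fields the filter references, so check the executed stage order with explain() rather than assuming it.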
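The effect of stage order on the number of lookups can be sketched without a database. The plain-Java simulation below is an illustration under stated assumptions, not driver code: the `Reading` record, the `buildingBySensor` map standing in for the sensors collection, and the `lookupFirst`/`matchFirst` method names are all hypothetical. Both orderings return the same enriched readings; only the number of lookups differs.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the two pipeline orderings (not MongoDB driver code).
class OrderingSketch {
    // Stand-in for a document in the readings collection.
    record Reading(String sensorId, Instant ts, double temperature) {}

    // How many lookups ran, and the buildings attached to the surviving readings.
    record Outcome(int lookups, List<String> buildings) {}

    // $lookup -> $unwind -> $match: every reading is enriched, then filtered.
    static Outcome lookupFirst(List<Reading> readings, Map<String, String> buildingBySensor, Instant since) {
        int lookups = 0;
        List<String> out = new ArrayList<>();
        for (Reading r : readings) {
            lookups++;                                       // one lookup per input document
            String building = buildingBySensor.get(r.sensorId());
            if (building == null) continue;                  // $unwind drops readings with no sensor match
            if (!r.ts().isBefore(since)) out.add(building);  // ts >= since, applied last
        }
        return new Outcome(lookups, out);
    }

    // $match -> $lookup -> $unwind: only readings inside the window are enriched.
    static Outcome matchFirst(List<Reading> readings, Map<String, String> buildingBySensor, Instant since) {
        int lookups = 0;
        List<String> out = new ArrayList<>();
        for (Reading r : readings) {
            if (r.ts().isBefore(since)) continue;            // ts >= since, applied first
            lookups++;                                       // lookup only for surviving readings
            String building = buildingBySensor.get(r.sensorId());
            if (building != null) out.add(building);         // $unwind drops unmatched readings
        }
        return new Outcome(lookups, out);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-02T00:00:00Z");
        Instant since = now.minusSeconds(86_400);            // "yesterday"
        List<Reading> readings = List.of(
            new Reading("s1", now.minusSeconds(10 * 86_400), 20.0),  // outside window
            new Reading("s2", now.minusSeconds(5 * 86_400), 21.0),   // outside window
            new Reading("s1", now.minusSeconds(3_600), 22.0),        // inside window
            new Reading("s3", now.minusSeconds(7_200), 23.0));       // inside window, unknown sensor
        Map<String, String> buildingBySensor = Map.of("s1", "B1", "s2", "B2");
        System.out.println(lookupFirst(readings, buildingBySensor, since)); // Outcome[lookups=4, buildings=[B1]]
        System.out.println(matchFirst(readings, buildingBySensor, since));  // Outcome[lookups=2, buildings=[B1]]
    }
}
```

The ratio between the two lookup counts is the ratio between all readings and readings inside the window, which in the report's case is 200 million to 2.5 million.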

The Benchmark

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.openjdk.jmh.annotations.*;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class PipelineOrderingBenchmark {

    private MongoClient client;
    private MongoCollection<Document> collection;
    private Date yesterday;

    @Setup
    public void setup() {
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("telemetry").getCollection("readings");
        yesterday = Date.from(Instant.now().minus(1, ChronoUnit.DAYS));
    }

    @TearDown
    public void tearDown() {
        client.close();  // release the connection pool after the run
    }

    // Unoptimized: enrich every reading, then filter. Needs allowDiskUse to spill.
    @Benchmark
    public List<Document> lookupFirst() {
        return collection.aggregate(List.of(
            Aggregates.lookup("sensors", "sensorId", "_id", "sensor"),
            Aggregates.unwind("$sensor"),
            Aggregates.match(Filters.gte("ts", yesterday)),
            Aggregates.group(
                new Document("sensorId", "$sensorId").append("building", "$sensor.building"),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp"))
        )).allowDiskUse(true).into(new ArrayList<>());
    }

    // Filter to the last 24 hours before the $lookup.
    @Benchmark
    public List<Document> matchFirst() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.gte("ts", yesterday)),
            Aggregates.lookup("sensors", "sensorId", "_id", "sensor"),
            Aggregates.unwind("$sensor"),
            Aggregates.group(
                new Document("sensorId", "$sensorId").append("building", "$sensor.building"),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp"))
        )).into(new ArrayList<>());
    }

    // Filter, then keep only the fields later stages use, so smaller documents reach $lookup.
    @Benchmark
    public List<Document> matchFirstWithProjection() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.gte("ts", yesterday)),
            Aggregates.project(Projections.include("sensorId", "temperature", "ts")),
            Aggregates.lookup("sensors", "sensorId", "_id", "sensor"),
            Aggregates.unwind("$sensor"),
            Aggregates.group(
                new Document("sensorId", "$sensorId").append("building", "$sensor.building"),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp"))
        )).into(new ArrayList<>());
    }
}

Results:

Benchmark                                           Mode  Cnt   Score  Error  Units
PipelineOrderingBenchmark.lookupFirst                 ss    1  12.000          s/op
PipelineOrderingBenchmark.matchFirst                  ss    1   1.800          s/op
PipelineOrderingBenchmark.matchFirstWithProjection    ss    1   1.200          s/op

Moving $match before $lookup reduces execution from 12 seconds to 1.8 seconds (6.7x improvement). Adding $project before $lookup to strip unnecessary fields reduces it further to 1.2 seconds. The $project reduces the document size flowing through the pipeline, which reduces memory usage in subsequent stages.

The Fix

The pipeline ordering checklist:

  1. Place $match stages as early as possible. Every document filtered out avoids all subsequent stages.
  2. Place $project/$unset before $lookup, $group, and $sort to reduce document size.
  3. Place $limit immediately after $sort to enable the top-k optimization (MongoDB keeps only the top k documents in memory during sort).
  4. Combine multiple $match stages into one when they are adjacent (the optimizer does this, but explicit is clearer).
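Item 3 depends on MongoDB coalescing a $sort with the $limit that follows it, so the sort only has to retain the best k documents seen so far instead of its whole input. The server's implementation is not shown here; as a rough analogy, the plain-Java sketch below performs top-k selection with a bounded min-heap, so memory grows with k rather than with the number of inputs. The `TopK` class and `topK` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of top-k selection (an analogy for $sort + $limit, not server code).
class TopK {
    // Returns the k largest values in descending order while holding at most k values.
    static List<Double> topK(Iterable<Double> values, int k) {
        if (k <= 0) return List.of();
        PriorityQueue<Double> heap = new PriorityQueue<>(k);  // min-heap: smallest kept value at the head
        for (double v : values) {
            if (heap.size() < k) {
                heap.add(v);
            } else if (v > heap.peek()) {
                heap.poll();   // evict the smallest of the current top k
                heap.add(v);
            }
        }
        List<Double> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());              // descending, like Sorts.descending
        return result;
    }
}
```

Calling `topK(averages, 100)` is the in-memory analogue of `Aggregates.sort(Sorts.descending("avgTemp"))` followed by `Aggregates.limit(100)`: the heap never holds more than 100 values, however many groups arrive.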
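
Applying the checklist to the report pipeline:

// FAST: Optimized pipeline ordering
// Assumes `collection` is the telemetry.readings MongoCollection<Document>.
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Start of the 24-hour window
Date yesterday = Date.from(Instant.now().minus(1, ChronoUnit.DAYS));

List<Document> results = collection.aggregate(List.of(
    // 1. Filter first: 200M -> 2.5M documents
    Aggregates.match(Filters.gte("ts", yesterday)),

    // 2. Project only needed fields: reduces per-doc size from 340 to 80 bytes
    Aggregates.project(Projections.fields(
        Projections.include("sensorId", "temperature"),
        Projections.excludeId()
    )),

    // 3. Group: 2.5M -> 10K documents (the group _id is the sensorId)
    Aggregates.group("$sensorId",
        Accumulators.avg("avgTemp", "$temperature"),
        Accumulators.sum("count", 1)
    ),

    // 4. Lookup on grouped results: 10K lookups, not 2.5M
    Aggregates.lookup("sensors", "_id", "_id", "sensor"),
    Aggregates.unwind("$sensor"),

    // 5. Project final shape: rename _id to sensorId and keep only the building
    Aggregates.project(Projections.fields(
        Projections.excludeId(),
        Projections.computed("sensorId", "$_id"),
        Projections.include("avgTemp", "count"),
        Projections.computed("building", "$sensor.building")
    )),

    // 6. Sort and limit (coalesced into a top-k sort)
    Aggregates.sort(Sorts.descending("avgTemp")),
    Aggregates.limit(100)
)).into(new ArrayList<>());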

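The $lookup now runs on 10,000 grouped results (one per sensor) instead of 2.5 million individual readings. Note that the trailing $limit(100) also means this pipeline returns only the 100 sensors with the highest average temperature, not every sensor.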
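The same reasoning can be sketched in plain Java. The model below is hypothetical (the `Reading`, `SensorAvg`, and `Result` records and the `buildingBySensor` map standing in for the sensors collection are illustrative, not driver types): it averages temperatures per sensor first and only then attaches the building, so the number of lookups equals the number of distinct sensors rather than the number of readings.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of $group -> $lookup -> $unwind -> $sort (not driver code).
class GroupThenLookup {
    // A reading after $match and $project: only the fields the report needs.
    record Reading(String sensorId, double temperature) {}

    // One output row per sensor, enriched with its building after grouping.
    record SensorAvg(String sensorId, double avgTemp, long count, String building) {}

    // Output rows plus the number of building lookups performed.
    record Result(List<SensorAvg> rows, int lookups) {}

    static Result run(List<Reading> readings, Map<String, String> buildingBySensor) {
        // $group: running sum and count per sensorId.
        Map<String, double[]> acc = new HashMap<>();          // value = {sum, count}
        for (Reading r : readings) {
            double[] a = acc.computeIfAbsent(r.sensorId(), id -> new double[2]);
            a[0] += r.temperature();
            a[1] += 1;
        }
        // $lookup + $unwind on the grouped rows: one lookup per sensor, not per reading.
        int lookups = 0;
        List<SensorAvg> rows = new ArrayList<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            lookups++;
            String building = buildingBySensor.get(e.getKey());
            if (building == null) continue;                    // $unwind drops sensors with no match
            double[] a = e.getValue();
            rows.add(new SensorAvg(e.getKey(), a[0] / a[1], (long) a[1], building));
        }
        // $sort: highest average temperature first.
        rows.sort(Comparator.comparingDouble(SensorAvg::avgTemp).reversed());
        return new Result(rows, lookups);
    }
}
```

With six readings from three sensors, this performs three lookups; at the report's scale the same shape turns 2.5 million lookups into about 10,000.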

The Proof

Metric                            Unoptimized            Optimized
Execution time                    12 s                   0.4 s
Documents processed by $lookup    200,000,000            10,000
Peak memory usage                 2.8 GB (disk spill)    45 MB
allowDiskUse required             Yes                    No

The Trade-off

Moving $group before $lookup changes the semantics when a field produced by the lookup is part of the group key. In the original pipeline, grouping is by {sensorId, building}. In the optimized pipeline, grouping is by sensorId alone, and building is attached after grouping. This works because the relationship is many-to-one: each sensorId belongs to exactly one building (a building can contain many sensors), so adding building to the key never splits a sensor's group. If the relationship were many-to-many (a sensor could be in multiple buildings), the optimized pipeline would produce different results. Verify that the semantic equivalence holds before reordering across $lookup boundaries.
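One way to verify the equivalence is to confirm that building is functionally determined by sensorId, meaning no sensor appears with two different buildings. A minimal plain-Java sketch over hypothetical (sensorId, building) pairs; the `GroupKeyCheck` class and `ambiguousSensors` method are illustrative names, not a MongoDB API, and the pairs would come from whatever sample of the joined data you trust.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-flight check (not a MongoDB API): is building determined by sensorId?
class GroupKeyCheck {
    // One (sensorId, building) pair, e.g. taken from the joined documents.
    record SensorBuilding(String sensorId, String building) {}

    // Returns the sensorIds associated with more than one building, sorted.
    // An empty result means that, for the pairs checked, grouping by sensorId alone
    // and attaching building afterwards yields the same groups as grouping by both.
    static Set<String> ambiguousSensors(List<SensorBuilding> pairs) {
        Map<String, Set<String>> buildingsBySensor = new HashMap<>();
        for (SensorBuilding p : pairs) {
            buildingsBySensor.computeIfAbsent(p.sensorId(), id -> new HashSet<>()).add(p.building());
        }
        Set<String> ambiguous = new TreeSet<>();
        for (Map.Entry<String, Set<String>> e : buildingsBySensor.entrySet()) {
            if (e.getValue().size() > 1) ambiguous.add(e.getKey());  // sensor seen in 2+ buildings
        }
        return ambiguous;
    }
}
```

Several sensors sharing one building passes the check; a single sensor listed under two buildings does not.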