Event Sourcing and CQRS in Practice

Snapshotting: When Replaying 50,000 Events to Load an Aggregate Becomes Unacceptable

Chapter 6 of 17

Snapshotting

[Figure: Snapshotting comparison]

Without snapshots, loading an aggregate replays every event from the beginning of the stream. For a stream with 10,000 events, this can take hundreds of milliseconds to a second. With snapshots, the aggregate loads a serialized state checkpoint and replays only the events since that checkpoint. A configurable interval (e.g., every 100 events) controls when snapshots are created. If a snapshot is corrupt, loading falls back to a full replay automatically.

Loading an aggregate means replaying every event in its stream. For a new order with 5 events, this is instantaneous. For a frequently modified order in a high-volume system, the event count can grow to hundreds or thousands. A trading system that records every price update as an event can accumulate 50,000 events per entity in a single day. Loading that aggregate means deserializing 50,000 JSON payloads and applying 50,000 state transitions. At 0.1 milliseconds per event, that is 5 seconds of latency before the command handler can even validate the command.

Snapshotting solves this by periodically saving the aggregate’s current state as a serialized record. When loading the aggregate, the system deserializes the latest snapshot and replays only the events that occurred after the snapshot was taken. Instead of replaying 50,000 events, the system deserializes one snapshot and replays only the events appended since it: with a snapshot every 100 events, fewer than 100.
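The arithmetic behind these numbers can be made explicit. The following is a minimal sketch, not part of the book’s codebase: `ReplayCostEstimate` and its methods are hypothetical names, the 0.1 ms per-event cost is the figure assumed above, and it assumes replay cost is linear and a snapshot exists at every multiple of the interval. The snapshot’s own read and deserialization cost is left as a parameter because it depends on the size of the state.

```java
/**
 * Back-of-the-envelope estimate of aggregate load time.
 * Hypothetical helper: assumes replay cost is linear in event count and
 * that a snapshot exists at every multiple of the interval.
 */
final class ReplayCostEstimate {

    private ReplayCostEstimate() {}

    /** Number of events replayed after the latest snapshot. */
    static long eventsToReplay(long streamLength, int snapshotInterval) {
        if (snapshotInterval <= 0) {
            return streamLength; // snapshotting disabled: full replay
        }
        return streamLength % snapshotInterval;
    }

    /** Tail replay time plus the cost of reading the snapshot (pass 0 when there is none). */
    static double estimateMillis(long eventsReplayed, double millisPerEvent, double snapshotMillis) {
        return eventsReplayed * millisPerEvent + snapshotMillis;
    }
}
```

With no snapshots, `estimateMillis(eventsToReplay(50_000, 0), 0.1, 0.0)` gives the 5,000 ms computed above. With an interval of 100 the tail is at most 99 events, roughly 10 ms plus whatever the snapshot read costs.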

Snapshotting is an optimization, not a requirement. Many event-sourced systems operate without snapshots because their aggregates accumulate events slowly. The order management platform in this book is unlikely to need snapshots for orders (most orders have fewer than 20 events). But inventory aggregates that track every stock adjustment, or customer aggregates that record every interaction, might accumulate enough events to justify snapshotting.

The rule: measure aggregate load times before adding snapshotting. If the 99th percentile load time is acceptable, snapshots add complexity without benefit. If the 99th percentile load time exceeds your latency budget and that time is dominated by event replay, snapshots are the correct solution.
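Measuring the 99th percentile does not require special tooling. Here is a minimal sketch, assuming you can call the repository’s load method in a loop against representative streams. `LoadLatencyProbe` and its methods are hypothetical. The percentile uses the nearest-rank method. In a running service, a metrics library’s timer histogram could take the place of this hand-rolled sampling.

```java
import java.util.Arrays;

/** Hypothetical helper for deciding whether snapshotting is worth adding. */
final class LoadLatencyProbe {

    private LoadLatencyProbe() {}

    /** Times {@code iterations} calls of {@code load}; returns durations in nanoseconds. */
    static long[] measure(Runnable load, int iterations) {
        long[] durations = new long[iterations];
        for (int i = 0; i < iterations; i++) {
            long start = System.nanoTime();
            load.run();
            durations[i] = System.nanoTime() - start;
        }
        return durations;
    }

    /** Nearest-rank percentile, with p in (0, 100]. */
    static long percentile(long[] samples, double p) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }

    /** True when the p99 load time exceeds the latency budget. */
    static boolean exceedsBudget(long[] samplesNanos, long budgetNanos) {
        return percentile(samplesNanos, 99.0) > budgetNanos;
    }
}
```

A typical call would be `LoadLatencyProbe.exceedsBudget(LoadLatencyProbe.measure(() -> repository.load(orderId), 1_000), 50_000_000L)`, where the 50 ms budget is only an example value.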

The Snapshot Store

The Problem

Replaying events is the bottleneck. The event store read is fast (an indexed range read over a single stream), but deserialization and state application scale linearly with event count. The aggregate’s apply method might include business logic computations that compound the cost.

The Mechanism

A snapshot is a serialized representation of the aggregate’s complete state at a specific version. The snapshot table stores this alongside the stream ID and the version number.

-- FROM SCRATCH
CREATE TABLE snapshots (
    stream_id       VARCHAR(255) NOT NULL,
    version         BIGINT NOT NULL,
    aggregate_type  VARCHAR(255) NOT NULL,
    state           JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    PRIMARY KEY (stream_id, version)
);

-- No separate index is needed for "latest snapshot" lookups: PostgreSQL can
-- scan the primary-key index on (stream_id, version) backward to answer
-- WHERE stream_id = ? ORDER BY version DESC LIMIT 1.

The snapshot is not stored in the event store. Events and snapshots serve different purposes and have different lifecycles. Events are immutable and permanent. Snapshots are disposable: they can be deleted and recreated from the event stream at any time.

The From-Scratch Implementation

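// FROM SCRATCH
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Optional;
import javax.sql.DataSource;

// SnapshotException is assumed to be an unchecked exception defined elsewhere in the project.
public class SnapshotStore {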

    private final DataSource dataSource;
    private final ObjectMapper mapper;

    public SnapshotStore(DataSource dataSource, ObjectMapper mapper) {
        this.dataSource = dataSource;
        this.mapper = mapper;
    }

    public void save(String streamId, long version, String aggregateType, Object state) {
        String sql = """
            INSERT INTO snapshots (stream_id, version, aggregate_type, state)
            VALUES (?, ?, ?, ?::jsonb)
            ON CONFLICT (stream_id, version) DO UPDATE SET
                state = EXCLUDED.state,
                created_at = NOW()
            """;

        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, streamId);
            stmt.setLong(2, version);
            stmt.setString(3, aggregateType);
            stmt.setString(4, mapper.writeValueAsString(state));
            stmt.executeUpdate();
        } catch (SQLException | JsonProcessingException e) {
            throw new SnapshotException("Failed to save snapshot for " + streamId, e);
        }
    }

    public Optional<Snapshot> loadLatest(String streamId) {
        String sql = """
            SELECT stream_id, version, aggregate_type, state, created_at
            FROM snapshots
            WHERE stream_id = ?
            ORDER BY version DESC
            LIMIT 1
            """;

        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, streamId);
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    return Optional.of(new Snapshot(
                        rs.getString("stream_id"),
                        rs.getLong("version"),
                        rs.getString("aggregate_type"),
                        rs.getString("state"),
                        rs.getTimestamp("created_at").toInstant()
                    ));
                }
                return Optional.empty();
            }
        } catch (SQLException e) {
            throw new SnapshotException("Failed to load snapshot for " + streamId, e);
        }
    }
}

public record Snapshot(
    String streamId,
    long version,
    String aggregateType,
    String state,
    Instant createdAt
) {}

The aggregate repository integrates snapshot loading with event replay:

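// FROM SCRATCH
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Optional;

public class SnapshottedOrderRepository {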

    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;
    private final EventTypeRegistry registry;
    private final ObjectMapper mapper;
    private final int snapshotInterval;

    public SnapshottedOrderRepository(
            EventStore eventStore,
            SnapshotStore snapshotStore,
            EventTypeRegistry registry,
            ObjectMapper mapper,
            int snapshotInterval) {
        this.eventStore = eventStore;
        this.snapshotStore = snapshotStore;
        this.registry = registry;
        this.mapper = mapper;
        this.snapshotInterval = snapshotInterval;
    }

    public OrderAggregate load(String orderId) {
        String streamId = "order-" + orderId;
        OrderAggregate aggregate = new OrderAggregate();

        // Try loading from snapshot
        Optional<Snapshot> snapshot = snapshotStore.loadLatest(streamId);
        long fromSequence = 0;

        if (snapshot.isPresent()) {
            try {
                OrderAggregateState state = mapper.readValue(
                    snapshot.get().state(), OrderAggregateState.class
                );
                aggregate.restoreFromSnapshot(state);
                fromSequence = snapshot.get().version() + 1;
            } catch (JsonProcessingException e) {
                // Snapshot corrupt, fall back to full replay
                aggregate = new OrderAggregate();
                fromSequence = 0;
            }
        }

        // Replay events after snapshot
        List<StoredEvent> events = eventStore.readStream(streamId, fromSequence);
        for (StoredEvent stored : events) {
            OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());
            aggregate.apply(event);
        }

        return aggregate;
    }

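    public void save(OrderAggregate aggregate, List<OrderEvent> newEvents) {
        String streamId = "order-" + aggregate.orderId();
        // aggregate.version() already includes the new events
        long versionBeforeAppend = aggregate.version() - newEvents.size();
        eventStore.append(streamId, versionBeforeAppend, newEvents);

        // Snapshot when this save crossed a multiple of the interval. Checking only
        // version % interval == 0 would miss saves that append several events at once.
        if (aggregate.version() / snapshotInterval > versionBeforeAppend / snapshotInterval) {
            try {
                snapshotStore.save(
                    streamId,
                    aggregate.version(),
                    "OrderAggregate",
                    aggregate.toSnapshot()
                );
            } catch (SnapshotException e) {
                // The events are already stored; a missing snapshot only costs replay time.
            }
        }
    }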
}

The aggregate needs snapshot support:

// FROM SCRATCH
public class OrderAggregate {

    // ... existing fields and methods ...

    public OrderAggregateState toSnapshot() {
        return new OrderAggregateState(
            orderId, customerId, status, total,
            paidAmount, refundedAmount, lineItems,
            shippingAddress, version
        );
    }

    public void restoreFromSnapshot(OrderAggregateState state) {
        this.orderId = state.orderId();
        this.customerId = state.customerId();
        this.status = state.status();
        this.total = state.total();
        this.paidAmount = state.paidAmount();
        this.refundedAmount = state.refundedAmount();
        this.lineItems = state.lineItems();
        this.shippingAddress = state.shippingAddress();
        this.version = state.version();
    }
}

public record OrderAggregateState(
    String orderId,
    String customerId,
    OrderStatus status,
    BigDecimal total,
    BigDecimal paidAmount,
    BigDecimal refundedAmount,
    List<LineItem> lineItems,
    Address shippingAddress,
    long version
) {}

What the Implementation Reveals

The snapshot is taken after every N events (snapshotInterval). A typical interval is 50 to 100 events. Too frequent (every event) wastes storage and adds write latency to every save. Too infrequent (every 1,000 events) can leave hundreds of events to replay on each load, which may not reduce latency enough to matter. Tune the interval against measured aggregate load times.
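Exactly how "every N events" is detected matters when one command appends several events. If you check only whether the new version is an exact multiple of N, a save that moves the version from, say, 98 to 102 never triggers a snapshot. Below is a minimal sketch of a boundary-crossing check. `SnapshotTrigger` is a hypothetical name, and the sketch assumes versions count appended events starting from 0.

```java
/** Hypothetical helper: decides whether a save should also write a snapshot. */
final class SnapshotTrigger {

    private SnapshotTrigger() {}

    /**
     * True when the version moved past (or onto) a multiple of the interval
     * during this save. Assumes versions count events, starting at 0.
     */
    static boolean shouldSnapshot(long versionBefore, long versionAfter, int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        if (versionAfter <= versionBefore) {
            return false; // nothing was appended
        }
        // Integer division counts how many full intervals each version has completed
        return versionAfter / interval > versionBefore / interval;
    }
}
```

In a repository’s save method, the call would look like `SnapshotTrigger.shouldSnapshot(aggregate.version() - newEvents.size(), aggregate.version(), snapshotInterval)`.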

The restoreFromSnapshot method and the toSnapshot method must be kept in sync with the aggregate’s fields. If a new field is added to the aggregate but not to the snapshot, restoring from a snapshot produces an aggregate with the new field uninitialized. This is the snapshot invalidation problem.

When the aggregate’s apply logic changes (a bug fix, a new field derivation), existing snapshots may no longer produce the correct state. The safe approach is to delete all snapshots for the affected aggregate type and let them be recreated from a full event replay. This is one of the reasons snapshots are stored in a separate table: bulk deletion of snapshots does not affect the event store.
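Because snapshots live in their own table, invalidation is a single statement. The following is a minimal sketch using plain JDBC against the `snapshots` table defined earlier. `SnapshotInvalidation` is a hypothetical helper. In practice it would run once the deployment that changes the apply logic has completed, so that no instance still running the old logic writes fresh snapshots afterward.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

/** Hypothetical maintenance helper for the snapshots table. */
final class SnapshotInvalidation {

    static final String DELETE_BY_TYPE_SQL =
        "DELETE FROM snapshots WHERE aggregate_type = ?";

    private SnapshotInvalidation() {}

    /**
     * Deletes every snapshot of one aggregate type. Events are untouched:
     * loads replay the full stream until a later save writes a fresh snapshot.
     */
    static int invalidate(DataSource dataSource, String aggregateType) throws SQLException {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(DELETE_BY_TYPE_SQL)) {
            stmt.setString(1, aggregateType);
            return stmt.executeUpdate(); // number of snapshots removed
        }
    }
}
```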

The fallback in the load method is important. If a snapshot cannot be deserialized (schema change, corruption), the loader falls back to a full event replay. Snapshots are an optimization. If they fail, the system is slower but not incorrect. The events remain the source of truth.
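You can study the fallback rule without a database. The following in-memory sketch assumes a toy aggregate whose events are signed amounts, and it uses a toy colon-separated serializer in place of Jackson. Every name in it (`SnapshotLoadingSketch`, `Balance`, `StoredSnapshot`) is hypothetical. The loader ignores any snapshot that cannot be decoded, or whose version disagrees with its payload or with the stream, and rebuilds the aggregate from the first event.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical in-memory illustration of snapshot loading with a full-replay fallback. */
final class SnapshotLoadingSketch {

    private SnapshotLoadingSketch() {}

    /** Toy aggregate state: a running balance plus the number of events applied. */
    record Balance(long amount, long version) {
        Balance apply(long delta) {
            return new Balance(amount + delta, version + 1);
        }
    }

    /** A stored snapshot: the version it claims to represent and its serialized state. */
    record StoredSnapshot(long version, String payload) {}

    /** Toy serializer standing in for Jackson: "amount:version". */
    static String serialize(Balance state) {
        return state.amount() + ":" + state.version();
    }

    /** Returns empty instead of throwing when the payload is unreadable. */
    static Optional<Balance> deserialize(String payload) {
        String[] parts = payload.split(":");
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Balance(Long.parseLong(parts[0]), Long.parseLong(parts[1])));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** events.get(i) moves the aggregate from version i to version i + 1. */
    static Balance load(List<Long> events, Optional<StoredSnapshot> snapshot) {
        Balance state = snapshot
            .flatMap(s -> deserialize(s.payload())
                // Distrust snapshots that disagree with their own metadata or the stream
                .filter(b -> b.version() == s.version()
                          && b.version() >= 0
                          && b.version() <= events.size()))
            .orElse(new Balance(0, 0)); // missing or corrupt: replay from the first event

        for (int i = (int) state.version(); i < events.size(); i++) {
            state = state.apply(events.get(i));
        }
        return state;
    }
}
```

A valid snapshot and an unreadable one both yield the same final state. The only difference is how many events get replayed.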

The Production Path

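// PRODUCTION
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Repository
public class SpringSnapshottedOrderRepository {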

    private final SpringJdbcEventStore eventStore;
    private final JdbcTemplate jdbc;
    private final ObjectMapper eventMapper;
    private final EventTypeRegistry registry;

    @Value("${event-sourcing.snapshot-interval:100}")
    private int snapshotInterval;

    public SpringSnapshottedOrderRepository(
            SpringJdbcEventStore eventStore,
            JdbcTemplate jdbc,
            @Qualifier("eventObjectMapper") ObjectMapper eventMapper,
            EventTypeRegistry registry) {
        this.eventStore = eventStore;
        this.jdbc = jdbc;
        this.eventMapper = eventMapper;
        this.registry = registry;
    }

    @Transactional(readOnly = true)
    public OrderAggregate load(String orderId) {
        String streamId = "order-" + orderId;
        OrderAggregate aggregate = new OrderAggregate();
        long fromSequence = 0;

        var snapshots = jdbc.query(
            "SELECT version, state FROM snapshots WHERE stream_id = ? ORDER BY version DESC LIMIT 1",
            (rs, rowNum) -> Map.entry(rs.getLong("version"), rs.getString("state")),
            streamId
        );

        if (!snapshots.isEmpty()) {
            try {
                var entry = snapshots.get(0);
                OrderAggregateState state = eventMapper.readValue(
                    entry.getValue(), OrderAggregateState.class
                );
                aggregate.restoreFromSnapshot(state);
                fromSequence = entry.getKey() + 1;
            } catch (JsonProcessingException e) {
                aggregate = new OrderAggregate();
                fromSequence = 0;
            }
        }

        List<StoredEvent> events = eventStore.readStreamFrom(streamId, fromSequence);
        for (StoredEvent stored : events) {
            OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());
            aggregate.apply(event);
        }

        return aggregate;
    }

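    @Transactional
    public void save(OrderAggregate aggregate, List<OrderEvent> newEvents) {
        String streamId = "order-" + aggregate.orderId();
        long versionBefore = aggregate.version() - newEvents.size();
        eventStore.append(streamId, versionBefore, newEvents);

        // Snapshot when this save crossed a multiple of the interval; an exact
        // version % interval == 0 check misses saves that append several events.
        if (aggregate.version() / snapshotInterval > versionBefore / snapshotInterval) {
            try {
                // Caution: this insert runs in the same transaction as the append,
                // so a database error here also rolls back the new events. Writing
                // the snapshot in a separate transaction after commit avoids that.
                jdbc.update(
                    """
                    INSERT INTO snapshots (stream_id, version, aggregate_type, state)
                    VALUES (?, ?, ?, ?::jsonb)
                    ON CONFLICT (stream_id, version) DO UPDATE SET state = EXCLUDED.state, created_at = NOW()
                    """,
                    streamId, aggregate.version(), "OrderAggregate",
                    eventMapper.writeValueAsString(aggregate.toSnapshot())
                );
            } catch (JsonProcessingException e) {
                // Snapshot failure is not fatal. Log and continue.
            }
        }
    }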
}

The snapshot interval is externalized to a configuration property. This allows tuning without code changes. Different aggregate types might need different intervals.
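A single property applies the same interval to every aggregate type. One way to allow per-type tuning is a map of overrides with a default. Below is a minimal sketch in plain Java. `SnapshotIntervals` and the property name in its comment are hypothetical. In a Spring application, the map would typically be bound from configuration rather than built by hand.

```java
import java.util.Map;

/**
 * Hypothetical per-aggregate-type snapshot intervals with a global default,
 * e.g. bound from properties such as event-sourcing.snapshot-intervals.<type>.
 */
final class SnapshotIntervals {

    private final int defaultInterval;
    private final Map<String, Integer> overrides;

    SnapshotIntervals(int defaultInterval, Map<String, Integer> overrides) {
        if (defaultInterval <= 0) {
            throw new IllegalArgumentException("default interval must be positive");
        }
        // Reject missing or non-positive overrides up front
        overrides.forEach((type, interval) -> {
            if (interval == null || interval <= 0) {
                throw new IllegalArgumentException("invalid interval for " + type);
            }
        });
        this.defaultInterval = defaultInterval;
        this.overrides = Map.copyOf(overrides); // immutable defensive copy
    }

    /** Interval for the given aggregate type, falling back to the default. */
    int intervalFor(String aggregateType) {
        return overrides.getOrDefault(aggregateType, defaultInterval);
    }
}
```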

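In Axon Framework, snapshotting is configured declaratively. A SnapshotTriggerDefinition decides when snapshots are taken. Its EventCountSnapshotTriggerDefinition implementation takes a Snapshotter and a threshold, and it triggers a snapshot automatically once the number of events needed to load an aggregate exceeds that threshold. Axon’s serializer handles snapshot serialization and deserialization, so the main decision left to the developer is the threshold value.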

The Test

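// FROM SCRATCH
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.math.BigDecimal;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
class SnapshottedLoadingTest {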

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
        .withDatabaseName("snapshot_test")
        .withInitScript("event_store_with_snapshots.sql");

    private SnapshottedOrderRepository repository;
    private EventStore eventStore;

    @BeforeEach
    void setUp() {
        var dataSource = new org.postgresql.ds.PGSimpleDataSource();
        dataSource.setUrl(postgres.getJdbcUrl());
        dataSource.setUser(postgres.getUsername());
        dataSource.setPassword(postgres.getPassword());

        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

        eventStore = new EventStore(dataSource, mapper);
        var snapshotStore = new SnapshotStore(dataSource, mapper);
        var registry = new EventTypeRegistry(mapper);

        repository = new SnapshottedOrderRepository(
            eventStore, snapshotStore, registry, mapper, 5
        );
    }

    @Test
    void snapshotTakenAfterInterval() {
        // Create order and apply 5 events to trigger snapshot
        OrderAggregate order = new OrderAggregate();
        var placeEvents = order.place(new PlaceOrder(
            "snap-1", "c1",
            List.of(new LineItem("p1", "Widget", 1, BigDecimal.TEN)),
            new Address("1", "C", "S", "0", "US")
        ));
        repository.save(order, placeEvents);

        var confirmEvents = order.confirm(new ConfirmOrder("snap-1"));
        repository.save(order, confirmEvents);

        var payEvents = order.authorizePayment(
            new AuthorizePayment("snap-1", "pay-1", BigDecimal.TEN, "AUTH")
        );
        repository.save(order, payEvents);

        var fulfillEvents = order.fulfill(
            new FulfillOrder("snap-1", "TRACK", "UPS")
        );
        repository.save(order, fulfillEvents);

        var refundEvents = order.requestRefund(
            new RequestRefund("snap-1", "ref-1", new BigDecimal("5.00"), "Damaged")
        );
        repository.save(order, refundEvents);

        // Load should use snapshot + remaining events
        OrderAggregate loaded = repository.load("snap-1");
        assertEquals(OrderStatus.FULFILLED, loaded.status());
        assertEquals(order.version(), loaded.version());
    }

    @Test
    void loadProducesSameStateWithAndWithoutSnapshot() {
        // Create an order with enough events to trigger a snapshot
        OrderAggregate order = new OrderAggregate();
        List<OrderEvent> events = order.place(new PlaceOrder(
            "snap-2", "c1",
            List.of(new LineItem("p1", "Widget", 3, new BigDecimal("15.00"))),
            new Address("1", "C", "S", "0", "US")
        ));
        repository.save(order, events);

        for (int i = 0; i < 10; i++) {
            events = order.changeAddress(new ChangeShippingAddress(
                "snap-2",
                new Address(String.valueOf(i), "City" + i, "S", "0", "US"),
                "Update " + i
            ));
            repository.save(order, events);
        }

        // Load with snapshots
        OrderAggregate fromSnapshot = repository.load("snap-2");

        // Load without snapshots (full replay)
        var allEvents = eventStore.readStream("order-snap-2", 0);
        OrderAggregate fromReplay = new OrderAggregate();
        var registry = new EventTypeRegistry(new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false));
        for (StoredEvent stored : allEvents) {
            fromReplay.apply(registry.deserialize(stored.eventType(), stored.payload()));
        }

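        // Both must produce identical state
        assertEquals(fromReplay.version(), fromSnapshot.version());
        assertEquals(fromReplay.status(), fromSnapshot.status());
        assertEquals(fromReplay.orderId(), fromSnapshot.orderId());
        // OrderAggregateState is a record, so this compares every snapshotted field
        assertEquals(fromReplay.toSnapshot(), fromSnapshot.toSnapshot());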
    }
}

The second test is the critical one. It verifies that loading with a snapshot produces the exact same state as a full event replay. If these diverge, the snapshot is corrupt or the snapshot serialization is incomplete. This test should be part of the continuous integration pipeline and run after any change to the aggregate’s state fields.
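The same property can be checked at every possible snapshot position, not only where the interval happens to fall. What follows is a minimal, framework-free sketch. `SnapshotEquivalence` is a hypothetical helper. `apply` stands in for the aggregate’s event application. `roundTrip` stands in for writing and reading a snapshot; with the real aggregate, that would be a function that serializes `toSnapshot()` with Jackson and restores the result. Because the check is quadratic in stream length, it suits test-sized streams.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

/** Hypothetical test helper: snapshot-based loading vs. full replay at every cut point. */
final class SnapshotEquivalence {

    private SnapshotEquivalence() {}

    /**
     * For every k in 0..n, takes the state after k events, passes it through
     * roundTrip (serialize + deserialize), replays events k..n-1, and checks
     * that the result equals a full replay of all n events.
     */
    static <S, E> boolean holdsForEveryCut(S initial,
                                           List<E> events,
                                           BiFunction<S, E, S> apply,
                                           UnaryOperator<S> roundTrip) {
        S full = initial;
        for (E event : events) {
            full = apply.apply(full, event);
        }

        S prefix = initial; // state after the first k events
        for (int k = 0; k <= events.size(); k++) {
            S state = roundTrip.apply(prefix);
            for (int i = k; i < events.size(); i++) {
                state = apply.apply(state, events.get(i));
            }
            if (!state.equals(full)) {
                return false;
            }
            if (k < events.size()) {
                prefix = apply.apply(prefix, events.get(k));
            }
        }
        return true;
    }
}
```

A lossy `roundTrip`, such as one that drops a field the snapshot forgot to include, makes the helper return false. That is exactly the divergence the CI test is meant to catch.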

This chapter established snapshotting as a performance optimization with clear trade-offs. Part III builds the read side: projections, multiple storage targets, and projection rebuilding.