Event Sourcing and CQRS in Practice

Read Model Storage: PostgreSQL, Redis, and Elasticsearch as Projection Targets

12 min read Chapter 8 of 17

Read Model Storage

Read Model Trade-offs

Different read model engines serve different query patterns. PostgreSQL provides transactional consistency with the event store but has higher read latency at scale. Redis delivers sub-millisecond lookups but cannot share a transaction with the event store or the position tracker. Elasticsearch excels at full-text search and faceted queries. The choice depends on the query pattern, not a universal preference.

A single event stream can feed multiple read models. The order events that update the order_summary PostgreSQL table also update a Redis cache for the customer-facing API and an Elasticsearch index for the admin search panel. Each read model is optimized for its query pattern. Each has different consistency characteristics.

This is a core strength of event sourcing combined with CQRS. In a CRUD system, adding a new query target means adding a synchronization mechanism: a trigger, a CDC pipeline, or an ETL job. In an event-sourced system, the events already exist. A new read model is a new consumer of the existing event stream. No new synchronization mechanism is needed. The events are the synchronization mechanism.
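The point can be made concrete with a minimal in-memory sketch. It does not use the chapter's EventStore or projection interfaces; the names FanOutDemo, Event, Projection, StatusByOrder, and CountByStatus are hypothetical stand-ins chosen for illustration. Two read models consume the same list of events, and the second one is added without any extra synchronization code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-ins; not the chapter's EventStore or projection types.
final class FanOutDemo {

    // A simplified stored event: global position, order id, new status.
    record Event(long position, String orderId, String status) {}

    // Every read model is just another consumer of the same stream.
    interface Projection {
        void process(Event event);
    }

    // Read model 1: latest status per order (stands in for a summary table or cache).
    static final class StatusByOrder implements Projection {
        final Map<String, String> statuses = new HashMap<>();

        @Override
        public void process(Event e) {
            statuses.put(e.orderId(), e.status());
        }
    }

    // Read model 2: number of orders currently in each status.
    static final class CountByStatus implements Projection {
        final Map<String, Integer> counts = new HashMap<>();
        private final Map<String, String> lastStatus = new HashMap<>();

        @Override
        public void process(Event e) {
            String previous = lastStatus.put(e.orderId(), e.status());
            if (previous != null) {
                counts.merge(previous, -1, Integer::sum); // order left its old status
            }
            counts.merge(e.status(), 1, Integer::sum);
        }
    }

    // Replays the stream into one projection; a new read model needs nothing more.
    static void replay(List<Event> stream, Projection projection) {
        for (Event e : stream) {
            projection.process(e);
        }
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
            new Event(1, "o1", "PLACED"),
            new Event(2, "o2", "PLACED"),
            new Event(3, "o1", "CONFIRMED"));

        StatusByOrder statuses = new StatusByOrder();
        CountByStatus counts = new CountByStatus();
        replay(stream, statuses);
        replay(stream, counts); // added later, fed from the same events

        System.out.println(statuses.statuses);
        System.out.println(counts.counts);
    }
}
```

In a real system each projection would track its own position in the stream, so a read model added months later can replay from position zero while the existing ones continue from where they are.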

PostgreSQL Read Model

The PostgreSQL read model from chapter 7 provides ACID guarantees. The projection update and the position tracking share a transaction. The read model is always consistent with the tracked position. If the projection crashes, it resumes from the last committed position without data loss or duplication.
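The guarantee is easier to see in miniature. The sketch below is not the chapter 7 implementation and uses no database. It simulates the transaction with an immutable snapshot that holds both the read model rows and the tracked position, so the two are replaced together or not at all. The names TransactionalPositionDemo, Snapshot, and the failAt parameter are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of "projection update and position share a transaction".
final class TransactionalPositionDemo {

    record Event(long position, String orderId, String status) {}

    // Rows and position live in one immutable snapshot; swapping the snapshot
    // commits both or neither, which stands in for a database transaction.
    record Snapshot(Map<String, String> rows, long position) {}

    static final class Projection {
        private Snapshot committed = new Snapshot(Map.of(), 0);

        Snapshot committed() {
            return committed;
        }

        // Applies every event after the committed position.
        // failAt simulates a crash after the row change but before the commit.
        void run(List<Event> stream, long failAt) {
            for (Event e : stream) {
                if (e.position() <= committed.position()) {
                    continue; // already applied and committed
                }
                Map<String, String> rows = new HashMap<>(committed.rows());
                rows.put(e.orderId(), e.status()); // uncommitted work
                if (e.position() == failAt) {
                    throw new IllegalStateException("crash before commit at " + failAt);
                }
                committed = new Snapshot(Map.copyOf(rows), e.position()); // commit
            }
        }
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
            new Event(1, "o1", "PLACED"),
            new Event(2, "o1", "CONFIRMED"),
            new Event(3, "o2", "PLACED"));

        Projection projection = new Projection();
        try {
            projection.run(stream, 2); // crashes while handling event 2
        } catch (IllegalStateException crash) {
            System.out.println("after crash: " + projection.committed());
        }
        projection.run(stream, -1); // restart: resumes after position 1
        System.out.println("after restart: " + projection.committed());
    }
}
```

After the simulated crash the snapshot still shows position 1 with the row as event 1 left it. The restart skips event 1, reapplies event 2 from scratch, and continues, which is the "no data loss or duplication" property described above.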

This is the baseline. When the query pattern fits relational SQL (filters, joins, pagination, aggregations), PostgreSQL is the correct target. Its consistency guarantees simplify operations. Its indexing capabilities cover most query patterns. Its monitoring and backup infrastructure already exists.

The PostgreSQL read model’s limitation is latency. A query that joins three tables and sorts by a computed column requires milliseconds of database work. For a customer-facing API endpoint where the response budget is 10 milliseconds, the database query might consume the entire budget.

Redis Read Model

The Problem

The customer-facing API serves order details at high frequency. Every page load, every polling refresh, every mobile app sync requests the same order data. The PostgreSQL read model can handle this load with connection pooling and read replicas, but each query involves a network round-trip to the database, query parsing, and result serialization. For hot data (recent orders that customers check repeatedly), a cache eliminates the repeated database work.

The Mechanism

The Redis projection writes a serialized order summary to a Redis hash keyed by order ID. The customer-facing API reads from Redis first. If the key exists, the response is served from cache. If the key does not exist (cache miss, TTL expiry), the API falls back to the PostgreSQL read model.
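The read path on the API side can be sketched as follows. This is an illustration under stated assumptions, not the chapter's API code: a HashMap stands in for Redis, a lookup function stands in for the PostgreSQL read model, and the names CacheAsideDemo, OrderReader, fallbackReads, and expire are hypothetical. The sketch also assumes the API repopulates the cache with the full row after a miss.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the API read path; a map stands in for Redis and a
// lookup function stands in for the PostgreSQL read model.
final class CacheAsideDemo {

    static final class OrderReader {
        private final Map<String, Map<String, String>> cache = new HashMap<>();
        private final Function<String, Optional<Map<String, String>>> readModel;
        int fallbackReads = 0; // how often the slower read model was consulted

        OrderReader(Function<String, Optional<Map<String, String>>> readModel) {
            this.readModel = readModel;
        }

        // Cache first; on a miss, read the full row from the read model and cache it.
        Optional<Map<String, String>> findOrder(String orderId) {
            String key = "order:" + orderId;
            Map<String, String> cached = cache.get(key);
            if (cached != null) {
                return Optional.of(cached); // cache hit
            }
            fallbackReads++;
            Optional<Map<String, String>> row = readModel.apply(orderId);
            // Store the complete row so later field updates never see a partial entry.
            row.ifPresent(r -> cache.put(key, Map.copyOf(r)));
            return row;
        }

        // Stands in for TTL expiry of the Redis key.
        void expire(String orderId) {
            cache.remove("order:" + orderId);
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> table =
            Map.of("o1", Map.of("orderId", "o1", "status", "PLACED"));
        OrderReader reader = new OrderReader(id -> Optional.ofNullable(table.get(id)));

        reader.findOrder("o1"); // miss: falls back to the read model
        reader.findOrder("o1"); // hit: served from the cache
        System.out.println("fallback reads: " + reader.fallbackReads);
    }
}
```

Orders that do not exist in the read model are not cached here, so repeated lookups for unknown IDs always reach the database. Whether to cache such negative results is a separate design decision.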

The From-Scratch Implementation

// FROM SCRATCH
public class RedisOrderProjection implements EventProjection {

    private final JedisPool jedisPool;
    private final EventTypeRegistry registry;
    private final ObjectMapper mapper;
    private final int ttlSeconds;

    public RedisOrderProjection(JedisPool jedisPool, EventTypeRegistry registry,
                                ObjectMapper mapper, int ttlSeconds) {
        this.jedisPool = jedisPool;
        this.registry = registry;
        this.mapper = mapper;
        this.ttlSeconds = ttlSeconds;
    }

    @Override
    public void process(StoredEvent stored, Connection conn) throws Exception {
        OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());

        switch (event) {
            case OrderPlaced e -> writeOrderToRedis(e);
            case OrderConfirmed e -> updateOrderFieldInRedis(e.orderId(), "status", "CONFIRMED");
            case PaymentAuthorized e -> updateOrderFieldInRedis(e.orderId(), "status", "PAYMENT_AUTHORIZED");
            case OrderFulfilled e -> updateOrderFieldInRedis(e.orderId(), "status", "FULFILLED");
            case OrderCancelled e -> updateOrderFieldInRedis(e.orderId(), "status", "CANCELLED");
            case ShippingAddressChanged e -> updateOrderFieldInRedis(e.orderId(), "shippingCity", e.newAddress().city());
            case RefundRequested e -> incrementRefundInRedis(e);
        }
    }

    private void writeOrderToRedis(OrderPlaced event) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "order:" + event.orderId();
            Map<String, String> fields = Map.of(
                "orderId", event.orderId(),
                "customerId", event.customerId(),
                "status", "PLACED",
                "total", event.total().toPlainString(),
                "itemCount", String.valueOf(event.lineItems().size()),
                "shippingCity", event.shippingAddress().city()
            );
            jedis.hset(key, fields);
            jedis.expire(key, ttlSeconds);
        }
    }

    private void updateOrderFieldInRedis(String orderId, String field, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "order:" + orderId;
            if (jedis.exists(key)) {
                jedis.hset(key, field, value);
                jedis.expire(key, ttlSeconds);
            }
            // If key does not exist, skip. The cache will be populated on next read
            // from PostgreSQL.
        }
    }

    private void incrementRefundInRedis(RefundRequested event) {
        try (Jedis jedis = jedisPool.getResource()) {
            String key = "order:" + event.orderId();
            if (jedis.exists(key)) {
                jedis.hincrByFloat(key, "refundedAmount", event.amount().doubleValue());
                jedis.expire(key, ttlSeconds);
            }
        }
    }
}

What the Implementation Reveals

The Redis projection does not participate in the PostgreSQL transaction that wraps position tracking. This means the projection can fail (Redis unavailable, network timeout) without affecting the position update. If the Redis write fails, the position still advances, and the event is not reprocessed.

This is acceptable because Redis is a cache, not a source of truth. A missing or stale cache entry results in a fallback read from PostgreSQL. The worst case is higher latency for some requests until the cache is repopulated.

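The updateOrderFieldInRedis method checks if the key exists before updating. If the key has expired (TTL), the update is skipped. The cache will be populated on the next cache miss from the PostgreSQL read model. This keeps the projection from creating partial cache entries (a status update without the full order data). The check and the write are separate commands, so a key that expires between them can still leave a partial entry; running both inside a Lua script or a WATCH/MULTI transaction closes that window.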

The hincrByFloat operation for refund amounts is not idempotent: the result is correct only if the event is processed exactly once. If the event is processed twice (possible during a Redis projection replay), the refund amount doubles. For a cache, this is tolerable because the cache entry will expire and be repopulated from PostgreSQL. For a source-of-truth store, this would be a bug.
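Where a target must tolerate redelivery, one common remedy is to record the position of the last event applied to each order and skip anything at or below it. The sketch below shows the idea in memory only. It assumes each event carries a monotonically increasing position and that events for one order arrive in position order; the names IdempotentRefundDemo, RefundView, and the position field on RefundRequested are hypothetical and not part of the chapter's event types.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: making an increment safe against duplicate delivery
// by remembering the last applied event position per order.
final class IdempotentRefundDemo {

    // Assumed shape: the real RefundRequested event may not expose a position.
    record RefundRequested(long position, String orderId, BigDecimal amount) {}

    static final class RefundView {
        private final Map<String, BigDecimal> refunded = new HashMap<>();
        private final Map<String, Long> lastApplied = new HashMap<>();

        // Returns true if the event changed the view, false if it was a duplicate.
        boolean apply(RefundRequested e) {
            long last = lastApplied.getOrDefault(e.orderId(), 0L);
            if (e.position() <= last) {
                return false; // already applied: replay or redelivery
            }
            refunded.merge(e.orderId(), e.amount(), BigDecimal::add);
            lastApplied.put(e.orderId(), e.position());
            return true;
        }

        BigDecimal refundedAmount(String orderId) {
            return refunded.getOrDefault(orderId, BigDecimal.ZERO);
        }
    }

    public static void main(String[] args) {
        RefundView view = new RefundView();
        RefundRequested refund = new RefundRequested(7, "o1", new BigDecimal("5.00"));

        System.out.println(view.apply(refund)); // first delivery
        System.out.println(view.apply(refund)); // duplicate delivery
        System.out.println(view.refundedAmount("o1"));
    }
}
```

In Redis or Elasticsearch the position check and the increment would have to happen atomically, for example inside a single script, or the guard itself becomes a race. The sketch also keeps amounts as BigDecimal, which avoids the rounding that a floating-point increment can introduce.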

The Production Path

// PRODUCTION
@Component
public class SpringRedisOrderProjection implements NamedProjection {

    private final StringRedisTemplate redisTemplate;
    private final EventTypeRegistry registry;

    @Value("${cache.order.ttl-seconds:3600}")
    private long ttlSeconds;

    public SpringRedisOrderProjection(StringRedisTemplate redisTemplate,
                                      EventTypeRegistry registry) {
        this.redisTemplate = redisTemplate;
        this.registry = registry;
    }

    @Override
    public String name() {
        return "redis-order-cache";
    }

    @Override
    public void process(StoredEvent stored) {
        OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());

        switch (event) {
            case OrderPlaced e -> {
                String key = "order:" + e.orderId();
                var ops = redisTemplate.opsForHash();
                ops.putAll(key, Map.of(
                    "orderId", e.orderId(),
                    "customerId", e.customerId(),
                    "status", "PLACED",
                    "total", e.total().toPlainString(),
                    "itemCount", String.valueOf(e.lineItems().size()),
                    "shippingCity", e.shippingAddress().city()
                ));
                redisTemplate.expire(key, Duration.ofSeconds(ttlSeconds));
            }
            case OrderConfirmed e ->
                updateField(e.orderId(), "status", "CONFIRMED");
            case PaymentAuthorized e ->
                updateField(e.orderId(), "status", "PAYMENT_AUTHORIZED");
            case OrderFulfilled e ->
                updateField(e.orderId(), "status", "FULFILLED");
            case OrderCancelled e ->
                updateField(e.orderId(), "status", "CANCELLED");
            default -> {} // Other events do not affect the cache
        }
    }

    private void updateField(String orderId, String field, String value) {
        String key = "order:" + orderId;
        if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
            redisTemplate.opsForHash().put(key, field, value);
            redisTemplate.expire(key, Duration.ofSeconds(ttlSeconds));
        }
    }
}

The Test

// FROM SCRATCH
@Testcontainers
class RedisProjectionTest {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
        .withDatabaseName("redis_proj_test")
        .withInitScript("projection_schema.sql");

    @Container
    static GenericContainer<?> redis = new GenericContainer<>("redis:7")
        .withExposedPorts(6379);

    private EventStore eventStore;
    private RedisOrderProjection redisProjection;
    private JedisPool jedisPool;

    @BeforeEach
    void setUp() {
        var ds = new org.postgresql.ds.PGSimpleDataSource();
        ds.setUrl(postgres.getJdbcUrl());
        ds.setUser(postgres.getUsername());
        ds.setPassword(postgres.getPassword());

        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

        eventStore = new EventStore(ds, mapper);
        var registry = new EventTypeRegistry(mapper);
        jedisPool = new JedisPool(redis.getHost(), redis.getFirstMappedPort());
        redisProjection = new RedisOrderProjection(jedisPool, registry, mapper, 3600);
    }

    @Test
    void orderPlacedPopulatesRedisCache() throws Exception {
        eventStore.append("order-redis-1", -1, List.of(
            new OrderPlaced("redis-1", "c1",
                List.of(new LineItem("p1", "Widget", 2, BigDecimal.TEN)),
                new BigDecimal("20.00"),
                new Address("1 St", "City", "ST", "00000", "US"),
                Instant.now())
        ));

        var events = eventStore.readAllFromPosition(0, 10);
        for (StoredEvent event : events) {
            redisProjection.process(event, null);
        }

        try (Jedis jedis = jedisPool.getResource()) {
            Map<String, String> cached = jedis.hgetAll("order:redis-1");
            assertEquals("PLACED", cached.get("status"));
            assertEquals("20.00", cached.get("total"));
            assertEquals("c1", cached.get("customerId"));
        }
    }

    @AfterEach
    void tearDown() {
        jedisPool.close();
    }
}

Elasticsearch Read Model

The Problem

The admin panel needs to search orders by customer name, product name, shipping city, or any combination of criteria. It needs to filter by status, sort by date, and paginate results. PostgreSQL can handle this with LIKE queries and composite indexes, but full-text search with relevance scoring, faceted search, and aggregations are Elasticsearch’s strengths.

The Mechanism

The Elasticsearch projection indexes each order as a document. The document includes all fields that the admin panel searches on. The Elasticsearch index is optimized for search (analyzed text fields, keyword fields for filtering, date fields for range queries) rather than for transactional updates.
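An index optimized this way usually has an explicit mapping. The following is one possible mapping for the fields the projection writes, held as a Java constant so it can be sent when the index is created. The field type choices are assumptions made for illustration, not the book's schema, and the class name OrdersIndexMapping is hypothetical.

```java
// Hypothetical mapping for the "orders" index; field types are illustrative assumptions.
final class OrdersIndexMapping {

    // keyword: exact-match filters and facets. text: analyzed full-text search.
    // date: range queries and sorting. shippingCity gets both via a sub-field.
    static final String MAPPING_JSON = """
        {
          "mappings": {
            "properties": {
              "orderId":         { "type": "keyword" },
              "customerId":      { "type": "keyword" },
              "status":          { "type": "keyword" },
              "total":           { "type": "double" },
              "refundedAmount":  { "type": "double" },
              "itemCount":       { "type": "integer" },
              "products":        { "type": "text" },
              "shippingCity": {
                "type": "text",
                "fields": { "keyword": { "type": "keyword" } }
              },
              "shippingCountry": { "type": "keyword" },
              "placedAt":        { "type": "date" }
            }
          }
        }
        """;

    public static void main(String[] args) {
        System.out.println(MAPPING_JSON);
    }
}
```

The body would be sent once as a PUT to /orders before the first document is indexed. Without an explicit mapping, Elasticsearch infers field types from the first documents it sees, which is often acceptable for prototypes but makes filtering and sorting behavior less predictable.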

The From-Scratch Implementation

// FROM SCRATCH
public class ElasticsearchOrderProjection implements EventProjection {

    private final RestClient restClient;
    private final EventTypeRegistry registry;
    private final ObjectMapper mapper;
    private static final String INDEX_NAME = "orders";

    public ElasticsearchOrderProjection(RestClient restClient,
                                        EventTypeRegistry registry,
                                        ObjectMapper mapper) {
        this.restClient = restClient;
        this.registry = registry;
        this.mapper = mapper;
    }

    @Override
    public void process(StoredEvent stored, Connection conn) throws Exception {
        OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());

        switch (event) {
            case OrderPlaced e -> indexOrder(e);
            case OrderConfirmed e -> updateOrderField(e.orderId(), "status", "CONFIRMED");
            case PaymentAuthorized e -> updateOrderField(e.orderId(), "status", "PAYMENT_AUTHORIZED");
            case OrderFulfilled e -> updateOrderField(e.orderId(), "status", "FULFILLED");
            case OrderCancelled e -> updateOrderField(e.orderId(), "status", "CANCELLED");
            case ShippingAddressChanged e -> updateAddress(e);
            case RefundRequested e -> updateRefund(e);
        }
    }

    private void indexOrder(OrderPlaced event) throws IOException {
        Map<String, Object> document = Map.of(
            "orderId", event.orderId(),
            "customerId", event.customerId(),
            "status", "PLACED",
            "total", event.total(),
            "itemCount", event.lineItems().size(),
            "products", event.lineItems().stream()
                .map(LineItem::productName)
                .toList(),
            "shippingCity", event.shippingAddress().city(),
            "shippingCountry", event.shippingAddress().country(),
            "placedAt", event.occurredAt().toString()
        );

        Request request = new Request("PUT", "/" + INDEX_NAME + "/_doc/" + event.orderId());
        request.setJsonEntity(mapper.writeValueAsString(document));
        restClient.performRequest(request);
    }

    private void updateOrderField(String orderId, String field, String value) throws IOException {
        Map<String, Object> update = Map.of(
            "doc", Map.of(field, value)
        );

        Request request = new Request("POST", "/" + INDEX_NAME + "/_update/" + orderId);
        request.setJsonEntity(mapper.writeValueAsString(update));
        restClient.performRequest(request);
    }

    private void updateAddress(ShippingAddressChanged event) throws IOException {
        Map<String, Object> update = Map.of(
            "doc", Map.of(
                "shippingCity", event.newAddress().city(),
                "shippingCountry", event.newAddress().country()
            )
        );

        Request request = new Request("POST", "/" + INDEX_NAME + "/_update/" + event.orderId());
        request.setJsonEntity(mapper.writeValueAsString(update));
        restClient.performRequest(request);
    }

    private void updateRefund(RefundRequested event) throws IOException {
        Map<String, Object> scriptUpdate = Map.of(
            "script", Map.of(
                "source", "ctx._source.refundedAmount = (ctx._source.refundedAmount ?: 0) + params.amount",
                "params", Map.of("amount", event.amount())
            )
        );

        Request request = new Request("POST", "/" + INDEX_NAME + "/_update/" + event.orderId());
        request.setJsonEntity(mapper.writeValueAsString(scriptUpdate));
        restClient.performRequest(request);
    }
}

What the Implementation Reveals

Elasticsearch’s _update API with a partial doc body is idempotent for field replacements. Setting status to CONFIRMED twice produces the same result. The scripted update for refund amounts is not idempotent, for the same reason as the Redis increment: processing the event twice doubles the amount.

The Elasticsearch projection, like the Redis projection, does not participate in the PostgreSQL transaction. Elasticsearch failures do not roll back the position. If the Elasticsearch cluster is temporarily unavailable, the projection falls behind. When the cluster recovers, the projection catches up.
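Catching up after an outage depends on how the projection runner treats a failed write. One possible arrangement, assumed here rather than taken from the chapter's runner, is for the projection to keep its own position and advance it only after the target accepts the event. The names CatchUpDemo, Target, and Runner are hypothetical.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch: a runner that falls behind while the target is down
// and catches up once it recovers, because position only advances on success.
final class CatchUpDemo {

    record Event(long position, String payload) {}

    // Stands in for the Elasticsearch write; may fail while the cluster is down.
    interface Target {
        void index(Event event) throws IOException;
    }

    static final class Runner {
        private long position = 0;

        long position() {
            return position;
        }

        // Applies events after the stored position and stops at the first failure,
        // so ordering is preserved and nothing is skipped.
        int poll(List<Event> stream, Target target) {
            int applied = 0;
            for (Event e : stream) {
                if (e.position() <= position) {
                    continue; // already indexed
                }
                try {
                    target.index(e);
                } catch (IOException unavailable) {
                    break; // leave position untouched; retry on the next poll
                }
                position = e.position();
                applied++;
            }
            return applied;
        }
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
            new Event(1, "a"), new Event(2, "b"), new Event(3, "c"));
        boolean[] clusterUp = {false};
        Target target = e -> {
            if (!clusterUp[0]) {
                throw new IOException("cluster unavailable");
            }
        };

        Runner runner = new Runner();
        System.out.println("applied while down: " + runner.poll(stream, target));
        clusterUp[0] = true;
        System.out.println("applied after recovery: " + runner.poll(stream, target));
        System.out.println("position: " + runner.position());
    }
}
```

The trade-off is that a retried event may already have been partially applied, so this arrangement works best when the writes are idempotent. That holds for the document puts and field replacements above, but not for the scripted refund update.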

The fallback strategy for Elasticsearch unavailability is different from Redis. Redis is a cache with PostgreSQL as the fallback. Elasticsearch is the search backend. If Elasticsearch is down, search is unavailable. There is no graceful degradation to PostgreSQL LIKE queries, because the query interface and the result format are different. The admin panel shows a “search unavailable” message and the operations team is paged.

The Production Path

// PRODUCTION
@Component
public class SpringElasticsearchProjection implements NamedProjection {

    private final ElasticsearchClient esClient;
    private final EventTypeRegistry registry;

    public SpringElasticsearchProjection(ElasticsearchClient esClient,
                                          EventTypeRegistry registry) {
        this.esClient = esClient;
        this.registry = registry;
    }

    @Override
    public String name() {
        return "elasticsearch-orders";
    }

    @Override
    public void process(StoredEvent stored) {
        OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());

        try {
            switch (event) {
                case OrderPlaced e -> esClient.index(i -> i
                    .index("orders")
                    .id(e.orderId())
                    .document(toDocument(e))
                );
                case OrderConfirmed e -> updateStatus(e.orderId(), "CONFIRMED");
                case OrderFulfilled e -> updateStatus(e.orderId(), "FULFILLED");
                case OrderCancelled e -> updateStatus(e.orderId(), "CANCELLED");
                default -> {} // Other events handled as needed
            }
        } catch (IOException e) {
            throw new ProjectionException("Elasticsearch projection failed", e);
        }
    }

    private void updateStatus(String orderId, String status) throws IOException {
        esClient.update(u -> u
            .index("orders")
            .id(orderId)
            .doc(Map.of("status", status)),
            Map.class
        );
    }

    private Map<String, Object> toDocument(OrderPlaced e) {
        return Map.of(
            "orderId", e.orderId(),
            "customerId", e.customerId(),
            "status", "PLACED",
            "total", e.total(),
            "products", e.lineItems().stream().map(LineItem::productName).toList(),
            "shippingCity", e.shippingAddress().city(),
            "placedAt", e.occurredAt().toString()
        );
    }
}

The Test

// FROM SCRATCH
@Testcontainers
class ElasticsearchProjectionTest {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
        .withDatabaseName("es_proj_test")
        .withInitScript("projection_schema.sql");

    @Container
    static ElasticsearchContainer elasticsearch =
        new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.11.0")
            .withEnv("xpack.security.enabled", "false")
            .withEnv("discovery.type", "single-node");

    private EventStore eventStore;
    private ElasticsearchOrderProjection esProjection;
    private RestClient restClient;

    @BeforeEach
    void setUp() {
        var ds = new org.postgresql.ds.PGSimpleDataSource();
        ds.setUrl(postgres.getJdbcUrl());
        ds.setUser(postgres.getUsername());
        ds.setPassword(postgres.getPassword());

        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        eventStore = new EventStore(ds, mapper);
        var registry = new EventTypeRegistry(mapper);
        restClient = RestClient.builder(
            HttpHost.create(elasticsearch.getHttpHostAddress())
        ).build();
        esProjection = new ElasticsearchOrderProjection(restClient, registry, mapper);
    }

    @Test
    void orderIndexedInElasticsearch() throws Exception {
        eventStore.append("order-es-1", -1, List.of(
            new OrderPlaced("es-1", "c1",
                List.of(new LineItem("p1", "Premium Widget", 1, BigDecimal.TEN)),
                BigDecimal.TEN,
                new Address("1 St", "Portland", "OR", "97201", "US"),
                Instant.now())
        ));

        var events = eventStore.readAllFromPosition(0, 10);
        for (StoredEvent event : events) {
            esProjection.process(event, null);
        }

        // Refresh to make documents searchable
        restClient.performRequest(new Request("POST", "/orders/_refresh"));

        // Search by city
        Request searchRequest = new Request("GET", "/orders/_search");
        searchRequest.setJsonEntity("""
            {"query": {"match": {"shippingCity": "Portland"}}}
            """);
        var response = restClient.performRequest(searchRequest);
        String body = new String(response.getEntity().getContent().readAllBytes(),
            java.nio.charset.StandardCharsets.UTF_8); // decode explicitly, not with the platform default
        assertTrue(body.contains("es-1"));
    }

    @AfterEach
    void tearDown() throws IOException {
        restClient.close();
    }
}

Consistency Trade-offs Summary

| Target | Transaction with Position | Idempotent Updates | Failure Impact |
| --- | --- | --- | --- |
| PostgreSQL | Yes (same database) | Natural for upserts | Read model stale until recovery |
| Redis | No | Natural for field sets, not for increments | Cache miss, fallback to PostgreSQL |
| Elasticsearch | No | Natural for document puts, not for scripts | Search unavailable |

The PostgreSQL read model is the most operationally reliable because it shares a transactional boundary with position tracking. Redis and Elasticsearch projections are best-effort: they are usually consistent but can fall behind or show stale data during failures. Design the system so that the PostgreSQL read model is the authoritative fallback. Redis and Elasticsearch are performance optimizations, not sources of truth.

This chapter extended projections to multiple targets. The next chapter covers the most disruptive projection operation: rebuilding from scratch.