Observability: Metrics, Dashboards, and Alerting for Decoupled Write and Read Sides
Observability
An event-sourced system’s Grafana dashboard has five rows: system health overview (single-stat panels for total events, projection lag, stuck sagas, outbox pending), write-side metrics (append rate, latency percentiles, concurrency failures), read-side metrics (projection lag over time, processing rate, errors), saga health, and infrastructure metrics. Projection lag is the single most important metric.
In a CRUD system, monitoring focuses on request latency, error rates, and database connection pools. The read and write sides are the same database, so a single set of metrics covers both. In an event-sourced system with CQRS, the write side and read side are decoupled. Events flow from the command side through the event store, through the outbox, through Kafka, through projections, into read models. Each stage can lag, fail, or degrade independently. Standard CRUD monitoring misses most failure modes.
The critical question in an event-sourced system is not “is the API responding?” but “how far behind are the projections?” A projection that is 30 seconds behind the event store means customers see stale data. A projection that is 5 minutes behind means the system is functionally broken. A saga that has been stuck in AWAITING_PAYMENT for 10 minutes means an order is in limbo.
The Metrics That Matter
Write-Side Metrics
// FROM SCRATCH
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.util.List;

public class InstrumentedEventStore {
    private final EventStore delegate;
    private final MeterRegistry registry;
    private final Counter eventsAppended;
    private final Counter concurrencyFailures;
    private final Timer appendLatency;

    public InstrumentedEventStore(EventStore delegate, MeterRegistry registry) {
        this.delegate = delegate;
        this.registry = registry;
        this.eventsAppended = Counter.builder("eventstore.events.appended")
            .description("Total events appended to the event store")
            .register(registry);
        this.concurrencyFailures = Counter.builder("eventstore.concurrency.failures")
            .description("Optimistic concurrency failures")
            .register(registry);
        this.appendLatency = Timer.builder("eventstore.append.latency")
            .description("Time to append events to the store")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(registry);
    }

    public void append(String streamId, long expectedVersion,
                       List<EventData> events) {
        try {
            // Time the append; count events only when the append succeeds
            appendLatency.record(() -> {
                delegate.append(streamId, expectedVersion, events);
                eventsAppended.increment(events.size());
            });
        } catch (OptimisticConcurrencyException e) {
            concurrencyFailures.increment();
            throw e;
        }
    }

    public void appendWithTag(String streamId, long expectedVersion,
                              List<EventData> events, String aggregateType) {
        // register() returns the existing meter when one with the same name and
        // tags is already registered, so this does not create duplicates.
        // Caveat: some registries (Prometheus among them) expect every meter
        // sharing a name to carry the same tag keys, so mixing these tagged
        // meters with the untagged ones above may cause one set to be rejected.
        Timer taggedTimer = Timer.builder("eventstore.append.latency")
            .tag("aggregate", aggregateType)
            .register(registry);
        Counter taggedCounter = Counter.builder("eventstore.events.appended")
            .tag("aggregate", aggregateType)
            .register(registry);
        try {
            taggedTimer.record(() -> {
                delegate.append(streamId, expectedVersion, events);
                taggedCounter.increment(events.size());
            });
        } catch (OptimisticConcurrencyException e) {
            concurrencyFailures.increment();
            throw e;
        }
    }
}
What these metrics reveal:
eventstore.events.appended rate shows write throughput. A sudden drop indicates upstream issues (fewer commands) or downstream issues (event store is slow).
eventstore.concurrency.failures rate indicates contention. A spike means multiple commands are targeting the same aggregate simultaneously. This might mean the aggregate boundaries are wrong (Chapter 4) or the system needs a command queue.
eventstore.append.latency p99 shows worst-case write performance. If p99 exceeds 100 ms, investigate PostgreSQL performance: vacuum, index bloat, connection pool exhaustion.
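The word "rate" above means the per-second increase of a counter between two scrapes, which is what a Prometheus rate() query approximates. The following is a minimal sketch of that arithmetic, not the Prometheus implementation; the class name CounterRate and its method are hypothetical, and the reset handling is a simplification.

```java
/** Hypothetical helper, not part of the chapter's code base. */
final class CounterRate {

    /**
     * Per-second increase of a monotonic counter between two scrapes.
     * If the newer value is smaller, the counter is assumed to have been
     * reset by a restart, so the newer value alone counts as the increase.
     */
    static double perSecond(double olderValue, double newerValue, double elapsedSeconds) {
        if (elapsedSeconds <= 0) {
            throw new IllegalArgumentException("elapsedSeconds must be positive");
        }
        double increase = newerValue >= olderValue ? newerValue - olderValue : newerValue;
        return increase / elapsedSeconds;
    }

    public static void main(String[] args) {
        // 12,000 events at the first scrape, 12,600 one minute later
        System.out.println(perSecond(12_000, 12_600, 60)); // prints 10.0
    }
}
```

Because the counter only ever grows between restarts, a falling rate always means fewer appends per second, never a smaller total.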
Read-Side Metrics
// FROM SCRATCH
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

public class InstrumentedProjectionEngine {
    private final ProjectionEngine delegate;
    private final MeterRegistry registry;
    private final DataSource dataSource;
    private final Gauge projectionLag;
    private final Counter eventsProcessed;
    private final Counter processingErrors;
    private final Timer processingLatency;

    public InstrumentedProjectionEngine(ProjectionEngine delegate,
                                        MeterRegistry registry,
                                        DataSource dataSource) {
        this.delegate = delegate;
        this.registry = registry;
        this.dataSource = dataSource;
        this.eventsProcessed = Counter.builder("projection.events.processed")
            .description("Events processed by projections")
            .register(registry);
        this.processingErrors = Counter.builder("projection.events.errors")
            .description("Projection processing errors")
            .register(registry);
        this.processingLatency = Timer.builder("projection.processing.latency")
            .description("Time to process a single event in a projection")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(registry);
        // Gauge that reports projection lag; the query runs each time the gauge is read
        this.projectionLag = Gauge.builder("projection.lag.events", this::measureLag)
            .description("Number of events the projection is behind")
            .register(registry);
    }

    private double measureLag() {
        // The outer COALESCE covers a projection that has no position row yet:
        // a subquery that matches no rows yields NULL, which would otherwise
        // turn the whole subtraction into NULL and be read back as 0.
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement("""
                 SELECT
                   (SELECT COALESCE(MAX(global_position), 0) FROM event_store) -
                   COALESCE((SELECT last_processed_position FROM projection_positions
                             WHERE projection_name = 'OrderSummary'), 0)
                   AS lag
                 """)) {
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                return rs.getDouble("lag");
            }
        } catch (SQLException e) {
            return -1; // Indicates measurement failure
        }
    }

    public void processEvent(StoredEvent event, EventProjection projection) {
        try {
            processingLatency.record(() -> {
                projection.process(event);
                eventsProcessed.increment();
            });
        } catch (Exception e) {
            // Count the failure, then rethrow so the caller can decide how to recover
            processingErrors.increment();
            throw e;
        }
    }
}
What these metrics reveal:
projection.lag.events is the single most important metric in an event-sourced system. It tells you how far behind the read side is from the write side. Zero lag means the system is consistent. Non-zero lag means the system is eventually consistent, and the “eventually” has a measurable duration.
projection.events.processed rate shows read-side throughput. If this rate drops while the write-side rate stays constant, the projection is falling behind.
projection.events.errors indicates broken projection handlers. Any non-zero error rate requires investigation.
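The relationship between lag and the two rates can be made concrete with a little arithmetic: the projection only catches up when it processes events faster than the write side appends them. The sketch below is illustrative; CatchUpEstimate and its parameters are hypothetical names, and it assumes both rates stay constant over the catch-up period.

```java
import java.util.OptionalDouble;

/** Hypothetical helper, not part of the chapter's code base. */
final class CatchUpEstimate {

    /**
     * Estimated seconds until projection lag reaches zero.
     * Returns an empty result when the projection is not gaining ground,
     * that is, when it processes no faster than events are appended.
     */
    static OptionalDouble secondsToCatchUp(double lagEvents,
                                           double processedPerSecond,
                                           double appendedPerSecond) {
        if (lagEvents <= 0) {
            return OptionalDouble.of(0);
        }
        double netPerSecond = processedPerSecond - appendedPerSecond;
        if (netPerSecond <= 0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(lagEvents / netPerSecond);
    }

    public static void main(String[] args) {
        // 1,000 events behind, processing 300/s while 200/s are appended
        System.out.println(secondsToCatchUp(1_000, 300, 200).getAsDouble()); // prints 10.0
    }
}
```

An empty result is the case to worry about: the lag will keep growing until one of the two rates changes.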
Saga Metrics
// FROM SCRATCH
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

public class SagaMetrics {
    private final DataSource dataSource;
    private final MeterRegistry registry;

    public SagaMetrics(DataSource dataSource, MeterRegistry registry) {
        this.dataSource = dataSource;
        this.registry = registry;
        // Register one gauge per saga state, distinguished by the "state" tag
        for (OrderSagaState state : OrderSagaState.values()) {
            Gauge.builder("saga.instances", () -> countSagasInState(state))
                .tag("state", state.name())
                .description("Number of sagas in this state")
                .register(registry);
        }
        // Gauge for stuck sagas (not in terminal state, not updated recently)
        Gauge.builder("saga.stuck", this::countStuckSagas)
            .description("Sagas stuck for more than 10 minutes")
            .register(registry);
    }

    private double countSagasInState(OrderSagaState state) {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT COUNT(*) FROM saga_state WHERE saga_type = 'OrderFulfilment' AND state = ?")) {
            stmt.setString(1, state.name());
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                return rs.getDouble(1);
            }
        } catch (SQLException e) {
            return -1; // Indicates measurement failure
        }
    }

    private double countStuckSagas() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement("""
                 SELECT COUNT(*) FROM saga_state
                 WHERE state NOT IN ('COMPLETED', 'FAILED')
                 AND updated_at < NOW() - INTERVAL '10 minutes'
                 """)) {
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                return rs.getDouble(1);
            }
        } catch (SQLException e) {
            return -1; // Indicates measurement failure
        }
    }
}
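The "stuck" rule in the SQL above (not in a terminal state, and not updated within the threshold) can also be expressed as a plain predicate, which makes it easy to unit test without a database. This is a sketch only; StuckSagaCheck and SagaRow are hypothetical names, and the terminal states are copied from the query.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory mirror of the stuck-saga query. */
final class StuckSagaCheck {

    /** Minimal stand-in for one row of the saga_state table. */
    record SagaRow(String sagaId, String state, Instant updatedAt) {}

    static final Set<String> TERMINAL_STATES = Set.of("COMPLETED", "FAILED");

    /** Counts sagas that are not terminal and were last updated before now minus threshold. */
    static long countStuck(List<SagaRow> rows, Instant now, Duration threshold) {
        Instant cutoff = now.minus(threshold);
        return rows.stream()
                .filter(row -> !TERMINAL_STATES.contains(row.state()))
                .filter(row -> row.updatedAt().isBefore(cutoff))
                .count();
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        List<SagaRow> rows = List.of(
                new SagaRow("order-1", "AWAITING_PAYMENT", now.minus(Duration.ofMinutes(11))),
                new SagaRow("order-2", "AWAITING_PAYMENT", now.minus(Duration.ofMinutes(2))),
                new SagaRow("order-3", "COMPLETED", now.minus(Duration.ofHours(1))));
        System.out.println(countStuck(rows, now, Duration.ofMinutes(10))); // prints 1
    }
}
```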
Outbox and Kafka Metrics
// FROM SCRATCH
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

public class OutboxMetrics {
    private final DataSource dataSource;
    private final MeterRegistry registry;

    public OutboxMetrics(DataSource dataSource, MeterRegistry registry) {
        this.dataSource = dataSource;
        this.registry = registry;
        Gauge.builder("outbox.pending", this::countPendingMessages)
            .description("Messages waiting in the outbox")
            .register(registry);
        Gauge.builder("outbox.oldest.age.seconds", this::oldestMessageAge)
            .description("Age of the oldest pending outbox message in seconds")
            .register(registry);
    }

    private double countPendingMessages() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT COUNT(*) FROM outbox WHERE processed = FALSE")) {
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                return rs.getDouble(1);
            }
        } catch (SQLException e) {
            return -1; // Indicates measurement failure
        }
    }

    private double oldestMessageAge() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement("""
                 SELECT EXTRACT(EPOCH FROM NOW() - MIN(created_at))
                 FROM outbox WHERE processed = FALSE
                 """)) {
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                double age = rs.getDouble(1);
                // MIN() over an empty outbox is NULL: report an age of zero
                return rs.wasNull() ? 0 : age;
            }
        } catch (SQLException e) {
            return -1; // Indicates measurement failure
        }
    }
}
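Each of the gauges above runs a SQL query every time it is read. If scrapes are frequent or the tables are large, one option is to wrap the query in a small time-based cache. The sketch below is one possible approach, not something the chapter's code does; CachedGaugeValue is a hypothetical name, and the clock is injected so the behaviour can be tested.

```java
import java.util.function.DoubleSupplier;
import java.util.function.LongSupplier;

/** Hypothetical wrapper that reuses a measured value until it is older than a TTL. */
final class CachedGaugeValue implements DoubleSupplier {
    private final DoubleSupplier delegate;
    private final long ttlNanos;
    private final LongSupplier nanoClock;
    private boolean loaded;
    private long loadedAtNanos;
    private double cached;

    CachedGaugeValue(DoubleSupplier delegate, long ttlNanos, LongSupplier nanoClock) {
        this.delegate = delegate;
        this.ttlNanos = ttlNanos;
        this.nanoClock = nanoClock;
    }

    @Override
    public synchronized double getAsDouble() {
        long now = nanoClock.getAsLong();
        // Re-measure on first use, or once the cached value has reached the TTL
        if (!loaded || now - loadedAtNanos >= ttlNanos) {
            cached = delegate.getAsDouble();
            loadedAtNanos = now;
            loaded = true;
        }
        return cached;
    }

    public static void main(String[] args) {
        // Stand-in for a database query; cached for five seconds
        CachedGaugeValue pending = new CachedGaugeValue(() -> 42, 5_000_000_000L, System::nanoTime);
        System.out.println(pending.getAsDouble()); // prints 42.0
    }
}
```

A wrapper like this could be passed to a gauge as a method reference (for example `cachedPending::getAsDouble`), at the cost of the gauge being up to one TTL out of date.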
The Production Path
Spring Boot Actuator Configuration
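// PRODUCTION
import io.micrometer.core.instrument.MeterRegistry;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ObservabilityConfig {

    // Wraps the existing EventStore bean so appends are counted and timed
    @Bean
    public InstrumentedEventStore instrumentedEventStore(EventStore eventStore,
                                                         MeterRegistry registry) {
        return new InstrumentedEventStore(eventStore, registry);
    }

    // Registers the saga gauges as a side effect of construction
    @Bean
    public SagaMetrics sagaMetrics(DataSource dataSource, MeterRegistry registry) {
        return new SagaMetrics(dataSource, registry);
    }

    // Registers the outbox gauges as a side effect of construction
    @Bean
    public OutboxMetrics outboxMetrics(DataSource dataSource, MeterRegistry registry) {
        return new OutboxMetrics(dataSource, registry);
    }
}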
# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,prometheus,info
  metrics:
    export:
      prometheus:
        enabled: true
    distribution:
      percentiles-histogram:
        eventstore.append.latency: true
        projection.processing.latency: true
    tags:
      application: order-service
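The dotted meter names registered in the Java code appear under different names in Prometheus queries: dots become underscores, counters gain a `_total` suffix, and timers are reported in seconds. The sketch below approximates that mapping for the names used in this chapter. It is a simplification, not Micrometer's actual naming convention class; PrometheusNames and MeterKind are hypothetical names.

```java
/** Hypothetical, simplified approximation of Micrometer-to-Prometheus naming. */
final class PrometheusNames {

    enum MeterKind { COUNTER, GAUGE, TIMER }

    /** Maps a dotted meter name to the series name seen in Prometheus queries. */
    static String toPrometheus(String micrometerName, MeterKind kind) {
        String base = micrometerName.replace('.', '_');
        switch (kind) {
            case COUNTER:
                // Counters are exposed with a _total suffix
                return base.endsWith("_total") ? base : base + "_total";
            case TIMER:
                // Timers are exposed in seconds (as _count, _sum and related series)
                return base + "_seconds";
            default:
                // Gauges without a base unit keep the converted name
                return base;
        }
    }

    public static void main(String[] args) {
        System.out.println(toPrometheus("eventstore.events.appended", MeterKind.COUNTER));
        System.out.println(toPrometheus("projection.lag.events", MeterKind.GAUGE));
        System.out.println(toPrometheus("eventstore.append.latency", MeterKind.TIMER));
    }
}
```

When a dashboard panel or alert shows no data, a mismatch between the registered name and the queried name is one of the first things worth checking.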
Grafana Dashboard Layout
A single Grafana dashboard for an event-sourced system should have five rows:
Row 1: System Health Overview. Four single-stat panels: total events (counter), projection lag (gauge, green/yellow/red thresholds), stuck sagas (gauge), outbox pending (gauge). This row answers “is the system healthy?” at a glance.
Row 2: Write Side. Three panels: events appended per second (time series), append latency percentiles (time series with p50/p95/p99), concurrency failures per minute (time series). This row shows write-side throughput and performance.
Row 3: Read Side. Three panels: projection lag over time (time series, the most watched panel), events processed per second (time series), projection errors per minute (time series). This row shows read-side health and the gap between write and read.
Row 4: Sagas and Workflows. Two panels: saga instances by state (stacked area chart), stuck sagas over time (time series with alert threshold). This row shows workflow health.
Row 5: Infrastructure. Three panels: outbox pending messages (time series), outbox oldest message age (time series), event store size growth (time series from PostgreSQL stats). This row shows infrastructure health.
Alerting Rules
# prometheus-alerts.yml
groups:
  - name: event-sourcing
    rules:
      - alert: ProjectionLagHigh
        expr: projection_lag_events > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Projection lag exceeds 1000 events"
          description: "The projection is {{ $value }} events behind the event store."
      - alert: ProjectionLagCritical
        expr: projection_lag_events > 10000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Projection lag exceeds 10000 events"
          description: "The projection is critically behind. Read models may show significantly stale data."
      - alert: SagaStuck
        expr: saga_stuck > 0
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "{{ $value }} sagas are stuck"
          description: "Sagas have not progressed for more than 10 minutes. Check saga timeout handling."
      - alert: OutboxBacklog
        expr: outbox_pending > 500
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Outbox has {{ $value }} pending messages"
          description: "The outbox relay may be down or slow."
      - alert: OutboxStale
        expr: outbox_oldest_age_seconds > 300
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Oldest outbox message is {{ $value }}s old"
          description: "Messages are not being relayed. Events are not reaching consumers."
      - alert: HighConcurrencyFailureRate
        expr: rate(eventstore_concurrency_failures_total[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High optimistic concurrency failure rate"
          description: "More than 10 concurrency failures per second. Check aggregate boundaries."
      # rate() is used rather than deriv(): the metric is a counter, and deriv() is meant for gauges
      - alert: EventStoreGrowthAnomaly
        expr: rate(eventstore_events_appended_total[1h]) > 2 * rate(eventstore_events_appended_total[24h])
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Event store growth rate anomaly"
          description: "Current growth rate is more than 2x the 24-hour average."
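Every rule above carries a `for:` clause, which means the expression must stay true for the whole duration before the alert fires; a brief spike only puts the alert into a pending state. The sketch below models that behaviour for a single threshold rule. It is a simplified illustration rather than how Prometheus is implemented, and ForClauseAlert and its State enum are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of a threshold alert with a "for" duration. */
final class ForClauseAlert {

    enum State { INACTIVE, PENDING, FIRING }

    private final double threshold;
    private final Duration holdFor;
    private Instant breachedSince; // null while the condition is false

    ForClauseAlert(double threshold, Duration holdFor) {
        this.threshold = threshold;
        this.holdFor = holdFor;
    }

    /** Evaluates one sample; any sample at or below the threshold resets the timer. */
    State evaluate(double value, Instant now) {
        if (value <= threshold) {
            breachedSince = null;
            return State.INACTIVE;
        }
        if (breachedSince == null) {
            breachedSince = now;
        }
        Duration breachedFor = Duration.between(breachedSince, now);
        return breachedFor.compareTo(holdFor) >= 0 ? State.FIRING : State.PENDING;
    }

    public static void main(String[] args) {
        // Mirrors ProjectionLagHigh: lag above 1000 events for 5 minutes
        ForClauseAlert lagHigh = new ForClauseAlert(1000, Duration.ofMinutes(5));
        Instant start = Instant.now();
        System.out.println(lagHigh.evaluate(1500, start));                               // PENDING
        System.out.println(lagHigh.evaluate(1500, start.plus(Duration.ofMinutes(5))));   // FIRING
    }
}
```

This is why the critical rules use shorter durations than the warnings: a shorter `for:` trades some noise for faster notification.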
What the Implementation Reveals
The projection lag metric deserves special attention. In a CRUD system, staleness is not measurable. The read and write happen against the same table. In CQRS, staleness is a number. You can set an SLA: “projections must be within 5 seconds of the event store.” You can measure compliance. You can alert on violations.
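An SLA phrased in seconds needs a time-based lag, whereas projection.lag.events counts events. One way to bridge the two, assuming each event carries a timestamp, is to compare the timestamp of the newest stored event with that of the last event the projection processed, then measure what fraction of samples fall within the SLA. The sketch below shows that calculation; LagSla and its methods are hypothetical names, and the timestamp source is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/** Hypothetical helper for measuring compliance with a time-based lag SLA. */
final class LagSla {

    /** Time-based lag: how much newer the store's latest event is than the last one processed. */
    static Duration lag(Instant newestEventAt, Instant lastProcessedEventAt) {
        Duration gap = Duration.between(lastProcessedEventAt, newestEventAt);
        return gap.isNegative() ? Duration.ZERO : gap;
    }

    /** Fraction of lag samples that are within the SLA, between 0.0 and 1.0. */
    static double compliance(List<Duration> lagSamples, Duration sla) {
        if (lagSamples.isEmpty()) {
            throw new IllegalArgumentException("at least one sample is required");
        }
        long within = lagSamples.stream()
                .filter(sample -> sample.compareTo(sla) <= 0)
                .count();
        return (double) within / lagSamples.size();
    }

    public static void main(String[] args) {
        List<Duration> samples = List.of(
                Duration.ofSeconds(1), Duration.ofSeconds(2),
                Duration.ofSeconds(6), Duration.ofSeconds(4));
        // Three of the four samples are within a 5-second SLA
        System.out.println(compliance(samples, Duration.ofSeconds(5))); // prints 0.75
    }
}
```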
The outbox metrics bridge the write side and the messaging layer. If the outbox relay stops, events continue to be appended to the event store (write side works) but projections stop receiving events (read side stalls). The outbox pending count and oldest message age are the canary metrics that catch this failure before it affects users.
Saga metrics catch a class of failures that are invisible to request-level monitoring. The API responds correctly (200 OK for placing an order) but the workflow never completes because the saga is stuck waiting for an event that never arrives. Without saga metrics, these failures are discovered by customers calling support.
The Test
// FROM SCRATCH
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

class InstrumentedEventStoreTest {

    @Test
    void incrementsCounterOnAppend() {
        var registry = new SimpleMeterRegistry();
        var delegate = new InMemoryEventStore();
        var instrumented = new InstrumentedEventStore(delegate, registry);
        instrumented.append("stream-1", 0, List.of(
            new EventData("OrderPlaced", "{}", Instant.now())
        ));
        Counter counter = registry.find("eventstore.events.appended").counter();
        assertNotNull(counter);
        assertEquals(1.0, counter.count());
    }

    @Test
    void incrementsConcurrencyFailureCounter() {
        var registry = new SimpleMeterRegistry();
        var delegate = new FailingEventStore(); // Always throws OptimisticConcurrencyException
        var instrumented = new InstrumentedEventStore(delegate, registry);
        assertThrows(OptimisticConcurrencyException.class, () ->
            instrumented.append("stream-1", 0, List.of(
                new EventData("OrderPlaced", "{}", Instant.now())
            ))
        );
        Counter counter = registry.find("eventstore.concurrency.failures").counter();
        assertNotNull(counter);
        assertEquals(1.0, counter.count());
    }

    @Test
    void recordsAppendLatency() {
        var registry = new SimpleMeterRegistry();
        var delegate = new InMemoryEventStore();
        var instrumented = new InstrumentedEventStore(delegate, registry);
        instrumented.append("stream-1", 0, List.of(
            new EventData("OrderPlaced", "{}", Instant.now())
        ));
        Timer timer = registry.find("eventstore.append.latency").timer();
        assertNotNull(timer);
        assertEquals(1, timer.count());
        assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) >= 0);
    }
}
Observability for event-sourced systems requires more metrics than CRUD systems because there are more moving parts. The write side, the event store, the outbox, the message broker, the projections, and the sagas each have their own failure modes. Each needs its own metrics. The investment pays off when you can answer “why is the customer seeing stale data?” in 30 seconds by looking at the projection lag dashboard.
This chapter completes Part V on operations. Part VI addresses the strategic decisions: when event sourcing is the wrong choice, and how to migrate from CRUD when it is the right one.