Process Managers and Sagas: Coordinating Multi-Step Workflows Across Aggregates
Process Managers and Sagas
The order fulfilment saga is a state machine with eight states. The happy path flows left to right: STARTED through AWAITING_PAYMENT, AWAITING_INVENTORY, AWAITING_FULFILMENT, to COMPLETED. Failures at any step trigger the compensation path, which undoes previous steps in reverse order before reaching the FAILED terminal state.
Placing an order is not a single operation. It is a workflow. The order is created. Payment is authorized. Inventory is reserved. Fulfilment is dispatched. Each step involves a different aggregate, potentially in a different service. Each step can fail. When payment authorization fails after the order is created, the order must be cancelled. When inventory reservation fails after payment is authorized, the payment must be released and the order must be cancelled. Each failure requires compensating actions that undo the effects of previous steps.
In a monolithic CRUD system, this workflow might be a single database transaction that locks multiple tables. In an event-sourced system with separate aggregates, each aggregate has its own event stream and its own transactional boundary. There is no shared transaction. The coordination must happen through events and commands.
A saga (or process manager) is a stateful event consumer that listens for events and dispatches commands. It tracks the state of a multi-step workflow and handles both the happy path and the compensation path. The saga is the central coordinator for the workflow.
Orchestration vs Choreography
Two approaches exist for coordinating multi-step workflows.
Choreography. Each service listens for events and reacts independently. The payment service listens for OrderPlaced and authorizes payment. The inventory service listens for PaymentAuthorized and reserves stock. The fulfilment service listens for InventoryReserved and dispatches shipment. There is no central coordinator. Each service knows only the events it consumes and the events it emits, and none of them knows the whole workflow.
Choreography works for linear workflows with few steps. It becomes unmanageable when the workflow has conditional branches, parallel steps, or complex compensation logic. If inventory reservation fails, which service is responsible for releasing the payment? The payment service does not listen for InventoryReservationFailed. Someone must add that listener. As the workflow becomes more complex, the event dependencies become a hidden distributed workflow that no single piece of code describes.
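A minimal in-memory sketch makes the hidden-workflow problem concrete. The bus, the wiring method, and the string event names are hypothetical simplifications: there is no broker, and delivery is synchronous. The point is that the workflow exists only as the sum of independent subscriptions, so a failure event with no subscriber goes nowhere.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical in-memory bus: a service subscribes to an event name and reacts by
// returning the name of the event it publishes next (or null for no reaction).
class ChoreographyBus {
    private final Map<String, List<UnaryOperator<String>>> subscribers = new HashMap<>();
    private final List<String> published = new ArrayList<>();

    void subscribe(String eventName, UnaryOperator<String> reaction) {
        subscribers.computeIfAbsent(eventName, k -> new ArrayList<>()).add(reaction);
    }

    // Records the event, then synchronously delivers it to every subscriber,
    // publishing whatever each one emits in turn.
    void publish(String eventName) {
        published.add(eventName);
        for (UnaryOperator<String> reaction : subscribers.getOrDefault(eventName, List.of())) {
            String next = reaction.apply(eventName);
            if (next != null) {
                publish(next);
            }
        }
    }

    List<String> published() {
        return published;
    }
}

class OrderChoreography {
    // Each subscription is one service's listener; no single place describes the workflow.
    static ChoreographyBus wire(boolean inStock) {
        ChoreographyBus bus = new ChoreographyBus();
        bus.subscribe("OrderPlaced", e -> "PaymentAuthorized");            // payment service
        bus.subscribe("PaymentAuthorized",
                e -> inStock ? "InventoryReserved" : "InventoryReservationFailed"); // inventory service
        bus.subscribe("InventoryReserved", e -> "FulfilmentDispatched");   // fulfilment service
        // No service subscribes to InventoryReservationFailed, so nobody releases the payment.
        return bus;
    }
}
```

With `wire(true)`, publishing OrderPlaced records all four events. With `wire(false)`, the trail stops at InventoryReservationFailed, and the authorized payment is never released.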
Orchestration. A central saga coordinates the workflow. It listens for events, tracks state, and dispatches commands. The saga knows the complete workflow: which steps to execute, in which order, and what to do when a step fails. The workflow is visible in a single class.
Orchestration adds a single point of coordination (the saga), but the workflow logic is explicit, testable, and modifiable in one place. For the order management platform, where the workflow involves four bounded contexts and conditional compensation, orchestration is the better choice.
The Order Fulfilment Saga
The Problem
The order fulfilment workflow involves four steps across four aggregates:
- Order is placed (Order aggregate).
- Payment is authorized (Payment aggregate).
- Inventory is reserved (Inventory aggregate).
- Fulfilment is dispatched (Fulfilment aggregate).
If payment fails, the order must be cancelled. If inventory fails, the payment must be released and the order must be cancelled. If fulfilment fails, the inventory must be released, the payment must be released, and the order must be cancelled. Each compensation step is itself a command that produces events.
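All three compensation rules follow one pattern: undo every step that completed before the failure, most recent first, then cancel the order. The sketch below encodes that pattern. `ForwardStep` and `CompensationPlan` are hypothetical names, and commands are plain strings rather than the saga's command records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the steps that follow order placement, each paired with
// the command that undoes it (names follow the saga's commands).
enum ForwardStep {
    AUTHORIZE_PAYMENT("ReleasePayment"),
    RESERVE_INVENTORY("ReleaseInventory"),
    DISPATCH_FULFILMENT(null); // last step: no later step can fail, so it is never undone

    final String compensation;

    ForwardStep(String compensation) {
        this.compensation = compensation;
    }
}

class CompensationPlan {
    // Compensations for a failure at failedStep: undo every earlier step, most recent
    // first, then cancel the order (which undoes the order placement itself).
    static List<String> forFailureAt(ForwardStep failedStep) {
        List<String> plan = new ArrayList<>();
        ForwardStep[] steps = ForwardStep.values();
        for (int i = failedStep.ordinal() - 1; i >= 0; i--) {
            plan.add(steps[i].compensation);
        }
        plan.add("CancelOrder");
        return plan;
    }
}
```

A payment failure yields only CancelOrder. An inventory failure yields ReleasePayment and CancelOrder. A fulfilment failure yields ReleaseInventory, ReleasePayment, and CancelOrder, which matches the rules above.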
The Mechanism
The saga is implemented as a state machine. Each state represents a step in the workflow. Transitions are triggered by events. Some transitions dispatch commands. Some transitions trigger compensation.
// FROM SCRATCH
public enum OrderSagaState {
    STARTED,
    AWAITING_PAYMENT,
    AWAITING_INVENTORY,
    AWAITING_FULFILMENT,
    COMPLETED,
    COMPENSATING_INVENTORY,
    COMPENSATING_PAYMENT,
    FAILED
}
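The same transitions can also be written down as data. This puts the whole workflow in one reviewable place and makes it easy to render as a diagram. The sketch below is a hypothetical table-driven alternative, not the switch-based implementation used in this chapter. It uses a local copy of the states (`SagaState`) and identifies events by name so that it compiles on its own.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

// Local copy of the saga states so the sketch compiles on its own.
enum SagaState {
    STARTED, AWAITING_PAYMENT, AWAITING_INVENTORY, AWAITING_FULFILMENT,
    COMPLETED, COMPENSATING_INVENTORY, COMPENSATING_PAYMENT, FAILED
}

class TransitionTable {
    // state -> (event name -> next state), mirroring the saga's handlers
    static final Map<SagaState, Map<String, SagaState>> TRANSITIONS = new EnumMap<>(SagaState.class);

    static {
        TRANSITIONS.put(SagaState.STARTED, Map.of("OrderPlaced", SagaState.AWAITING_PAYMENT));
        TRANSITIONS.put(SagaState.AWAITING_PAYMENT, Map.of(
                "PaymentAuthorized", SagaState.AWAITING_INVENTORY,
                "PaymentFailed", SagaState.FAILED));
        TRANSITIONS.put(SagaState.AWAITING_INVENTORY, Map.of(
                "InventoryReserved", SagaState.AWAITING_FULFILMENT,
                "InventoryReservationFailed", SagaState.COMPENSATING_PAYMENT));
        TRANSITIONS.put(SagaState.AWAITING_FULFILMENT, Map.of(
                "FulfilmentDispatched", SagaState.COMPLETED,
                "FulfilmentFailed", SagaState.COMPENSATING_INVENTORY));
        TRANSITIONS.put(SagaState.COMPENSATING_INVENTORY,
                Map.of("InventoryReleased", SagaState.COMPENSATING_PAYMENT));
        TRANSITIONS.put(SagaState.COMPENSATING_PAYMENT,
                Map.of("PaymentReleased", SagaState.FAILED));
        // COMPLETED and FAILED are terminal: they have no entries.
    }

    // Next state for an event, or empty if the event is not expected in this state.
    static Optional<SagaState> next(SagaState current, String eventName) {
        return Optional.ofNullable(TRANSITIONS.getOrDefault(current, Map.of()).get(eventName));
    }

    static boolean isTerminal(SagaState state) {
        return !TRANSITIONS.containsKey(state);
    }
}
```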
The From-Scratch Implementation
// FROM SCRATCH
import java.util.List;

public class OrderFulfilmentSaga {

    private final String sagaId;
    private final String orderId;
    private OrderSagaState state;
    private String failureReason;

    public OrderFulfilmentSaga(String sagaId, String orderId) {
        this.sagaId = sagaId;
        this.orderId = orderId;
        this.state = OrderSagaState.STARTED;
    }

    // Routes the event to the handler for the current state. Handlers ignore events
    // they do not expect, so stray or redelivered events produce no commands.
    public List<SagaCommand> handle(Object event) {
        return switch (state) {
            case STARTED -> handleStarted(event);
            case AWAITING_PAYMENT -> handleAwaitingPayment(event);
            case AWAITING_INVENTORY -> handleAwaitingInventory(event);
            case AWAITING_FULFILMENT -> handleAwaitingFulfilment(event);
            case COMPENSATING_INVENTORY -> handleCompensatingInventory(event);
            case COMPENSATING_PAYMENT -> handleCompensatingPayment(event);
            case COMPLETED, FAILED -> List.of(); // Terminal states
        };
    }

    private List<SagaCommand> handleStarted(Object event) {
        if (event instanceof OrderPlaced placed) {
            this.state = OrderSagaState.AWAITING_PAYMENT;
            return List.of(new AuthorizePaymentCommand(
                orderId, placed.total(), placed.customerId()
            ));
        }
        return List.of();
    }

    private List<SagaCommand> handleAwaitingPayment(Object event) {
        if (event instanceof PaymentAuthorized) {
            this.state = OrderSagaState.AWAITING_INVENTORY;
            return List.of(new ReserveInventoryCommand(orderId));
        }
        if (event instanceof PaymentFailed failed) {
            // Nothing to compensate yet: only the order exists, so cancel it directly.
            this.failureReason = failed.reason();
            this.state = OrderSagaState.FAILED;
            return List.of(new CancelOrderCommand(orderId, "Payment failed: " + failed.reason()));
        }
        return List.of();
    }

    private List<SagaCommand> handleAwaitingInventory(Object event) {
        if (event instanceof InventoryReserved) {
            this.state = OrderSagaState.AWAITING_FULFILMENT;
            return List.of(new DispatchFulfilmentCommand(orderId));
        }
        if (event instanceof InventoryReservationFailed failed) {
            // Payment was authorized: release it before cancelling the order.
            this.failureReason = failed.reason();
            this.state = OrderSagaState.COMPENSATING_PAYMENT;
            return List.of(new ReleasePaymentCommand(orderId));
        }
        return List.of();
    }

    private List<SagaCommand> handleAwaitingFulfilment(Object event) {
        if (event instanceof FulfilmentDispatched) {
            this.state = OrderSagaState.COMPLETED;
            return List.of();
        }
        if (event instanceof FulfilmentFailed failed) {
            // Undo in reverse order: inventory first, then payment, then the order.
            this.failureReason = failed.reason();
            this.state = OrderSagaState.COMPENSATING_INVENTORY;
            return List.of(new ReleaseInventoryCommand(orderId));
        }
        return List.of();
    }

    private List<SagaCommand> handleCompensatingInventory(Object event) {
        if (event instanceof InventoryReleased) {
            this.state = OrderSagaState.COMPENSATING_PAYMENT;
            return List.of(new ReleasePaymentCommand(orderId));
        }
        return List.of();
    }

    private List<SagaCommand> handleCompensatingPayment(Object event) {
        if (event instanceof PaymentReleased) {
            // Last compensation done: cancel the order, reporting the original cause.
            this.state = OrderSagaState.FAILED;
            return List.of(new CancelOrderCommand(orderId, "Fulfilment workflow failed: " + failureReason));
        }
        return List.of();
    }

    public String sagaId() { return sagaId; }
    public String orderId() { return orderId; }
    public OrderSagaState state() { return state; }
    public String failureReason() { return failureReason; }

    // Rehydration hooks for the repository: they restore persisted fields
    // without replaying events or producing commands.
    public void restoreState(OrderSagaState state) {
        this.state = state;
    }

    public void restoreState(OrderSagaState state, String failureReason) {
        this.state = state;
        this.failureReason = failureReason;
    }
}
import java.math.BigDecimal;
import java.util.List;

// Saga commands
public sealed interface SagaCommand {}
public record AuthorizePaymentCommand(String orderId, BigDecimal amount, String customerId) implements SagaCommand {}
public record ReserveInventoryCommand(String orderId) implements SagaCommand {}
public record DispatchFulfilmentCommand(String orderId) implements SagaCommand {}
public record ReleasePaymentCommand(String orderId) implements SagaCommand {}
public record ReleaseInventoryCommand(String orderId) implements SagaCommand {}
public record CancelOrderCommand(String orderId, String reason) implements SagaCommand {}

// External service events
// (OrderPlaced and PaymentAuthorized are the Order and Payment aggregates' domain events and are not repeated here.)
public record PaymentFailed(String orderId, String reason) {}
public record InventoryReserved(String orderId, List<String> reservedItems) {}
public record InventoryReservationFailed(String orderId, String reason) {}
public record FulfilmentDispatched(String orderId, String trackingNumber) {}
public record FulfilmentFailed(String orderId, String reason) {}
public record InventoryReleased(String orderId) {}
public record PaymentReleased(String orderId) {}
Saga State Persistence
The saga’s state must survive process restarts. If the saga is in the AWAITING_INVENTORY state when the process crashes, it must resume in that state when the process restarts.
-- FROM SCRATCH
CREATE TABLE saga_state (
    saga_id        VARCHAR(255) PRIMARY KEY,
    saga_type      VARCHAR(255) NOT NULL,
    order_id       VARCHAR(255) NOT NULL,
    state          VARCHAR(50)  NOT NULL,
    failure_reason TEXT,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    updated_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_saga_state_order ON saga_state (order_id);
CREATE INDEX idx_saga_state_type ON saga_state (saga_type, state);
// FROM SCRATCH
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import javax.sql.DataSource;

public class SagaRepository {

    private final DataSource dataSource;

    public SagaRepository(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Upsert: insert on the first save, then update state and failure reason.
    public void save(OrderFulfilmentSaga saga) {
        String sql = """
            INSERT INTO saga_state (saga_id, saga_type, order_id, state, failure_reason, updated_at)
            VALUES (?, 'OrderFulfilment', ?, ?, ?, NOW())
            ON CONFLICT (saga_id) DO UPDATE SET
                state = EXCLUDED.state,
                failure_reason = EXCLUDED.failure_reason,
                updated_at = NOW()
            """;
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, saga.sagaId());
            stmt.setString(2, saga.orderId());
            stmt.setString(3, saga.state().name());
            // Persisted so a saga resumed mid-compensation can still report the original cause
            stmt.setString(4, saga.failureReason());
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new SagaException("Failed to save saga " + saga.sagaId(), e);
        }
    }

    public Optional<OrderFulfilmentSaga> findByOrderId(String orderId) {
        String sql = """
            SELECT saga_id, order_id, state, failure_reason
            FROM saga_state
            WHERE order_id = ? AND saga_type = 'OrderFulfilment'
            """;
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, orderId);
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    var saga = new OrderFulfilmentSaga(
                        rs.getString("saga_id"),
                        rs.getString("order_id")
                    );
                    saga.restoreState(
                        OrderSagaState.valueOf(rs.getString("state")),
                        rs.getString("failure_reason")
                    );
                    return Optional.of(saga);
                }
                return Optional.empty();
            }
        } catch (SQLException e) {
            throw new SagaException("Failed to load saga for order " + orderId, e);
        }
    }
}

// Unchecked wrapper for saga persistence failures
public class SagaException extends RuntimeException {
    public SagaException(String message, Throwable cause) {
        super(message, cause);
    }
}
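For unit tests that need save-and-load behaviour without a database, an in-memory stand-in with the same upsert semantics can be handy. `SagaRow` and `InMemorySagaTable` are hypothetical helpers, not part of this chapter's code. They hold the saga_id, order_id, state, and failure_reason columns of `saga_state`, so a round trip also checks that the failure reason survives.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical row type mirroring the saga_state columns the saga needs.
record SagaRow(String sagaId, String orderId, String state, String failureReason) {}

// Hypothetical in-memory stand-in for saga_state, keyed by saga_id like the
// table's primary key.
class InMemorySagaTable {
    private final Map<String, SagaRow> rowsBySagaId = new HashMap<>();

    // Like INSERT ... ON CONFLICT (saga_id) DO UPDATE: a later save replaces the row.
    void upsert(SagaRow row) {
        rowsBySagaId.put(row.sagaId(), row);
    }

    // Like findByOrderId: assumes at most one OrderFulfilment saga per order.
    Optional<SagaRow> findByOrderId(String orderId) {
        return rowsBySagaId.values().stream()
                .filter(row -> row.orderId().equals(orderId))
                .findFirst();
    }

    int size() {
        return rowsBySagaId.size();
    }
}
```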
What the Implementation Reveals
The saga is a pure state machine. It receives an event, transitions to a new state, and produces zero or more commands. It does not call services directly. It does not access databases. The saga’s only job is to make decisions based on events and produce commands. This makes the saga testable without infrastructure: feed it events, assert on produced commands.
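The compensation path mirrors the happy path in reverse. If fulfilment fails after inventory was reserved and payment was authorized, the saga releases inventory first, then releases payment, then cancels the order. The ordering matters because every intermediate state of the compensation then matches a state the happy path also passes through. Once inventory is released, the order is back to "payment authorized, nothing reserved", exactly where it stood in AWAITING_INVENTORY. Releasing payment first would produce "stock reserved, payment released", a combination the forward workflow never creates. If compensation stalled there, stock would stay held for an order nobody has paid for.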
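The ordering rule can be made checkable. After each compensation step, the effects still in place should form a prefix of the forward workflow, that is, a combination the happy path also produces. The sketch below applies that test to both possible undo orders for the fulfilment-failure case. Its names are hypothetical, and it models effects as strings.

```java
import java.util.ArrayList;
import java.util.List;

class CompensationOrderCheck {
    // Effects of the forward steps, in the order the happy path creates them.
    static final List<String> FORWARD = List.of("PaymentAuthorized", "InventoryReserved");

    // True if the effects still in place are a prefix of FORWARD, i.e. a combination
    // the happy path itself passes through.
    static boolean isForwardPrefix(List<String> inPlace) {
        return inPlace.size() <= FORWARD.size()
                && inPlace.equals(FORWARD.subList(0, inPlace.size()));
    }

    // Undoes the effects in the given order and checks every intermediate combination.
    static boolean everyIntermediateStateIsValid(List<String> undoOrder) {
        List<String> inPlace = new ArrayList<>(FORWARD);
        for (String effect : undoOrder) {
            inPlace.remove(effect); // remove(Object): drops this effect
            if (!isForwardPrefix(inPlace)) {
                return false;
            }
        }
        return true;
    }
}
```

Undoing InventoryReserved first passes the check. Undoing PaymentAuthorized first leaves only InventoryReserved in place, which the forward workflow never produces.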
Timeout handling is critical. What if the payment service never responds? The saga is stuck in AWAITING_PAYMENT forever. A timeout mechanism must detect stale sagas and trigger compensation.
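// FROM SCRATCH
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

public class SagaTimeoutChecker {

    private final DataSource dataSource;
    private final Duration timeout;

    public SagaTimeoutChecker(DataSource dataSource, Duration timeout) {
        this.dataSource = dataSource;
        this.timeout = timeout;
    }

    // Finds sagas that are neither COMPLETED nor FAILED and have not been saved for
    // longer than the timeout. Every save refreshes updated_at, so this measures the
    // time spent waiting in the current step, not the age of the whole workflow.
    public List<String> findTimedOutSagas() {
        String sql = """
            SELECT saga_id FROM saga_state
            WHERE state NOT IN ('COMPLETED', 'FAILED')
              AND updated_at < NOW() - INTERVAL '1 second' * ?
            """;
        List<String> timedOut = new ArrayList<>();
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setLong(1, timeout.toSeconds());
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    timedOut.add(rs.getString("saga_id"));
                }
            }
        } catch (SQLException e) {
            throw new SagaException("Failed to check for timed out sagas", e);
        }
        return timedOut;
    }
}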
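The checker only finds stale sagas, and something still has to decide what each one does next. One possible policy, sketched here as an assumption rather than as this chapter's design, treats a timeout like the failure event of the step being awaited. For a saga that is already compensating, it re-sends the pending compensation, which must therefore be idempotent. The enum copy, `TimeoutDecision`, and the string command names are hypothetical.

```java
import java.util.List;

// Local copy of the saga states so the sketch compiles on its own.
enum FulfilmentSagaState {
    STARTED, AWAITING_PAYMENT, AWAITING_INVENTORY, AWAITING_FULFILMENT,
    COMPLETED, COMPENSATING_INVENTORY, COMPENSATING_PAYMENT, FAILED
}

// Hypothetical result of a timeout: the state to move to and the commands to send.
record TimeoutDecision(FulfilmentSagaState nextState, List<String> commands) {}

class SagaTimeoutPolicy {
    // Awaiting a step: behave as if that step had failed.
    // Compensating: re-send the compensation that has not been acknowledged yet.
    static TimeoutDecision onTimeout(FulfilmentSagaState state) {
        return switch (state) {
            case AWAITING_PAYMENT ->
                    new TimeoutDecision(FulfilmentSagaState.FAILED, List.of("CancelOrder"));
            case AWAITING_INVENTORY ->
                    new TimeoutDecision(FulfilmentSagaState.COMPENSATING_PAYMENT, List.of("ReleasePayment"));
            case AWAITING_FULFILMENT ->
                    new TimeoutDecision(FulfilmentSagaState.COMPENSATING_INVENTORY, List.of("ReleaseInventory"));
            case COMPENSATING_INVENTORY ->
                    new TimeoutDecision(state, List.of("ReleaseInventory"));
            case COMPENSATING_PAYMENT ->
                    new TimeoutDecision(state, List.of("ReleasePayment"));
            case STARTED, COMPLETED, FAILED ->
                    new TimeoutDecision(state, List.of()); // nothing is being awaited
        };
    }
}
```

After applying a decision, the caller would save the saga so that updated_at is refreshed and the checker does not pick it up again immediately. The AWAITING_PAYMENT case involves a trade-off. If the payment service authorizes late, the saga is already FAILED and ignores the PaymentAuthorized event, so a stricter policy would release the payment there as well.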
The Production Path
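// PRODUCTION
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class SpringSagaManager {

    private final JdbcTemplate jdbc;
    private final ApplicationEventPublisher publisher;

    public SpringSagaManager(JdbcTemplate jdbc, ApplicationEventPublisher publisher) {
        this.jdbc = jdbc;
        this.publisher = publisher;
    }

    // The record key is the order ID, so all events for one order land on the same
    // partition and are processed in order by a single consumer in the group.
    @KafkaListener(topics = "order-events", groupId = "saga-manager")
    @Transactional
    public void onEvent(ConsumerRecord<String, String> record) {
        String orderId = record.key();
        Header typeHeader = record.headers().lastHeader("eventType");
        if (typeHeader == null) {
            return; // Not a typed domain event; nothing for the saga to do
        }
        String eventType = new String(typeHeader.value(), StandardCharsets.UTF_8);

        OrderFulfilmentSaga saga = loadOrCreateSaga(orderId, eventType);
        if (saga == null) {
            return; // No saga for this order, and this event does not start one
        }

        Object event = deserializeEvent(eventType, record.value());
        List<SagaCommand> commands = saga.handle(event);
        saveSaga(saga);

        for (SagaCommand command : commands) {
            dispatchCommand(command);
        }
    }

    // Publishes each command as an in-process Spring event. Listeners (not shown) are
    // assumed to forward each command type to its owning service; if they are
    // @TransactionalEventListener methods (default phase AFTER_COMMIT), forwarding
    // happens only after the saga's new state has been committed.
    private void dispatchCommand(SagaCommand command) {
        switch (command) {
            case AuthorizePaymentCommand c -> publisher.publishEvent(c);
            case ReserveInventoryCommand c -> publisher.publishEvent(c);
            case DispatchFulfilmentCommand c -> publisher.publishEvent(c);
            case ReleasePaymentCommand c -> publisher.publishEvent(c);
            case ReleaseInventoryCommand c -> publisher.publishEvent(c);
            case CancelOrderCommand c -> publisher.publishEvent(c);
        }
    }

    // Loads the saga for this order, or starts a new one if the event is OrderPlaced.
    private OrderFulfilmentSaga loadOrCreateSaga(String orderId, String eventType) {
        List<OrderFulfilmentSaga> sagas = jdbc.query(
            """
            SELECT saga_id, order_id, state, failure_reason
            FROM saga_state
            WHERE order_id = ? AND saga_type = 'OrderFulfilment'
            """,
            (rs, rowNum) -> {
                var s = new OrderFulfilmentSaga(rs.getString("saga_id"), rs.getString("order_id"));
                s.restoreState(OrderSagaState.valueOf(rs.getString("state")), rs.getString("failure_reason"));
                return s;
            },
            orderId
        );
        if (!sagas.isEmpty()) {
            return sagas.get(0);
        }
        if ("OrderPlaced".equals(eventType)) {
            return new OrderFulfilmentSaga(UUID.randomUUID().toString(), orderId);
        }
        return null;
    }

    private void saveSaga(OrderFulfilmentSaga saga) {
        jdbc.update(
            """
            INSERT INTO saga_state (saga_id, saga_type, order_id, state, failure_reason, updated_at)
            VALUES (?, 'OrderFulfilment', ?, ?, ?, NOW())
            ON CONFLICT (saga_id) DO UPDATE SET
                state = EXCLUDED.state,
                failure_reason = EXCLUDED.failure_reason,
                updated_at = NOW()
            """,
            saga.sagaId(), saga.orderId(), saga.state().name(), saga.failureReason()
        );
    }

    private Object deserializeEvent(String type, String payload) {
        // Placeholder: look up the event class for this type in an event registry and
        // deserialize the payload (for example with Jackson's ObjectMapper.readValue).
        // Returning null makes saga.handle(...) a no-op.
        return null;
    }
}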
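Kafka delivers records at least once. If the process dies after the database transaction commits but before the consumer offset is committed, the same event is delivered again. Each handler ignores events it does not expect in the current state, so a redelivered event that has already moved the saga forward produces no commands. The cut-down saga below checks that property. It is hypothetical and uses string events with no persistence.

```java
import java.util.List;

// Cut-down, hypothetical saga with two forward steps and string events,
// used only to show how redelivered events are handled.
class RedeliverySaga {
    enum State { AWAITING_PAYMENT, AWAITING_INVENTORY, AWAITING_FULFILMENT }

    private State state = State.AWAITING_PAYMENT;

    List<String> handle(String event) {
        if (state == State.AWAITING_PAYMENT && event.equals("PaymentAuthorized")) {
            state = State.AWAITING_INVENTORY;
            return List.of("ReserveInventory");
        }
        if (state == State.AWAITING_INVENTORY && event.equals("InventoryReserved")) {
            state = State.AWAITING_FULFILMENT;
            return List.of("DispatchFulfilment");
        }
        // Not expected in the current state (for example a redelivered event): ignore it.
        return List.of();
    }

    State state() {
        return state;
    }
}
```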
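In Axon Framework, sagas are annotated with @Saga and their event handlers with @SagaEventHandler, whose associationProperty (for example orderId) routes each event to the matching saga instance. Axon manages the saga lifecycle, state persistence, and event routing. A handler annotated with @StartSaga creates a new saga instance when the triggering event arrives. A handler annotated with @EndSaga ends the saga once it has run, and SagaLifecycle.end() does the same conditionally, which suits a workflow like this one that can finish in several states. For timeouts, Axon's DeadlineManager schedules deadlines and delivers the expired ones to @DeadlineHandler methods.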
The Test
// FROM SCRATCH
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import org.junit.jupiter.api.Test;

class OrderFulfilmentSagaTest {

    @Test
    void happyPath() {
        var saga = new OrderFulfilmentSaga("saga-1", "order-1");

        // OrderPlaced -> AuthorizePayment
        var commands = saga.handle(new OrderPlaced(
            "order-1", "c1", List.of(), new BigDecimal("100"),
            new Address("1", "C", "S", "0", "US"), Instant.now()
        ));
        assertEquals(1, commands.size());
        assertInstanceOf(AuthorizePaymentCommand.class, commands.get(0));
        assertEquals(OrderSagaState.AWAITING_PAYMENT, saga.state());

        // PaymentAuthorized -> ReserveInventory
        commands = saga.handle(new PaymentAuthorized(
            "order-1", "pay-1", new BigDecimal("100"), "AUTH", Instant.now()
        ));
        assertEquals(1, commands.size());
        assertInstanceOf(ReserveInventoryCommand.class, commands.get(0));
        assertEquals(OrderSagaState.AWAITING_INVENTORY, saga.state());

        // InventoryReserved -> DispatchFulfilment
        commands = saga.handle(new InventoryReserved("order-1", List.of("item-1")));
        assertEquals(1, commands.size());
        assertInstanceOf(DispatchFulfilmentCommand.class, commands.get(0));
        assertEquals(OrderSagaState.AWAITING_FULFILMENT, saga.state());

        // FulfilmentDispatched -> Completed
        commands = saga.handle(new FulfilmentDispatched("order-1", "TRACK-001"));
        assertTrue(commands.isEmpty());
        assertEquals(OrderSagaState.COMPLETED, saga.state());
    }

    @Test
    void inventoryFailureCompensatesPaymentAndCancelsOrder() {
        var saga = new OrderFulfilmentSaga("saga-2", "order-2");
        saga.handle(new OrderPlaced("order-2", "c1", List.of(), BigDecimal.TEN,
            new Address("1", "C", "S", "0", "US"), Instant.now()));
        saga.handle(new PaymentAuthorized("order-2", "pay-2", BigDecimal.TEN, "AUTH", Instant.now()));

        // Inventory fails
        var commands = saga.handle(new InventoryReservationFailed("order-2", "Out of stock"));
        assertEquals(1, commands.size());
        assertInstanceOf(ReleasePaymentCommand.class, commands.get(0));

        // Payment released -> cancel order, carrying the original cause
        commands = saga.handle(new PaymentReleased("order-2"));
        assertEquals(1, commands.size());
        var cancel = assertInstanceOf(CancelOrderCommand.class, commands.get(0));
        assertEquals("Fulfilment workflow failed: Out of stock", cancel.reason());
        assertEquals(OrderSagaState.FAILED, saga.state());
    }

    @Test
    void fulfilmentFailureCompensatesInventoryThenPayment() {
        var saga = new OrderFulfilmentSaga("saga-3", "order-3");
        saga.handle(new OrderPlaced("order-3", "c1", List.of(), BigDecimal.TEN,
            new Address("1", "C", "S", "0", "US"), Instant.now()));
        saga.handle(new PaymentAuthorized("order-3", "pay-3", BigDecimal.TEN, "AUTH", Instant.now()));
        saga.handle(new InventoryReserved("order-3", List.of("item-1")));

        // Fulfilment fails
        var commands = saga.handle(new FulfilmentFailed("order-3", "Warehouse fire"));
        assertEquals(1, commands.size());
        assertInstanceOf(ReleaseInventoryCommand.class, commands.get(0));

        // Inventory released -> release payment
        commands = saga.handle(new InventoryReleased("order-3"));
        assertEquals(1, commands.size());
        assertInstanceOf(ReleasePaymentCommand.class, commands.get(0));

        // Payment released -> cancel order, carrying the original cause
        commands = saga.handle(new PaymentReleased("order-3"));
        assertEquals(1, commands.size());
        var cancel = assertInstanceOf(CancelOrderCommand.class, commands.get(0));
        assertEquals("Fulfilment workflow failed: Warehouse fire", cancel.reason());
        assertEquals(OrderSagaState.FAILED, saga.state());
    }
}
The saga test is a pure unit test. No database. No Kafka. No containers. The saga is a state machine that receives events and produces commands. The test feeds events and asserts on commands. This is one of the benefits of the orchestration approach: the entire workflow logic is testable in isolation.
This chapter established sagas as the coordination mechanism for multi-step workflows. The next chapter addresses what happens when event schemas change over time.