Publishing Events: Kafka, the Outbox Pattern, and Guaranteed Delivery Without Dual Writes
Publishing Events
The outbox pattern solves the dual-write problem by writing events to both the event store and an outbox table in the same database transaction. A separate relay process polls the outbox, publishes messages to Kafka, and marks them as processed. This guarantees at-least-once delivery without distributed transactions. Consumers must be idempotent.
The event store captures events for the write side: aggregate loading, concurrency control, and internal projections. But events also need to reach external consumers: other microservices, analytics pipelines, notification systems. Kafka is the message broker that carries events beyond the boundary of the originating service.
The engineering challenge is bridging the event store (PostgreSQL) and Kafka without introducing inconsistencies. This bridge has a name, the transactional outbox pattern, and it is the only correct approach for external event publishing. This is not a recommendation. It is a constraint.
The Dual Write Problem
The Problem
The naive approach: after writing events to the event store, publish them to Kafka.
// NAIVE - Dual write
@Transactional
public void handleCommand(PlaceOrder command) {
    List<OrderEvent> events = aggregate.place(command);
    eventStore.append("order-" + command.orderId(), version, events);
    // The database transaction commits only when this method returns.
    // The Kafka publish below is not part of that transaction.
    for (OrderEvent event : events) {
        kafkaTemplate.send("order-events", event.orderId(), event);
    }
}
Two failure modes exist:
PostgreSQL commits, Kafka fails. The event is in the event store but not in Kafka. Internal projections process the event. External consumers never receive it. The payment service never authorizes the payment. The inventory service never reserves stock. The order is placed but never progresses.
Kafka publishes, PostgreSQL rolls back. The event is in Kafka but not in the event store. External consumers process an event that never happened. The payment service authorizes a payment for a nonexistent order. The inventory service reserves stock that was never ordered.
Both failure modes produce data inconsistency between services. The inconsistency is silent. No error is thrown. No alert is triggered. The system continues operating with divergent state across services until a human notices.
The Mechanism
The transactional outbox eliminates dual writes by writing events to Kafka indirectly. Instead of publishing to Kafka after the transaction commits, the events are written to an outbox table within the same transaction as the event store write. A separate process (the outbox relay) reads the outbox table and publishes to Kafka.
The outbox table and the event store share a PostgreSQL transaction, so they are always consistent with each other. The Kafka publish can fail and be retried. Because a retried publish can deliver the same event more than once, consumers must be idempotent. The system is eventually consistent with guaranteed at-least-once delivery.
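Idempotency on the consumer side can be as simple as remembering which events have already been applied. The following is a minimal sketch, assuming each event carries a unique ID (the outbox row id would be one candidate). The names IdempotentHandler and handle are hypothetical, and the in-memory set stands in for what would be a durable table in a real consumer.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration: deduplicates events by ID before applying them.
// A production consumer would persist processed IDs together with the side
// effect; the in-memory set here is an assumption made for brevity.
class IdempotentHandler {
    private final Set<Long> processedIds = new HashSet<>();
    private int reservedStock = 0;

    // Returns true if the event was applied, false if it was a duplicate.
    boolean handle(long eventId, int quantity) {
        if (!processedIds.add(eventId)) {
            return false; // Already seen: this is a redelivery, skip the side effect
        }
        reservedStock += quantity;
        return true;
    }

    int reservedStock() {
        return reservedStock;
    }
}

class IdempotentHandlerDemo {
    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle(1L, 5);
        handler.handle(1L, 5); // Redelivered duplicate is ignored
        handler.handle(2L, 3);
        System.out.println(handler.reservedStock()); // prints 8
    }
}
```

Delivering event 1 twice leaves the reserved stock at 8 rather than 13, which is the property the relay's retries depend on.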
The Outbox Table
-- FROM SCRATCH
CREATE TABLE outbox (
    id BIGSERIAL PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published BOOLEAN NOT NULL DEFAULT FALSE,
    published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unpublished ON outbox (published, created_at ASC)
    WHERE published = FALSE;
The partial index on published = FALSE keeps the outbox relay query efficient even as the table grows. The index contains only unpublished rows, so published rows add nothing to its size or to the cost of scanning it.
The From-Scratch Implementation
Event Store with Outbox
// FROM SCRATCH
public class OutboxAwareEventStore {
    private final DataSource dataSource;
    private final ObjectMapper mapper;
    public OutboxAwareEventStore(DataSource dataSource, ObjectMapper mapper) {
        this.dataSource = dataSource;
        this.mapper = mapper;
    }
    public void appendWithOutbox(String streamId, long expectedVersion,
            List<? extends OrderEvent> events, String aggregateType) {
        String eventSql = """
            INSERT INTO event_store (stream_id, sequence_number, event_type, payload, metadata, occurred_at)
            VALUES (?, ?, ?, ?::jsonb, '{}'::jsonb, ?)
            """;
        String outboxSql = """
            INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
            VALUES (?, ?, ?, ?::jsonb)
            """;
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try {
                long sequence = expectedVersion + 1;
                // Write to event store
                try (PreparedStatement eventStmt = conn.prepareStatement(eventSql)) {
                    for (OrderEvent event : events) {
                        eventStmt.setString(1, streamId);
                        eventStmt.setLong(2, sequence);
                        eventStmt.setString(3, event.getClass().getSimpleName());
                        eventStmt.setString(4, mapper.writeValueAsString(event));
                        eventStmt.setTimestamp(5, Timestamp.from(event.occurredAt()));
                        eventStmt.addBatch();
                        sequence++;
                    }
                    eventStmt.executeBatch();
                }
                // Write to outbox in the same transaction
                try (PreparedStatement outboxStmt = conn.prepareStatement(outboxSql)) {
                    for (OrderEvent event : events) {
                        outboxStmt.setString(1, aggregateType);
                        outboxStmt.setString(2, extractAggregateId(event));
                        outboxStmt.setString(3, event.getClass().getSimpleName());
                        outboxStmt.setString(4, mapper.writeValueAsString(event));
                        outboxStmt.addBatch();
                    }
                    outboxStmt.executeBatch();
                }
                conn.commit();
            } catch (SQLException e) {
                conn.rollback();
                // 23505 = unique_violation: another writer already appended at this sequence number
                if ("23505".equals(e.getSQLState())) {
                    throw new OptimisticConcurrencyException(streamId, expectedVersion);
                }
                throw new EventStoreException("Append with outbox failed", e);
            } catch (JsonProcessingException e) {
                conn.rollback();
                throw new EventStoreException("Serialization failed", e);
            }
        } catch (SQLException e) {
            throw new EventStoreException("Connection failed", e);
        }
    }
    private String extractAggregateId(OrderEvent event) {
        return event.orderId();
    }
}
The Outbox Relay
// FROM SCRATCH
public class OutboxRelay {
    private final DataSource dataSource;
    private final KafkaProducer<String, String> producer;
    private final String topicName;
    private final int batchSize;
    private volatile boolean running;
    public OutboxRelay(DataSource dataSource, KafkaProducer<String, String> producer,
            String topicName, int batchSize) {
        this.dataSource = dataSource;
        this.producer = producer;
        this.topicName = topicName;
        this.batchSize = batchSize;
    }
    public void start() {
        running = true;
        Thread.ofVirtual().name("outbox-relay").start(this::relayLoop);
    }
    public void stop() {
        running = false;
    }
    private void relayLoop() {
        while (running) {
            try {
                int published = publishBatch();
                if (published == 0) {
                    Thread.sleep(100); // No pending messages, wait
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // Batch failed: back off, then retry. Rows not yet marked as published are picked up again.
                try { Thread.sleep(1000); }
                catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; }
            }
        }
    }
    private int publishBatch() {
        // created_at is the transaction timestamp, so events written in one
        // transaction share it; id breaks the tie in insertion order.
        String selectSql = """
            SELECT id, aggregate_type, aggregate_id, event_type, payload
            FROM outbox
            WHERE published = FALSE
            ORDER BY created_at ASC, id ASC
            LIMIT ?
            FOR UPDATE SKIP LOCKED
            """;
        String updateSql = "UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = ?";
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            List<OutboxMessage> messages = new ArrayList<>();
            try (PreparedStatement stmt = conn.prepareStatement(selectSql)) {
                stmt.setInt(1, batchSize);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        messages.add(new OutboxMessage(
                            rs.getLong("id"),
                            rs.getString("aggregate_type"),
                            rs.getString("aggregate_id"),
                            rs.getString("event_type"),
                            rs.getString("payload")
                        ));
                    }
                }
            }
            if (messages.isEmpty()) {
                conn.rollback();
                return 0;
            }
            // Publish to Kafka
            for (OutboxMessage msg : messages) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    topicName,
                    msg.aggregateId(), // Use aggregate ID as key for partition ordering
                    msg.payload()
                );
                // Explicit charset: the no-arg getBytes() depends on the platform default
                record.headers().add("eventType", msg.eventType().getBytes(StandardCharsets.UTF_8));
                record.headers().add("aggregateType", msg.aggregateType().getBytes(StandardCharsets.UTF_8));
                producer.send(record).get(); // Synchronous send for guaranteed ordering
            }
            // Mark as published
            try (PreparedStatement stmt = conn.prepareStatement(updateSql)) {
                for (OutboxMessage msg : messages) {
                    stmt.setLong(1, msg.id());
                    stmt.addBatch();
                }
                stmt.executeBatch();
            }
            conn.commit();
            return messages.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the flag so relayLoop can exit
            throw new OutboxRelayException("Interrupted while relaying outbox messages", e);
        } catch (SQLException | ExecutionException e) {
            throw new OutboxRelayException("Failed to relay outbox messages", e);
        }
    }
}
record OutboxMessage(long id, String aggregateType, String aggregateId,
        String eventType, String payload) {}
What the Implementation Reveals
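The FOR UPDATE SKIP LOCKED clause is critical for scaling the outbox relay. Without it, running multiple relay instances (for high availability) causes contention: each instance tries to lock the same rows, and only one proceeds while the others wait. SKIP LOCKED allows each instance to process different rows concurrently. The trade-off is ordering: two instances can pick up different events for the same aggregate and publish them in either order, so a deployment that depends on per-aggregate ordering should keep a single relay active at a time.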
The Kafka send is synchronous (producer.send(record).get()). This is deliberate. The relay must know whether the publish succeeded before marking the outbox row as published. A fire-and-forget asynchronous send would let the relay mark the row as published before the broker confirmed delivery, reintroducing the inconsistency that the outbox pattern eliminates.
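The order of operations (send first, mark second) is what makes the relay at-least-once rather than at-most-once. The sketch below models that with in-memory stand-ins, not Kafka or JDBC. RelaySimulation and its members are hypothetical names, and the crash is simulated by returning early.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the relay; no Kafka or JDBC involved.
class RelaySimulation {
    final Map<Long, Boolean> outbox = new LinkedHashMap<>(); // id -> published flag
    final List<Long> broker = new ArrayList<>();             // ids the "broker" received

    void enqueue(long id) {
        outbox.put(id, false);
    }

    // Sends every unpublished row, then marks it as published. If
    // crashBeforeMark is true, the pass stops right after the first send,
    // leaving that row unmarked.
    void relayOnce(boolean crashBeforeMark) {
        for (Map.Entry<Long, Boolean> row : outbox.entrySet()) {
            if (row.getValue()) {
                continue; // Already published
            }
            broker.add(row.getKey()); // Publish succeeded
            if (crashBeforeMark) {
                return; // Simulated crash: the row stays unpublished
            }
            row.setValue(true); // Mark only after the publish is confirmed
        }
    }
}

class RelaySimulationDemo {
    public static void main(String[] args) {
        RelaySimulation sim = new RelaySimulation();
        sim.enqueue(1L);
        sim.enqueue(2L);
        sim.relayOnce(true);  // Crashes after sending id 1
        sim.relayOnce(false); // Restart: id 1 is sent again, then id 2
        System.out.println(sim.broker); // prints [1, 1, 2]
    }
}
```

Nothing is lost, but id 1 reaches the broker twice. Reversing the two steps (mark, then send) would turn the same crash into a lost message instead of a duplicate.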
Using the aggregate ID as the Kafka message key ensures that all events for the same aggregate land in the same partition. Kafka guarantees ordering within a partition. This means events for order 12345 are always consumed in the order they were produced. Without this, a consumer might receive OrderCancelled before OrderPlaced.
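The effect of keying by aggregate ID can be illustrated without a broker. This is a simplified sketch: KeyedPartitioning is a hypothetical name, and String.hashCode() stands in for the hash that Kafka's default partitioner applies to the serialized key. Any deterministic hash shows the same property, namely that equal keys always map to the same partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of key-based routing; not Kafka's partitioner.
class KeyedPartitioning {
    // Stand-in hash (an assumption): deterministic, so the same key always
    // yields the same partition number.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each {key, event} pair to the log of the partition chosen by its key.
    static Map<Integer, List<String>> route(List<String[]> keyedEvents, int numPartitions) {
        Map<Integer, List<String>> partitions = new HashMap<>();
        for (String[] keyed : keyedEvents) {
            int partition = partitionFor(keyed[0], numPartitions);
            partitions.computeIfAbsent(partition, k -> new ArrayList<>()).add(keyed[1]);
        }
        return partitions;
    }
}

class KeyedPartitioningDemo {
    public static void main(String[] args) {
        List<String[]> events = List.of(
            new String[] {"12345", "OrderPlaced"},
            new String[] {"67890", "OrderPlaced-other"},
            new String[] {"12345", "OrderCancelled"});
        Map<Integer, List<String>> partitions = KeyedPartitioning.route(events, 6);
        // Both events for order 12345 sit in one partition, in production order.
        System.out.println(partitions.get(KeyedPartitioning.partitionFor("12345", 6)));
    }
}
```

Whatever partition order 12345 hashes to, its OrderPlaced entry precedes its OrderCancelled entry in that partition's log.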
The outbox table grows as events accumulate. Published rows are no longer needed. A periodic cleanup job deletes published rows older than a retention period:
-- PRODUCTION
DELETE FROM outbox WHERE published = TRUE AND published_at < NOW() - INTERVAL '7 days';
Kafka Producer Configuration
// PRODUCTION
public static KafkaProducer<String, String> createProducer(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Durability: wait for all replicas to acknowledge
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    // Idempotence: prevent duplicate messages from producer retries
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    // Retries: handle transient broker failures
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
    // Ordering: ensure messages for the same key arrive in order
    // max.in.flight.requests.per.connection must be <= 5 for idempotent producer
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
    // Batching: the relay sends one message at a time, but batching
    // is useful if the relay is extended to send multiple messages per partition
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    return new KafkaProducer<>(props);
}
acks=all: The producer waits for all in-sync replicas to acknowledge the write. This prevents message loss if the leader broker fails after accepting the message but before replicating it.
enable.idempotence=true: The broker deduplicates messages from the same producer using a sequence number. If the producer retries a send (because the ack was lost), the broker recognizes the duplicate and does not store it twice. This is essential for the outbox relay, which might retry sends during transient failures.
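retries=MAX_VALUE: The producer keeps retrying transient failures until delivery.timeout.ms (120 seconds by default) expires. Combined with idempotence, this means a retried send is stored at most once within a producer session. It does not make the pipeline exactly-once: if the relay crashes after a successful send but before marking the row as published, it publishes the same event again after restart. Delivery to consumers is therefore at-least-once.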
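The scope of producer idempotence is easier to see in a toy model. The sketch below is a deliberately simplified, hypothetical model of broker-side deduplication for a single partition (DedupingPartition is not a Kafka class, and a real broker also rejects sequence gaps). It assumes that a restarted relay creates a new producer and therefore gets a new producer ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of per-producer sequence deduplication.
class DedupingPartition {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSequence = new HashMap<>(); // producerId -> last stored sequence

    // Appends the record unless this producer already wrote this sequence number.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // Retry of an already-stored record: drop it
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> log() {
        return log;
    }
}

class DedupingPartitionDemo {
    public static void main(String[] args) {
        DedupingPartition partition = new DedupingPartition();
        partition.append(7L, 0, "OrderPlaced"); // First send, ack lost on the way back
        partition.append(7L, 0, "OrderPlaced"); // Producer retry: deduplicated
        partition.append(8L, 0, "OrderPlaced"); // Restarted relay, new producer ID: stored again
        System.out.println(partition.log().size()); // prints 2
    }
}
```

The retry within one producer session is absorbed, but the resend from a new producer is not, which is why consumers still need to handle duplicates.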
Kafka Consumer Configuration
External consumers that process events from Kafka need their own configuration:
// PRODUCTION
public static KafkaConsumer<String, String> createConsumer(
        String bootstrapServers, String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Manual offset management for at-least-once delivery
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Start from earliest when no committed offset exists
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Fetch tuning
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
    return new KafkaConsumer<>(props);
}
enable.auto.commit=false: With auto-commit enabled, the consumer commits offsets periodically regardless of whether it has finished processing the messages. If the consumer crashes after an auto-commit but before processing, those messages are lost. Committing manually after processing provides at-least-once delivery.
auto.offset.reset=earliest: When a new consumer group is created or an offset is not found, start from the earliest message. This is important for event-sourced consumers that must process the complete event history.
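The difference between committing before and after processing can be shown with a small model. This is a hypothetical single-partition simulation, not the Kafka consumer API. OffsetCommitSimulation is an invented name, and the crash is placed exactly between the two steps for the record at crashAt, which simplifies how periodic auto-commit behaves in practice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a single-partition consumer; not the Kafka client API.
class OffsetCommitSimulation {
    // Consumes the log from 'start'. For the record at 'crashAt' the consumer
    // crashes between its two steps. Returns the committed offset at that point
    // (or at the end of the log if no crash occurs).
    static int run(List<String> log, int start, int crashAt,
                   boolean commitFirst, List<String> processed) {
        int committed = start;
        for (int offset = start; offset < log.size(); offset++) {
            if (commitFirst) {
                committed = offset + 1;                  // Offset advances before processing
                if (offset == crashAt) return committed; // Crash: record never processed
                processed.add(log.get(offset));
            } else {
                processed.add(log.get(offset));
                if (offset == crashAt) return committed; // Crash: record processed, not committed
                committed = offset + 1;                  // Commit only after processing
            }
        }
        return committed;
    }
}

class OffsetCommitSimulationDemo {
    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c");

        List<String> commitFirst = new ArrayList<>();
        int resume = OffsetCommitSimulation.run(log, 0, 1, true, commitFirst);
        OffsetCommitSimulation.run(log, resume, -1, true, commitFirst);
        System.out.println(commitFirst); // prints [a, c]

        List<String> processFirst = new ArrayList<>();
        resume = OffsetCommitSimulation.run(log, 0, 1, false, processFirst);
        OffsetCommitSimulation.run(log, resume, -1, false, processFirst);
        System.out.println(processFirst); // prints [a, b, b, c]
    }
}
```

Committing first loses record b. Processing first delivers b twice, which an idempotent consumer absorbs.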
The Production Path
// PRODUCTION
@Configuration
public class KafkaOutboxConfig {
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> factory) {
        return new KafkaTemplate<>(factory);
    }
    @Bean
    public ProducerFactory<String, String> producerFactory(
            @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
        Map<String, Object> config = Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.ACKS_CONFIG, "all",
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
            ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE
        );
        return new DefaultKafkaProducerFactory<>(config);
    }
}
@Component
public class SpringOutboxRelay {
    private final JdbcTemplate jdbc;
    private final KafkaTemplate<String, String> kafkaTemplate;
    @Value("${outbox.topic:order-events}")
    private String topicName;
    @Value("${outbox.batch-size:50}")
    private int batchSize;
    public SpringOutboxRelay(JdbcTemplate jdbc, KafkaTemplate<String, String> kafkaTemplate) {
        this.jdbc = jdbc;
        this.kafkaTemplate = kafkaTemplate;
    }
    @Scheduled(fixedDelay = 100)
    @Transactional
    public void relay() {
        // id breaks ties between events that share a transaction timestamp
        List<OutboxMessage> messages = jdbc.query(
            """
            SELECT id, aggregate_type, aggregate_id, event_type, payload
            FROM outbox WHERE published = FALSE
            ORDER BY created_at ASC, id ASC LIMIT ?
            FOR UPDATE SKIP LOCKED
            """,
            (rs, rowNum) -> new OutboxMessage(
                rs.getLong("id"), rs.getString("aggregate_type"),
                rs.getString("aggregate_id"), rs.getString("event_type"),
                rs.getString("payload")
            ),
            batchSize
        );
        for (OutboxMessage msg : messages) {
            // join() blocks until the broker acknowledges, so the row is marked only after a confirmed send
            kafkaTemplate.send(topicName, msg.aggregateId(), msg.payload()).join();
            jdbc.update("UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = ?", msg.id());
        }
    }
}
The Test
// FROM SCRATCH
@Testcontainers
class OutboxRelayTest {
    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
        .withDatabaseName("outbox_test")
        .withInitScript("outbox_schema.sql");
    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
    private OutboxAwareEventStore eventStore;
    private OutboxRelay relay;
    private KafkaConsumer<String, String> consumer;
    @BeforeEach
    void setUp() {
        var ds = new org.postgresql.ds.PGSimpleDataSource();
        ds.setUrl(postgres.getJdbcUrl());
        ds.setUser(postgres.getUsername());
        ds.setPassword(postgres.getPassword());
        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        eventStore = new OutboxAwareEventStore(ds, mapper);
        var producer = createProducer(kafka.getBootstrapServers());
        relay = new OutboxRelay(ds, producer, "order-events", 50);
        consumer = createConsumer(kafka.getBootstrapServers(), "test-group");
        consumer.subscribe(List.of("order-events"));
    }
    @Test
    void eventsPublishedToKafkaViaOutbox() throws Exception {
        // Write events with outbox
        eventStore.appendWithOutbox("order-kafka-1", -1, List.of(
            new OrderPlaced("kafka-1", "c1",
                List.of(new LineItem("p1", "Widget", 1, BigDecimal.TEN)),
                BigDecimal.TEN,
                new Address("1 St", "City", "ST", "00000", "US"),
                Instant.now())
        ), "Order");
        // Run relay
        relay.start();
        Thread.sleep(2000);
        relay.stop();
        // Consume from Kafka. Poll in a loop because the first poll can return
        // empty while the group is still rebalancing, and filter by key so that
        // rows left in the outbox by other tests do not affect the count.
        List<ConsumerRecord<String, String>> received = new ArrayList<>();
        long deadline = System.currentTimeMillis() + 10_000;
        while (received.isEmpty() && System.currentTimeMillis() < deadline) {
            for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
                if ("kafka-1".equals(r.key())) {
                    received.add(r);
                }
            }
        }
        assertEquals(1, received.size());
        ConsumerRecord<String, String> record = received.get(0);
        assertEquals("kafka-1", record.key());
        assertTrue(record.value().contains("Widget"));
    }
    @Test
    void outboxPreventsDualWriteInconsistency() throws Exception {
        // Event store append succeeds, outbox is written atomically
        eventStore.appendWithOutbox("order-kafka-2", -1, List.of(
            new OrderPlaced("kafka-2", "c1", List.of(), BigDecimal.ZERO,
                new Address("1", "C", "S", "0", "US"), Instant.now())
        ), "Order");
        // Verify outbox has unpublished message (no relay is running in this test)
        try (Connection conn = DriverManager.getConnection(
                 postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT COUNT(*) FROM outbox WHERE published = FALSE")) {
            rs.next();
            assertTrue(rs.getInt(1) > 0);
        }
    }
    @AfterEach
    void tearDown() {
        consumer.close();
    }
}
The test uses real PostgreSQL and Kafka containers. No mocks. No embedded substitutes. The outbox relay publishes to an actual Kafka broker, and the consumer receives from the same broker. This is the only way to verify that the outbox pattern works correctly end-to-end.
This chapter established the outbox pattern as the non-negotiable bridge between the event store and Kafka. The next chapter uses Kafka-published events to coordinate multi-step workflows across aggregates.