Decoupling with Kafka: What to Make Async and What to Keep Synchronous
The Symptom
After the initial Kafka migration from CH9, the team got excited. Every new feature started with “let’s put it on Kafka.” Ride acceptance went async. Fare quotes went async: a rider requests a fare quote, the request is published to a topic, a consumer calculates the fare and publishes the result to another topic, and a WebSocket pushes it to the rider. Round-trip at p99: 1,200ms. The synchronous version took 40ms.
The fare quote endpoint hit a 2% timeout rate during the evening surge. Riders saw a loading spinner, tapped again, and generated duplicate requests. The team had over-corrected: everything was async, and the pendulum had swung too far.
The Cause
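The team was missing a decision rule and treated Kafka as a universal hammer. Async is not free. Every async boundary adds latency: producer batching and serialization, broker persistence and replication, consumer polling and deserialization, and consumer processing. The fare quote paid that cost twice, once for the request topic and once for the result topic, before the WebSocket push. For operations where the caller waits for the result, that added latency is pure waste.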
The rule: if the rider is staring at a spinner waiting for this response, keep it synchronous. If the response is a side effect that no user waits for, make it async.
| Operation | User Waiting? | Sync or Async |
|---|---|---|
| Fare quote | Yes, rider sees price | Sync |
| Ride acceptance | Yes, driver confirms | Sync |
| Trip completion (fare) | Yes, rider sees total | Sync |
| Trip analytics aggregation | No | Async |
| Receipt email | No | Async |
| Surge recalculation | No (affects next rider) | Async |
| Driver rating update | No (background) | Async |
| Fraud detection scoring | No (post-trip) | Async |
| Payment capture | Depends (see below) | Sync write, async retry |
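The table reduces to one predicate: is a user waiting on this result? A minimal Python sketch of that rule follows. The Operation type, the snake_case operation names, and the split of payment capture into two entries are illustrative assumptions, not code from the ride-hailing service.

```python
from dataclasses import dataclass
from enum import Enum


class Mode(Enum):
    SYNC = "sync"
    ASYNC = "async"


@dataclass(frozen=True)
class Operation:
    name: str
    user_waiting: bool  # Is a rider or driver staring at a spinner for this result?


def delivery_mode(op: Operation) -> Mode:
    """The section's rule: a waiting user means sync; a side effect nobody waits for means async."""
    return Mode.SYNC if op.user_waiting else Mode.ASYNC


# Illustrative encoding of the table rows (hypothetical names).
OPERATIONS = [
    Operation("fare_quote", user_waiting=True),
    Operation("ride_acceptance", user_waiting=True),
    Operation("trip_completion_fare", user_waiting=True),
    Operation("trip_analytics_aggregation", user_waiting=False),
    Operation("receipt_email", user_waiting=False),
    Operation("surge_recalculation", user_waiting=False),
    Operation("driver_rating_update", user_waiting=False),
    Operation("fraud_detection_scoring", user_waiting=False),
]

# Payment capture splits in two: the first charge has a waiting rider, the retries do not.
PAYMENT_CAPTURE = [
    Operation("payment_initial_charge", user_waiting=True),
    Operation("payment_retry", user_waiting=False),
]
```

The "Depends" row stops being special once it is modeled as two operations with different answers to the same question.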
Payment capture is the edge case. The initial charge is synchronous because the rider needs confirmation. Retries after a failure (declined card, network timeout) go to Kafka and run with exponential backoff.
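The retry side of payment capture can be sketched as a delay schedule. This minimal Python model assumes hypothetical parameters (1s base delay, doubling per retry, 30s cap, six retries) and optional full jitter. It does not show how the schedule is wired to Kafka, and which failures count as retryable is left to the payment logic.

```python
import random


def backoff_delays_ms(base_ms: int, multiplier: float, cap_ms: int, max_retries: int) -> list[int]:
    """Delay before each retry: base_ms * multiplier**n, clamped to cap_ms."""
    return [int(min(base_ms * multiplier ** n, cap_ms)) for n in range(max_retries)]


def with_full_jitter(delays_ms: list[int], rng: random.Random) -> list[int]:
    """Spread retries out: each delay becomes a random value in [0, delay]."""
    return [rng.randint(0, d) for d in delays_ms]


# Hypothetical settings for payment retries, not values from the source system.
PAYMENT_RETRY_DELAYS_MS = backoff_delays_ms(base_ms=1000, multiplier=2.0, cap_ms=30_000, max_retries=6)
```

With these settings the schedule is 1s, 2s, 4s, 8s, 16s, then the 30s cap. Jitter keeps retries for many failed payments from reaching the payment provider in lockstep.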
The Baseline
The baseline load test targets the fare quote endpoint as it ran after the team made it async via Kafka:
# load-tests/fare_quote_locustfile.py
from locust import HttpUser, task, between, LoadTestShape
import random


class FareQuoteUser(HttpUser):
    wait_time = between(0.1, 0.5)

    @task
    def request_fare_quote(self):
        self.client.post(
            "/api/fare/quote",
            json={
                "pickupLat": 40.7128 + random.uniform(-0.05, 0.05),
                "pickupLng": -74.0060 + random.uniform(-0.05, 0.05),
                "dropoffLat": 40.7580 + random.uniform(-0.05, 0.05),
                "dropoffLng": -73.9855 + random.uniform(-0.05, 0.05),
                "rideType": random.choice(["standard", "premium", "pool"])
            },
            name="/api/fare/quote"
        )


class FareQuoteShape(LoadTestShape):
    stages = [
        {"duration": 60, "users": 200, "spawn_rate": 20},
        {"duration": 120, "users": 500, "spawn_rate": 30},
        {"duration": 180, "users": 1000, "spawn_rate": 50},
        {"duration": 240, "users": 2000, "spawn_rate": 50},
        {"duration": 300, "users": 3000, "spawn_rate": 50},
    ]

    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage["duration"]:
                return (stage["users"], stage["spawn_rate"])
        return None
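In FareQuoteShape, each duration is the cumulative run time at which a stage ends, not the stage's length: every stage lasts 60 seconds, and the test stops at 300 seconds, when tick() returns None. The lookup can be checked outside Locust with a plain function that copies the same stages and loop. target_at and stage_lengths are hypothetical helpers, not part of Locust.

```python
STAGES = [
    {"duration": 60, "users": 200, "spawn_rate": 20},
    {"duration": 120, "users": 500, "spawn_rate": 30},
    {"duration": 180, "users": 1000, "spawn_rate": 50},
    {"duration": 240, "users": 2000, "spawn_rate": 50},
    {"duration": 300, "users": 3000, "spawn_rate": 50},
]


def target_at(run_time: float, stages=STAGES):
    """Same lookup as FareQuoteShape.tick(): the first stage whose end time is still ahead."""
    for stage in stages:
        if run_time < stage["duration"]:
            return (stage["users"], stage["spawn_rate"])
    return None  # Locust ends the run when tick() returns None


def stage_lengths(stages=STAGES):
    """Per-stage hold time in seconds, derived from the cumulative end times."""
    ends = [s["duration"] for s in stages]
    return [end - start for start, end in zip([0] + ends[:-1], ends)]
```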
Fare quote via Kafka round-trip (publish request, consume, compute, publish result, push via WebSocket):
| Users | RPS | p50 (ms) | p99 (ms) | Timeout Rate |
|---|---|---|---|---|
| 200 | 380 | 420 | 890 | 0.1% |
| 500 | 820 | 480 | 1,050 | 0.8% |
| 1000 | 1,400 | 560 | 1,200 | 2.1% |
| 2000 | 2,100 | 780 | 1,800 | 5.4% |
| 3000 | 2,300 | 1,100 | 3,200 | 11.2% |
A fare quote should take 40ms. At 500 users this one takes 480ms at p50, so the Kafka round-trip adds 440ms of overhead for zero benefit.
The Fix
Step 1: Revert Fare Quote to Synchronous
// SCALED: Fare quote is synchronous - the rider is waiting
@PostMapping("/api/fare/quote")
public ResponseEntity<FareQuote> getFareQuote(@RequestBody FareQuoteRequest request) {
    FareQuote quote = fareService.calculateFare(
            request.getPickup(),
            request.getDropoff(),
            request.getRideType()
    );
    return ResponseEntity.ok(quote); // 40ms, no Kafka involved
}
Step 2: Build the RideCompletedEvent Producer
Trip completion is the core async use case. The final fare is computed synchronously; everything after it is a Kafka event.
// SCALED: Event carries all data consumers need - no callbacks to the producer
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;

public record RideCompletedEvent(
        String tripId,
        String riderId,
        String driverId,
        String pickupZoneId,
        String dropoffZoneId,
        BigDecimal fare,
        BigDecimal distance,
        Duration tripDuration,
        Instant completedAt,
        String rideType
) {}
The event is self-contained. Consumers do not call back to the trip service for additional data. Every field a consumer needs is in the event. This prevents the “event-driven spaghetti” pattern where consumers make synchronous calls to producers, creating circular dependencies.
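On the wire, self-contained means a consumer can rebuild everything it needs from the message bytes alone. A minimal Python sketch of that idea follows, assuming a hypothetical JSON encoding (decimals as strings, trip duration in seconds, ISO-8601 timestamps) and a made-up sample event. The Java producer in this section uses Spring Kafka's JsonSerializer, whose exact output format is not shown here.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from decimal import Decimal


@dataclass(frozen=True)
class RideCompletedEvent:
    """Python mirror of the Java record: every field a consumer needs travels in the event."""
    trip_id: str
    rider_id: str
    driver_id: str
    pickup_zone_id: str
    dropoff_zone_id: str
    fare: Decimal
    distance: Decimal
    trip_duration: timedelta
    completed_at: datetime
    ride_type: str

    def to_json(self) -> str:
        data = asdict(self)
        data["fare"] = str(self.fare)  # strings keep decimal values exact
        data["distance"] = str(self.distance)
        data["trip_duration"] = self.trip_duration.total_seconds()
        data["completed_at"] = self.completed_at.isoformat()
        return json.dumps(data)

    @classmethod
    def from_json(cls, raw: str) -> "RideCompletedEvent":
        data = json.loads(raw)
        data["fare"] = Decimal(data["fare"])
        data["distance"] = Decimal(data["distance"])
        data["trip_duration"] = timedelta(seconds=data["trip_duration"])
        data["completed_at"] = datetime.fromisoformat(data["completed_at"])
        return cls(**data)


def analytics_record(event: RideCompletedEvent) -> tuple:
    """What the analytics consumer reads: event fields only, no call back to the trip service."""
    return (event.pickup_zone_id, event.fare, event.distance, event.trip_duration)


# Hypothetical sample event for illustration.
SAMPLE = RideCompletedEvent(
    "trip-1", "rider-1", "driver-1", "zone-10", "zone-42",
    Decimal("23.50"), Decimal("7.2"), timedelta(minutes=21),
    datetime(2024, 5, 1, 18, 30, tzinfo=timezone.utc), "standard",
)
```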
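// SCALED: Idempotent producer - the broker drops duplicates caused by producer retries
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
                ProducerConfig.ACKS_CONFIG, "all",
                ProducerConfig.RETRIES_CONFIG, 3,
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
                ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5,
                ProducerConfig.LINGER_MS_CONFIG, 5,
                ProducerConfig.BATCH_SIZE_CONFIG, 16384,
                ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"
        );
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}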
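enable.idempotence=true assigns the producer an ID and gives each message a per-partition sequence number. If the broker receives a duplicate (from a retry), it discards it silently. This protects against duplicates caused by producer retries; it is not end-to-end exactly-once delivery, which also needs transactions and read_committed consumers. Combined with acks=all, a write is acknowledged only after every in-sync replica has it. max.in.flight.requests.per.connection=5 is the highest value idempotence allows while still preserving per-partition ordering. The send() call itself returns in under 1ms because it only appends the record to a local batch; linger.ms=5 lets that batch fill for up to 5ms before it is sent.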
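Broker-side deduplication can be modeled as a per-producer sequence check. The Python sketch below is a simplification: a real broker tracks sequence numbers per producer ID and partition and remembers the last few batches, while this model keeps only the last sequence number for a single partition and rejects gaps with a plain ValueError. PartitionLog is a hypothetical name.

```python
class PartitionLog:
    """Simplified partition log that deduplicates by (producer ID, sequence number)."""

    def __init__(self) -> None:
        self.records: list[str] = []
        self._last_seq: dict[int, int] = {}  # producer_id -> last appended sequence number

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        last = self._last_seq.get(producer_id, -1)
        if seq <= last:
            return False  # duplicate from a producer retry: acknowledged, not appended again
        if seq != last + 1:
            raise ValueError(f"out-of-order sequence {seq}, expected {last + 1}")
        self.records.append(value)
        self._last_seq[producer_id] = seq
        return True
```

The model also shows the limit of idempotence: it catches a retried send of the same message, but if application code calls send() twice for the same trip, the second call gets a new sequence number and is appended. Consumers still need to tolerate duplicates.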
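// SCALED: Trip completion service - sync fare, async everything else
import java.math.BigDecimal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Service
public class TripCompletionService {
    private static final Logger log = LoggerFactory.getLogger(TripCompletionService.class);

    private final TripRepository tripRepository;
    private final FareCalculator fareCalculator;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public TripCompletionService(TripRepository tripRepository, FareCalculator fareCalculator,
                                 KafkaTemplate<String, Object> kafkaTemplate) {
        this.tripRepository = tripRepository;
        this.fareCalculator = fareCalculator;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional
    public TripReceipt complete(TripCompletionRequest request) {
        Trip trip = tripRepository.findById(request.getTripId())
                .orElseThrow(() -> new TripNotFoundException(request.getTripId()));
        BigDecimal finalFare = fareCalculator.finalize(trip, request);
        trip.complete(finalFare, request.getDistance(), request.getDuration());
        Trip saved = tripRepository.save(trip);
        RideCompletedEvent event = new RideCompletedEvent(
                saved.getId(), saved.getRiderId(), saved.getDriverId(),
                saved.getPickupZoneId(), saved.getDropoffZoneId(),
                saved.getFare(), saved.getDistance(), saved.getDuration(),
                saved.getCompletedAt(), saved.getRideType()
        );
        // SCALED: Publish only after the DB commit succeeds - a rollback publishes nothing
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                kafkaTemplate.send("ride-completed", saved.getId(), event)
                        .whenComplete((result, ex) -> {
                            if (ex != null) {
                                log.error("Failed to publish RideCompletedEvent for trip {}",
                                        saved.getId(), ex);
                                // Event will be retried by the outbox poller (CH14)
                            }
                        });
            }
        });
        return saved.toReceipt();
    }
}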
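The ordering that matters is save, commit, then publish. In Spring, the hook for that is TransactionSynchronization.afterCommit() (an @TransactionalEventListener, whose default phase is AFTER_COMMIT, is another option). The Python model below is a toy: Transaction and complete_trip are hypothetical stand-ins for the transaction manager and the service method, showing that a callback registered for after-commit runs on commit and is discarded on rollback.

```python
from typing import Callable, List


class Transaction:
    """Toy transaction: after-commit callbacks run only if the commit happens."""

    def __init__(self) -> None:
        self._after_commit: List[Callable[[], None]] = []

    def register_after_commit(self, callback: Callable[[], None]) -> None:
        self._after_commit.append(callback)

    def commit(self) -> None:
        callbacks, self._after_commit = self._after_commit, []
        for callback in callbacks:
            callback()

    def rollback(self) -> None:
        self._after_commit = []  # nothing was committed, so nothing is published


def complete_trip(tx: Transaction, published: List[str], trip_id: str,
                  fail_before_commit: bool = False) -> None:
    """Hypothetical stand-in for TripCompletionService.complete()."""
    # Saving the trip is omitted; the publish is deferred instead of sent inline.
    tx.register_after_commit(lambda: published.append(trip_id))
    if fail_before_commit:
        tx.rollback()  # e.g. the save violates a constraint
        return
    tx.commit()
```

Deferring the send closes one gap but not another: if the process dies between the commit and the send, the event is lost. That remaining gap is what an outbox table covers.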
Step 3: Kafka Consumers with Spring @KafkaListener
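// SCALED: Analytics consumer - separate consumer group, independent scaling
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class TripAnalyticsConsumer {
    private final AnalyticsAggregator aggregator;
    private final MeterRegistry meterRegistry;

    public TripAnalyticsConsumer(AnalyticsAggregator aggregator, MeterRegistry meterRegistry) {
        this.aggregator = aggregator;
        this.meterRegistry = meterRegistry;
    }

    @KafkaListener(
            topics = "ride-completed",
            groupId = "trip-analytics",
            containerFactory = "analyticsListenerFactory"
    )
    public void onRideCompleted(
            @Payload RideCompletedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {
        meterRegistry.counter("kafka.consumer.events",
                "topic", "ride-completed", "group", "trip-analytics").increment();
        aggregator.recordTrip(
                event.pickupZoneId(),
                event.fare(),
                event.distance(),
                event.tripDuration()
        );
    }
}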
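// SCALED: Notification consumer - lower concurrency, email is not latency-sensitive
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class ReceiptNotificationConsumer {
    private final NotificationService notificationService;

    public ReceiptNotificationConsumer(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @KafkaListener(
            topics = "ride-completed",
            groupId = "trip-notifications",
            containerFactory = "notificationListenerFactory"
    )
    public void onRideCompleted(@Payload RideCompletedEvent event) {
        notificationService.sendTripReceipt(
                event.riderId(),
                event.tripId(),
                event.fare(),
                event.completedAt()
        );
    }
}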
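Because the two listeners use different groupId values, Kafka tracks a separate committed offset for each group, so both receive every RideCompletedEvent and neither waits for the other. A minimal single-partition Python model of that behavior follows; Topic is a hypothetical class, and committing on every poll is a simplification of the real commit modes.

```python
from collections import defaultdict


class Topic:
    """Single-partition topic with one committed offset per consumer group."""

    def __init__(self) -> None:
        self.log: list = []
        self.committed = defaultdict(int)  # group_id -> next offset to read

    def publish(self, event) -> None:
        self.log.append(event)

    def poll(self, group_id: str, max_records: int = 10) -> list:
        start = self.committed[group_id]
        batch = self.log[start:start + max_records]
        self.committed[group_id] = start + len(batch)  # simplification: commit on poll
        return batch
```

A slow notification group only falls behind on its own offset; the analytics group keeps reading at its own pace.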
// SCALED: Consumer factory with error handling and concurrency
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>
            analyticsListenerFactory(ConsumerFactory<String, RideCompletedEvent> cf) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>();
        factory.setConsumerFactory(cf);
        factory.setConcurrency(3);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new FixedBackOff(1000L, 3) // 1s between retries, 3 retries after the first failure
        ));
        factory.getContainerProperties().setAckMode(
                ContainerProperties.AckMode.RECORD // commit the offset after each record is processed
        );
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>
            notificationListenerFactory(ConsumerFactory<String, RideCompletedEvent> cf) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>();
        factory.setConsumerFactory(cf);
        factory.setConcurrency(2);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new FixedBackOff(2000L, 5) // Notifications tolerate delay: longer interval, 5 retries
        ));
        return factory;
    }
}
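The second FixedBackOff argument counts retries after the first failed delivery, not total attempts. A small Python model makes the worst case for the two factories explicit; fixed_backoff_plan is a hypothetical helper, and it ignores the time spent processing each attempt.

```python
def fixed_backoff_plan(interval_ms: int, max_attempts: int) -> dict:
    """Worst case for a record that fails every time under FixedBackOff(interval, maxAttempts).

    maxAttempts counts retries after the first delivery, so the listener sees the
    record max_attempts + 1 times and waits interval_ms before each retry.
    """
    return {
        "deliveries": max_attempts + 1,
        "retry_wait_ms": interval_ms * max_attempts,
    }


ANALYTICS = fixed_backoff_plan(1000, 3)      # analyticsListenerFactory
NOTIFICATIONS = fixed_backoff_plan(2000, 5)  # notificationListenerFactory
```

With no recoverer configured, DefaultErrorHandler logs the record after the last retry and moves on, so a poison message is not retried forever. These retries are blocking: later records on the same partition wait while a failing record is retried.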
Step 4: Kubernetes Deployment for Consumers
Consumers run as a separate Deployment from the API pods, so the two scale independently: the API needs low latency, while consumers need throughput.
# k8s/trip-analytics-consumer.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trip-analytics-consumer
  labels:
    app: trip-analytics-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: trip-analytics-consumer
  template:
    metadata:
      labels:
        app: trip-analytics-consumer
    spec:
      containers:
        - name: consumer
          image: ridehail/trip-analytics-consumer:latest
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1Gi"
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: "consumer"
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
              port: 8081
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8081
            initialDelaySeconds: 15
            periodSeconds: 5
The ride-completed topic has 12 partitions. With 3 replicas at concurrency 3, each replica runs 3 consumer threads, for 9 active consumers in the trip-analytics group: three consumers get 2 partitions each and six get 1. Adding a 4th replica gives 12 consumers for 12 partitions, one partition each. That is the maximum parallelism for this topic: a 5th replica would add 3 idle consumers, because a partition is never split across consumers in the same group.
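The partition arithmetic can be checked with a short Python sketch. Both helpers are hypothetical; the counts match Kafka's range assignor for a single topic, though the exact partition-to-consumer mapping depends on the configured assignor.

```python
def consumers_in_group(replicas: int, concurrency: int) -> int:
    """Each replica runs `concurrency` listener threads, and each thread is one group member."""
    return replicas * concurrency


def partitions_per_consumer(partitions: int, consumers: int) -> list[int]:
    """Even spread of one topic's partitions over a group's members.

    The first (partitions % consumers) members get one extra partition, and any
    member beyond the partition count gets none.
    """
    base, extra = divmod(partitions, consumers)
    return [base + 1 if i < extra else base for i in range(consumers)]
```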
The Proof
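The fare quote numbers come from the same Locust test as the baseline, re-run with the synchronous endpoint restored. The trip completion numbers come from a load test that ramps to 4,000 users against the completion endpoint, which now publishes to Kafka: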
Fare Quote (Reverted to Synchronous)
| Users | RPS | p50 (ms) | p99 (ms) | Timeout Rate |
|---|---|---|---|---|
| 200 | 480 | 28 | 42 | 0% |
| 500 | 1,180 | 30 | 48 | 0% |
| 1000 | 2,300 | 32 | 55 | 0% |
| 2000 | 4,400 | 35 | 62 | 0% |
| 3000 | 6,200 | 38 | 78 | 0% |
At 1,000 users, p99 dropped from 1,200ms to 55ms and the timeout rate from 2.1% to 0%. Removing the Kafka round-trip removed 95% of the p99 latency.
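The 95% figure holds across every load level, not just at 1,000 users. A short Python check follows, with the p99 values copied from the two fare quote tables; nothing here is new data.

```python
# p99 latency in ms by concurrent users, copied from the two fare quote tables.
ASYNC_P99_MS = {200: 890, 500: 1050, 1000: 1200, 2000: 1800, 3000: 3200}
SYNC_P99_MS = {200: 42, 500: 48, 1000: 55, 2000: 62, 3000: 78}


def reduction_pct(before_ms: float, after_ms: float) -> float:
    """Share of the original latency removed, as a percentage."""
    return 100.0 * (before_ms - after_ms) / before_ms


REDUCTIONS = {
    users: round(reduction_pct(ASYNC_P99_MS[users], SYNC_P99_MS[users]), 1)
    for users in ASYNC_P99_MS
}
```

The reduction ranges from 95.3% at 200 users to 97.6% at 3,000 users, because the async version degrades faster under load than the sync version does.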
Trip Completion (Kafka-Decoupled)
| Users | RPS | p50 (ms) | p99 (ms) | Error Rate | Tomcat Threads Active |
|---|---|---|---|---|---|
| 500 | 1,400 | 12 | 28 | 0% | 22 |
| 1000 | 2,800 | 14 | 32 | 0% | 38 |
| 2000 | 5,400 | 15 | 45 | 0% | 62 |
| 3000 | 7,800 | 16 | 58 | 0% | 84 |
| 4000 | 9,600 | 18 | 85 | 0% | 108 |
| Metric | Before | After | Improvement |
|---|---|---|---|
| Trip completion p99 at 2,000 users | 450ms (all inline) | 45ms (Kafka-decoupled) | 10x |
| Trip completion p99 at 4,000 users | 1,200ms (all inline) | 85ms (Kafka-decoupled) | 14x |
| Fare quote p99 at 1,000 users | 1,200ms (async via Kafka) | 55ms (sync) | 22x |
| Trip completion max RPS before errors | 3,200 (all inline) | 9,600+ (Kafka-decoupled) | 3x |
| Trip completion error rate at 4,000 users | 12.4% (all inline) | 0% (Kafka-decoupled) | Eliminated |
The fare quote got faster by removing Kafka. Trip completion got faster by adding Kafka. The difference is who waits: the fare quote's result is exactly what the rider is waiting for, while trip completion's side effects (analytics, receipt email) are work nobody waits on. That is the entire decision framework.