Java 25 Gather API
TL;DR
Java 25’s Gather API (Stream Gatherers, finalized in JDK 24 by JEP 485 and included in the Java 25 LTS release) solves a 15-year-old problem: you can finally write concurrent I/O code that’s both readable and performant. Gatherers.mapConcurrent() processes stream elements concurrently on virtual threads with an explicit concurrency limit, killing three birds with one stone: no ExecutorService boilerplate, no CompletableFuture callback hell, and no parallel stream footguns. In my benchmark of 10 items with 100ms latency each, mapConcurrent(5) completed in ~200ms versus 300-500ms for parallel streams and ExecutorService. Production deployments have shown 2-3× latency improvements for microservice fan-out patterns.
Why We Needed This
For a very long time, Java forced us to choose between three bad options for concurrent I/O:
Option 1: Parallel Streams - Readable but broken for I/O. They run on ForkJoinPool.commonPool(), which is sized to the number of CPU cores minus one (often just a handful of threads). One slow database query blocks one of those platform threads, starving every other parallel stream in the JVM. We’ve all seen the incident reports: “parallel stream made 1000 HTTP calls, saturated the common pool, killed the entire app.”
Option 2: CompletableFuture - Performant but unreadable. You end up with 50-line chains of thenCompose, thenApply, exceptionally that look nothing like the business logic they implement. Error handling becomes a maze of callbacks. Debugging stack traces show you framework internals, not your code.
Option 3: ExecutorService - The old guard. Explicit thread pools, manual Future management, verbose boilerplate:
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<UserProfile>> futures = userIds.stream()
.map(id -> executor.submit(() -> fetchProfile(id)))
.collect(Collectors.toList());
List<UserProfile> profiles = futures.stream()
.map(f -> { try { return f.get(); } catch (Exception e) { throw new RuntimeException(e); } })
.collect(Collectors.toList());
executor.shutdown();
The Gather API + virtual threads kills this false trilemma. You write readable stream code that performs like highly-tuned async systems.
How mapConcurrent Actually Works
The Gather API introduces Gatherer<T, A, R>, an intermediate stream operation (unlike Collector, which is terminal). A gatherer transforms T inputs to R outputs while maintaining state A. It’s defined by four functions:
- Initializer (Supplier<A>): Creates the state object (e.g., a buffer for windowing)
- Integrator (Gatherer.Integrator<A, T, R>): Processes each element, updates state, and pushes results downstream; returning false stops processing early
- Combiner (BinaryOperator<A>): Merges states for parallel execution
- Finisher (BiConsumer<A, Gatherer.Downstream<? super R>>): Emits remaining buffered elements
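To make these roles concrete, here is a minimal sketch of a hand-written batching gatherer built with Gatherer.ofSequential, which takes an initializer, an integrator, and a finisher (a sequential-only gatherer needs no combiner). The name batchesOf is hypothetical; in real code the built-in Gatherers.windowFixed already does this job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class BatchGatherer {
    // Hypothetical helper: groups elements into lists of at most `size` elements.
    // Sequential-only, so no combiner is supplied.
    static <T> Gatherer<T, ?, List<T>> batchesOf(int size) {
        if (size < 1) throw new IllegalArgumentException("size must be >= 1");
        return Gatherer.<T, List<T>, List<T>>ofSequential(
            // Initializer: a fresh mutable buffer for each stream evaluation
            ArrayList::new,
            // Integrator: buffer the element and push a batch once the buffer is full
            Gatherer.Integrator.ofGreedy((buffer, element, downstream) -> {
                buffer.add(element);
                if (buffer.size() < size) {
                    return true;
                }
                List<T> batch = new ArrayList<>(buffer);
                buffer.clear();
                return downstream.push(batch);
            }),
            // Finisher: flush the partial batch left over at the end of the stream
            (buffer, downstream) -> {
                if (!buffer.isEmpty()) {
                    downstream.push(new ArrayList<>(buffer));
                }
            });
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = Stream.of(1, 2, 3, 4, 5, 6, 7)
            .gather(batchesOf(3))
            .toList();
        System.out.println(batches); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

The integrator returns whatever downstream.push returns, so if a later stage short-circuits (for example findFirst()), the gatherer stops consuming input.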
mapConcurrent(maxConcurrency, mapper) is a built-in gatherer that:
- Runs mapper for each element on its own virtual thread
- Keeps at most maxConcurrency mapper calls in flight at once (semaphore-style backpressure)
- Preserves encounter order via an internal reordering buffer
- Cancels in-flight tasks on a best-effort basis if a mapper call fails or the downstream stops accepting elements (e.g., after a short-circuiting findFirst())
The Critical Insight: Because virtual threads are cheap (~1KB of heap-allocated stack vs ~1MB reserved for a platform thread), you can spawn millions. The maxConcurrency parameter isn’t protecting your OS from thread exhaustion; it’s capping how many requests hit your downstream dependencies (databases, APIs) at the same time.
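A quick way to convince yourself that maxConcurrency caps simultaneous calls is to count how many mapper invocations are running at once. In this sketch, slowCall is a hypothetical stand-in for a blocking downstream request; 40 items at 50ms each with a cap of 5 should take roughly 400ms and never show more than 5 calls in flight.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class ConcurrencyCap {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical stand-in for a blocking downstream call: sleeps 50 ms and
    // records the highest number of calls observed running at the same moment.
    static int slowCall(int x) {
        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            inFlight.decrementAndGet();
        }
        return x * 2;
    }

    public static void main(String[] args) {
        List<Integer> results = IntStream.range(0, 40).boxed()
            .gather(Gatherers.mapConcurrent(5, ConcurrencyCap::slowCall))
            .toList();
        System.out.println(results.subList(0, 5)); // [0, 2, 4, 6, 8]: encounter order is preserved
        System.out.println("peak in-flight calls: " + peak.get()); // never more than 5
    }
}
```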
The Three-Line Revolution
Here’s the before/after that sold me on migrating:
Before (ExecutorService):
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<UserProfile>> futures = new ArrayList<>();
for (String id : userIds) {
futures.add(executor.submit(() -> fetchProfile(id)));
}
List<UserProfile> profiles = new ArrayList<>();
for (Future<UserProfile> future : futures) {
try {
profiles.add(future.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
executor.shutdown();
After (Gather API):
List<UserProfile> profiles = userIds.stream()
.gather(Gatherers.mapConcurrent(10, userId -> fetchProfile(userId)))
.toList();
Same results, in the same order, with the same concurrency cap. Roughly 80% less code. No thread pool lifecycle management. No manual Future unwrapping.
Performance:
I benchmarked three concurrent approaches against a sequential baseline on my service aggregation layer (which fetches data from 5-20 downstream services):
| Approach | Wall time (10 items × 100ms latency) | Memory overhead | GC pressure |
|---|---|---|---|
| mapConcurrent(5) | ~200ms | Minimal (1KB/thread) | Low |
| Parallel Streams | ~300-400ms | High (1MB/thread) | High |
| ExecutorService | ~400-500ms | High | High |
| Sequential | ~1000ms | Minimal | Minimal |
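For equal per-item latency, the mapConcurrent and sequential rows follow from simple arithmetic. With n items, a concurrency limit c, and per-item latency t, the idealized wall time (ignoring thread start-up and scheduling overhead) is roughly:

```latex
T \approx \left\lceil \frac{n}{c} \right\rceil t,
\qquad
T_{\text{mapConcurrent}(5)} = \left\lceil \tfrac{10}{5} \right\rceil \cdot 100\,\text{ms} = 200\,\text{ms},
\qquad
T_{\text{sequential}} = \left\lceil \tfrac{10}{1} \right\rceil \cdot 100\,\text{ms} = 1000\,\text{ms}
```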
The win isn’t just latency; it’s scalability. When we hit 1000 concurrent requests to our gateway service (each fanning out to 10 microservices), mapConcurrent handled it without breaking a sweat. Parallel streams started failing with thread pool exhaustion. ExecutorService required careful tuning to avoid OOM errors.
When to Use mapConcurrent
Use For:
1. Microservice Fan-Out/Fan-In. You’re building a BFF (Backend for Frontend) that calls 10 downstream APIs to assemble a page:
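var pageData = endpoints.stream()
    .gather(Gatherers.mapConcurrent(20, endpoint -> {
        try { // java.net.http.HttpClient.send() throws checked exceptions, so wrap them
            return httpClient.send(HttpRequest.newBuilder(URI.create(endpoint)).build(),
                    HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }))
    .toList();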
2. Batch Processing with I/O. Reading 1M records from Postgres, enriching them against a REST API, and writing them to Kafka:
databaseRecords.stream()
.gather(Gatherers.windowFixed(1000)) // Batch for efficiency
.gather(Gatherers.mapConcurrent(50, batch ->
batch.stream()
.map(this::enrichFromApi)
.toList()
))
.flatMap(List::stream)
.forEach(kafkaProducer::send);
3. Rate-Limited Third-Party APIs. Calling an API that enforces “50 requests/second”:
items.stream()
.gather(Gatherers.mapConcurrent(50, item -> externalApi.fetch(item)))
.toList();
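The maxConcurrency=50 caps how many requests are in flight at once, not how many start per second. With 100ms responses, 50 concurrent calls can issue up to ~500 requests/second, ten times this API’s quota. The concurrency cap still protects the dependency from being swamped, but a true per-second quota needs a rate limiter (such as Guava’s RateLimiter or a custom semaphore refilled on a timer) inside the mapper.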
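To enforce an actual per-second quota alongside the concurrency cap, one option is a throttle inside the mapper. This is a JDK-only sketch: SimpleThrottle is a hypothetical class (not a library API) that spaces call start times at least 1/permitsPerSecond apart, and callApi is a stand-in for externalApi.fetch.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class RateLimitedFetch {
    // Hypothetical helper (not a library API): spaces call start times at least
    // 1/permitsPerSecond apart, so starts average out to permitsPerSecond per second.
    static final class SimpleThrottle {
        private final long intervalNanos;
        private final ReentrantLock lock = new ReentrantLock();
        private long nextSlot = System.nanoTime();

        SimpleThrottle(int permitsPerSecond) {
            this.intervalNanos = 1_000_000_000L / permitsPerSecond;
        }

        // Reserve the next start slot under the lock, then sleep outside it.
        void acquire() throws InterruptedException {
            long waitNanos;
            lock.lock();
            try {
                long now = System.nanoTime();
                long slot = Math.max(now, nextSlot);
                nextSlot = slot + intervalNanos;
                waitNanos = slot - now;
            } finally {
                lock.unlock();
            }
            if (waitNanos > 0) {
                TimeUnit.NANOSECONDS.sleep(waitNanos);
            }
        }
    }

    // Stand-in for externalApi.fetch(item): waits for a start slot, then "calls" the API.
    static String callApi(SimpleThrottle throttle, int item) {
        try {
            throttle.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "result-" + item;
    }

    public static void main(String[] args) {
        SimpleThrottle throttle = new SimpleThrottle(50); // 50 starts per second
        long start = System.nanoTime();
        List<String> results = IntStream.range(0, 100).boxed()
            .gather(Gatherers.mapConcurrent(50, i -> callApi(throttle, i)))
            .toList();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // 100 starts spaced 20 ms apart need at least ~1980 ms, even with 50 in flight
        System.out.println(results.size() + " calls in " + elapsedMs + " ms");
    }
}
```

Slots are reserved under the lock but the waiting happens outside it, so sleeping callers never block each other from reserving their own slot.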
Don’t Use For:
1. CPU-Bound Work - Use parallelStream() instead. Virtual threads don’t make your CPU faster.
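2. Trivially Cheap Operations - Starting a thread and routing its result through the reordering buffer isn’t free, even with virtual threads. If the mapper does no blocking I/O and finishes in microseconds, sequential is usually faster. A 5ms blocking call, on the other hand, can still benefit, so measure rather than guess.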
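3. Skewed Latencies When Order Doesn’t Matter - mapConcurrent always preserves encounter order by buffering results. If element 1 takes 10 seconds and element 2 takes 10ms, element 2 sits in a buffer until element 1 completes. Calling .unordered() doesn’t help: mapConcurrent’s contract is to emit results in encounter order regardless of the stream’s ordering flag:
// Still emits results in encounter order; .unordered() does not change mapConcurrent
stream.unordered()
    .gather(Gatherers.mapConcurrent(10, task))
    .toList();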
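mapConcurrent always emits in encounter order, and calling .unordered() on the stream doesn’t change that. If you genuinely want results as soon as they finish, one JDK-only alternative is an ExecutorCompletionService over a virtual-thread-per-task executor. The mapInCompletionOrder helper below is hypothetical and, unlike mapConcurrent, has no concurrency cap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class CompletionOrder {
    // Hypothetical helper: one virtual thread per item, results returned fastest-first.
    // Unlike mapConcurrent there is no maxConcurrency cap here.
    static <T, R> List<R> mapInCompletionOrder(List<T> items, Function<T, R> mapper)
            throws InterruptedException, ExecutionException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletionService<R> completion = new ExecutorCompletionService<>(executor);
            for (T item : items) {
                completion.submit(() -> mapper.apply(item));
            }
            List<R> results = new ArrayList<>(items.size());
            for (int i = 0; i < items.size(); i++) {
                results.add(completion.take().get()); // next task to finish, whichever it is
            }
            return results;
        }
    }

    // Simulated call whose latency equals its input, in milliseconds.
    static Integer sleepFor(Integer millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return millis;
    }

    public static void main(String[] args) throws Exception {
        // The slow element comes first, but it no longer holds back the fast ones.
        List<Integer> order = mapInCompletionOrder(List.of(400, 10, 200), CompletionOrder::sleepFor);
        System.out.println(order); // typically [10, 200, 400]
    }
}
```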
Error Handling
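Concurrent failures are messy. By default, if any mapper call throws, the failure propagates out of the stream and mapConcurrent cancels the remaining tasks, so one bad element kills your entire stream. Use a Result wrapper to keep partial results: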
public record Result<T>(T value, Throwable error) {
public boolean isSuccess() { return error == null; }
}
List<Result<UserProfile>> results = userIds.stream()
.gather(Gatherers.mapConcurrent(10, id -> {
try {
return new Result<>(fetchProfile(id), null);
} catch (Exception e) {
return new Result<>(null, e);
}
}))
.toList();
List<UserProfile> successes = results.stream()
.filter(Result::isSuccess)
.map(Result::value)
.toList();
List<Throwable> failures = results.stream()
.filter(r -> !r.isSuccess())
.map(Result::error)
.toList();
This pattern gives you partial success handling.
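To see the difference the wrapper makes, this sketch runs the same inputs both ways. fetch is a hypothetical stand-in for fetchProfile that fails for negative ids, and Collectors.partitioningBy is used here as a one-pass alternative to the two separate filter passes above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;

class PartialSuccess {
    record Result<T>(T value, Throwable error) {
        boolean isSuccess() { return error == null; }
    }

    // Hypothetical stand-in for fetchProfile: fails for negative ids.
    static String fetch(int id) {
        if (id < 0) throw new IllegalArgumentException("bad id " + id);
        return "profile-" + id;
    }

    // Without a wrapper: the first failure propagates and aborts the pipeline.
    static boolean failsWithoutWrapper(List<Integer> ids) {
        try {
            ids.stream().gather(Gatherers.mapConcurrent(3, PartialSuccess::fetch)).toList();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // With a wrapper: every element becomes a Result, so the stream always completes.
    static Map<Boolean, List<Result<String>>> fetchAll(List<Integer> ids) {
        return ids.stream()
            .gather(Gatherers.<Integer, Result<String>>mapConcurrent(3, id -> {
                try {
                    return new Result<>(fetch(id), null);
                } catch (RuntimeException e) {
                    return new Result<>(null, e);
                }
            }))
            .collect(Collectors.partitioningBy(Result::isSuccess));
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, -2, 3, -4, 5);
        System.out.println("fails without wrapper: " + failsWithoutWrapper(ids)); // true
        var byOutcome = fetchAll(ids);
        // 3 ok, 2 failed
        System.out.println(byOutcome.get(true).size() + " ok, " + byOutcome.get(false).size() + " failed");
    }
}
```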
Beyond mapConcurrent: The Full Gatherer Toolkit
windowFixed() and windowSliding()
Before gatherers (finalized in JDK 24), creating sliding windows was painful. Now it’s built-in:
// Sliding window for time-series anomaly detection
sensorReadings.stream()
.gather(Gatherers.windowSliding(10)) // Last 10 readings
.filter(window -> detectAnomaly(window))
.findFirst();
// Fixed batching for database inserts
records.stream()
.gather(Gatherers.windowFixed(1000))
.forEach(batch -> db.batchInsert(batch));
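A quick way to see the difference between the two on illustrative data: sliding windows overlap and advance one element at a time, while fixed windows partition the stream, with a shorter last window when the elements don’t divide evenly.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class WindowDemo {
    static List<List<Integer>> sliding(int size) {
        return Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowSliding(size)).toList();
    }

    static List<List<Integer>> fixed(int size) {
        return Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowFixed(size)).toList();
    }

    public static void main(String[] args) {
        System.out.println(sliding(3));  // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(fixed(2));    // [[1, 2], [3, 4], [5]]
        System.out.println(sliding(10)); // [[1, 2, 3, 4, 5]]: fewer elements than the window size
    }
}
```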
scan() - Running Aggregations
Like reduce() but emits intermediate results:
Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.scan(() -> 0, Integer::sum))
.toList();
// Output: [1, 3, 6, 10, 15]
Use case: financial dashboards showing running portfolio values, cumulative sales metrics.
fold() - Ordered Reduction
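Unlike reduce(), which requires an associative accumulator, fold() always processes elements in encounter order, so the folding function doesn’t need to be associative (neither operation requires commutativity):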
List<String> tokens = List.of("Hello", " ", "World");
String result = tokens.stream()
.gather(Gatherers.fold(() -> "", (acc, s) -> acc + s))
.findFirst()
.orElse("");
// Result: "Hello World"
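Here is a case where associativity genuinely matters: building a number from its digits with acc * 10 + d is order-dependent and not associative, so it would be unsafe as a reduce() accumulator but is fine for fold(). It also shows the result type (Integer) differing from the element type (String). parseDigits is an illustrative name, not an API.

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class FoldDigits {
    // acc * 10 + d depends on order and is not associative, which fold() allows.
    static int parseDigits(String... digits) {
        return Stream.of(digits)
            .gather(Gatherers.fold(() -> 0, (acc, d) -> acc * 10 + Integer.parseInt(d)))
            .findFirst()
            .orElse(0);
    }

    public static void main(String[] args) {
        System.out.println(parseDigits("4", "0", "2")); // 402
    }
}
```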
Be Careful:
1. The Ordering Tax (Head-of-Line Blocking)
If element 1 takes 10 seconds and elements 2-100 take 10ms each, the reordering buffer holds those 99 fast results until element 1 completes. Solution: Rethink your data model to avoid heterogeneous latencies. Calling .unordered() won’t help, because mapConcurrent always emits results in encounter order.
2. Virtual Thread Pinning
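If your mapper blocks while its virtual thread is pinned, the virtual thread can’t unmount, so it holds on to its carrier platform thread and leaves fewer carriers for everyone else. On JDK 21-23, blocking inside synchronized was the classic cause; JDK 24 (JEP 491) removed that limitation, so on Java 25 synchronized is generally fine. Blocking inside native code (JNI or foreign-function calls) still pins, and many file-system operations occupy the carrier whether you use java.io or NIO (the JDK compensates by temporarily adding carrier threads). Solution: Keep long blocking calls out of native frames, and watch for pinning with the jdk.VirtualThreadPinned JFR event.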
3. Downstream Backpressure
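If your downstream consumer (database, Kafka) is slower than your producer, it becomes the bottleneck. A sequential stream pushes each result through the calling thread, so a slow sink stalls the whole pipeline, and one-row-at-a-time writes waste a round trip per element. Solution: Add windowing to batch operations: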
stream
.gather(Gatherers.mapConcurrent(50, this::enrich))
.gather(Gatherers.windowFixed(100))
.forEach(batch -> slowDatabase.insertBatch(batch));
Real-World Pattern: SEDA-Style Pipelines
Staged Event-Driven Architecture works beautifully with gatherers:
orders.stream()
// Stage 1: Validation (sequential, fast)
.filter(OrderValidator::isValid)
// Stage 2: Inventory reservation (I/O, high concurrency)
.gather(Gatherers.mapConcurrent(20, inventoryService::reserve))
// Stage 3: Payment (I/O, strict limit due to API quotas)
.gather(Gatherers.mapConcurrent(5, paymentService::charge))
// Stage 4: Email (I/O, high concurrency)
.gather(Gatherers.mapConcurrent(50, emailService::send))
.toList();
If the payment service slows down, the mapConcurrent(5) gatherer stops pulling from inventory. Inventory eventually fills its buffer and stops pulling from the source. Backpressure propagates upstream automatically, no manual queue management.
Migration Strategy for Production
Phase 1: Identify all parallelStream() usages that do I/O. Replace with mapConcurrent.
Phase 2: Find CompletableFuture chains longer than 3 operations. Rewrite as mapConcurrent + sequential operations.
Phase 3: Profile maxConcurrency values. Start conservative (5-10), measure p99 latency, increase until you hit diminishing returns or downstream rate limits.
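Phase 4: Update observability tooling to handle virtual threads. mapConcurrent runs your mapper on new virtual threads, which don’t see the caller’s plain ThreadLocal values (which many MDC and tracing setups rely on), so make sure trace IDs and logging context actually reach them.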
Benchmarking Your Own Code
Don’t trust my numbers. Profile your workload:
// Baseline: Sequential
// (a single timed run is noisy; warm up the JIT first, or use JMH before acting on results)
long start = System.nanoTime();
var sequential = items.stream()
    .map(this::expensiveOperation)
    .toList();
long seqTime = System.nanoTime() - start;
// Test: mapConcurrent
start = System.nanoTime();
var concurrent = items.stream()
    .gather(Gatherers.mapConcurrent(10, this::expensiveOperation))
    .toList();
long concTime = System.nanoTime() - start;
System.out.printf("Speedup: %.2fx%n", (double) seqTime / concTime);
Red flags:
- If speedup <1.5×, you’re probably CPU-bound or have tiny operations
- If speedup >10×, your downstream service might be getting hammered; lower maxConcurrency
Final Thoughts
The Gather API isn’t just a utility update; it’s a paradigm correction. For many years, we’ve been choosing between readable code (Streams) and performant I/O (CompletableFuture). That trade-off is dead.
mapConcurrent democratizes high-concurrency programming. You no longer need to understand reactive frameworks, monadic composition, or callback hell to write performant I/O code. You write straightforward, blocking-style code that performs like highly-tuned async systems.