fast by design

Structured Concurrency for Fan-Out Operations

10 min read Chapter 24 of 90

Fan-out operations are the natural habitat of virtual threads. The content platform’s recommendation engine queries three data sources in parallel: the user’s reading history from PostgreSQL, trending articles from Redis, and content similarity scores from a vector search service. With CompletableFuture:

// SLOW: CompletableFuture fan-out has implicit error handling gaps
public class CompletableFutureRecommender {

    private final HistoryService historyService;
    private final TrendingService trendingService;
    private final SimilarityService similarityService;

    public CompletableFutureRecommender(HistoryService historyService,
                                        TrendingService trendingService,
                                        SimilarityService similarityService) {
        this.historyService = historyService;
        this.trendingService = trendingService;
        this.similarityService = similarityService;
    }

    public List<Article> recommend(String userId, String currentArticleId) {
        CompletableFuture<List<String>> historyFuture =
            CompletableFuture.supplyAsync(
                () -> historyService.getReadArticleIds(userId));

        CompletableFuture<List<Article>> trendingFuture =
            CompletableFuture.supplyAsync(
                () -> trendingService.getTrending(50));

        CompletableFuture<List<Article>> similarFuture =
            CompletableFuture.supplyAsync(
                () -> similarityService.findSimilar(currentArticleId, 20));

        try {
            List<String> history = historyFuture.get(5, TimeUnit.SECONDS);
            List<Article> trending = trendingFuture.get(5, TimeUnit.SECONDS);
            List<Article> similar = similarFuture.get(5, TimeUnit.SECONDS);

            return mergeAndRank(history, trending, similar);
        } catch (TimeoutException e) {
            // Which futures are still running? All of them, potentially.
            // cancel() only marks the future as cancelled; it never
            // interrupts the thread that is running the supplier.
            historyFuture.cancel(true);  // mayInterruptIfRunning is ignored
            trendingFuture.cancel(true);
            similarFuture.cancel(true);
            return List.of();
        } catch (ExecutionException | InterruptedException e) {
            // One failed. The other two may still be running.
            // Resource leak: threads continue executing abandoned futures.
            return List.of();
        }
    }

    private List<Article> mergeAndRank(List<String> history,
                                        List<Article> trending,
                                        List<Article> similar) {
        // Merge, deduplicate, filter already-read, rank by score
        return List.of(); // placeholder
    }
}

Three problems with this approach:

  1. Cancellation is unreliable. CompletableFuture.cancel(true) completes the future with a CancellationException but never interrupts the thread running the task: the mayInterruptIfRunning argument has no effect. A task blocked on a socket read keeps running until the read completes (or times out), and its result is then discarded.

  2. Error propagation is manual. If the history query fails, the trending and similarity queries continue running. You pay for network and compute resources producing results you will discard.

  3. Thread lifecycle is unstructured. The three tasks run on the common ForkJoinPool (or a custom executor). Their lifecycle is not tied to the calling method’s lifecycle. If the calling method returns early, the tasks may still be running.
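The first problem is easy to observe directly. The sketch below is illustrative only: the class, record, and method names are not part of the platform code, and Thread.sleep stands in for a blocking socket read. It starts a supplier, cancels the future while the supplier is running, and records whether the worker thread ever saw an interrupt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDemo {

    /** What the caller sees versus what the worker thread experienced. */
    record Outcome(boolean futureCancelled, boolean workerInterrupted) {}

    static Outcome cancelRunningTask() throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(200); // stands in for a blocking socket read
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
            return "history";
        });

        started.await();      // make sure the supplier is really running
        future.cancel(true);  // marks the future as cancelled, nothing more
        finished.await();     // the supplier still runs to completion
        return new Outcome(future.isCancelled(), interrupted.get());
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelRunningTask());
    }
}
```

The future reports itself as cancelled, yet the worker sleeps for the full 200ms without being interrupted. The caller has moved on while the thread is still occupied.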

StructuredTaskScope.ShutdownOnFailure

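Structured concurrency ties the lifecycle of subtasks to the enclosing scope. When the scope completes, all subtasks are guaranteed to be either completed or cancelled. The examples match the StructuredTaskScope API as previewed in JDK 21 through 24, which requires the --enable-preview flag.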

// FAST: Structured concurrency, automatic cancellation on failure
public class StructuredRecommender {

    private final HistoryService historyService;
    private final TrendingService trendingService;
    private final SimilarityService similarityService;

    public StructuredRecommender(HistoryService historyService,
                                  TrendingService trendingService,
                                  SimilarityService similarityService) {
        this.historyService = historyService;
        this.trendingService = trendingService;
        this.similarityService = similarityService;
    }

    public List<Article> recommend(String userId, String currentArticleId)
            throws InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            StructuredTaskScope.Subtask<List<String>> historyTask =
                scope.fork(() -> historyService.getReadArticleIds(userId));

            StructuredTaskScope.Subtask<List<Article>> trendingTask =
                scope.fork(() -> trendingService.getTrending(50));

            StructuredTaskScope.Subtask<List<Article>> similarTask =
                scope.fork(() -> similarityService.findSimilar(
                    currentArticleId, 20));

            scope.join()           // Wait for all tasks
                 .throwIfFailed(); // Throw if any task failed

            // All three succeeded. Safe to call .get()
            return mergeAndRank(
                historyTask.get(),
                trendingTask.get(),
                similarTask.get()
            );
        } catch (ExecutionException e) {
            // At least one task failed. Any tasks still running were
            // cancelled. No leaked threads. No abandoned futures.
            logger.error("Recommendation fan-out failed", e.getCause());
            return List.of();
        }
    }

    private List<Article> mergeAndRank(List<String> history,
                                        List<Article> trending,
                                        List<Article> similar) {
        return List.of(); // placeholder
    }
}

ShutdownOnFailure enforces a policy: if any subtask fails, cancel all unfinished subtasks and propagate the exception. Once join() has returned, throwIfFailed() either completes normally, in which case every subtask succeeded, or throws, in which case at least one subtask failed and any that were still running have been cancelled.

The try-with-resources block ensures the scope is closed even if the calling method throws. Closing the scope interrupts any running subtasks and waits for them to finish. No thread leaks.

Each fork() call starts a new virtual thread for the subtask. These virtual threads are children of the scope. They cannot outlive the scope.

StructuredTaskScope.ShutdownOnSuccess

The content platform also has a first-result-wins pattern: the search fallback chain. When a user searches for articles, the platform queries the primary search service (vector search via Qdrant), the secondary service (Elasticsearch), and the cached results (Redis) concurrently. The first successful response wins.

// FAST: First successful result wins, others cancelled immediately
public class SearchFallback {

    private final VectorSearchService vectorSearch;
    private final ElasticSearchService elasticSearch;
    private final CachedSearchService cachedSearch;

    public SearchFallback(VectorSearchService vectorSearch,
                           ElasticSearchService elasticSearch,
                           CachedSearchService cachedSearch) {
        this.vectorSearch = vectorSearch;
        this.elasticSearch = elasticSearch;
        this.cachedSearch = cachedSearch;
    }

    public List<Article> search(String query) throws InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<List<Article>>()) {

            scope.fork(() -> vectorSearch.search(query));
            scope.fork(() -> elasticSearch.search(query));
            scope.fork(() -> cachedSearch.search(query));

            scope.join();

            return scope.result(); // Returns the first successful result
        } catch (ExecutionException e) {
            // All three failed
            logger.error("All search backends failed", e.getCause());
            return List.of();
        }
    }
}

ShutdownOnSuccess cancels remaining subtasks as soon as one succeeds. If the vector search responds in 15ms, the Elasticsearch and Redis queries are interrupted immediately. This saves backend load and network resources.

If all three fail, result() throws ExecutionException whose cause is the exception from the first subtask that failed. Later failures are not retained by the scope, so log inside each task if you need to see all of them.

Timeout Patterns

Timeouts compose naturally with structured concurrency through joinUntil:

public List<Article> recommendWithTimeout(String userId,
                                           String currentArticleId)
        throws InterruptedException {
    Instant deadline = Instant.now().plusMillis(500);

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        var historyTask = scope.fork(
            () -> historyService.getReadArticleIds(userId));
        var trendingTask = scope.fork(
            () -> trendingService.getTrending(50));
        var similarTask = scope.fork(
            () -> similarityService.findSimilar(currentArticleId, 20));

        scope.joinUntil(deadline);  // Timeout after 500ms total
        scope.throwIfFailed();

        return mergeAndRank(
            historyTask.get(),
            trendingTask.get(),
            similarTask.get()
        );
    } catch (TimeoutException e) {
        // Deadline exceeded. The scope has already been closed, which
        // cancelled any subtasks that were still running.
        logger.warn("Recommendation timeout after 500ms");
        return List.of();
    } catch (ExecutionException e) {
        logger.error("Recommendation failed", e.getCause());
        return List.of();
    }
}

joinUntil(deadline) throws TimeoutException if the deadline passes before all subtasks complete. The try-with-resources block then closes the scope, cancelling any still-running subtasks.

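The 500ms deadline covers the entire fan-out, not each task. All three subtasks start together and run concurrently, so each has until the same instant to finish: if the history query takes 400ms and trending takes 50ms, the similarity query still has the full 500ms from the start, not the 50ms left over. The fan-out therefore takes as long as its slowest subtask, capped at 500ms. This is the correct behavior for a user-facing operation: the total latency budget is fixed.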
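The shared budget can be observed without preview features. The sketch below deliberately swaps in ExecutorService.invokeAll with a timeout in place of StructuredTaskScope.joinUntil; the class name, the call helper, and the latencies are all made up for illustration. Every task starts at the same time, and anything unfinished when the single budget expires is cancelled.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SharedDeadlineDemo {

    /** A hypothetical backend call that takes the given number of milliseconds. */
    static Callable<String> call(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    /** Runs all calls concurrently under one budget; true means "finished in time". */
    static List<Boolean> finishedWithin(long budgetMillis,
                                        List<Callable<String>> calls)
            throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // invokeAll starts every task, waits until all finish or the
            // budget expires, and cancels whatever has not completed by then.
            List<Future<String>> futures =
                executor.invokeAll(calls, budgetMillis, TimeUnit.MILLISECONDS);
            return futures.stream().map(f -> !f.isCancelled()).toList();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(finishedWithin(300, List.of(
            call("history", 40),
            call("trending", 10),
            call("similar", 2000))));
    }
}
```

The two fast calls finish well inside the 300ms budget and the slow one is cut off at the deadline. The time spent by the fast calls is not subtracted from the slow call's allowance, because all three clocks started together.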

Nested Deadlines

When fan-out operations call other fan-out operations, deadlines should propagate:

public List<Article> findSimilar(String articleId, int limit)
        throws InterruptedException {
    Instant deadline = Instant.now().plusMillis(200);

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        var embeddingTask = scope.fork(
            () -> embeddingService.getEmbedding(articleId));
        var metadataTask = scope.fork(
            () -> metadataService.getArticleMetadata(articleId));

        scope.joinUntil(deadline);
        scope.throwIfFailed();

        float[] embedding = embeddingTask.get();
        ArticleMetadata metadata = metadataTask.get();

        return vectorStore.searchSimilar(embedding, metadata.categories(), limit);
    } catch (TimeoutException | ExecutionException e) {
        return List.of();
    }
}

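Deadlines are not inherited automatically: findSimilar computes its 200ms deadline from the moment it starts, regardless of how much of the parent's budget remains. What the hierarchy does guarantee is that the inner scope cannot outlive the outer scope. If the parent's 500ms deadline expires first, closing the parent scope interrupts the subtask running findSimilar, its joinUntil throws InterruptedException, and the inner scope closes in turn. In effect, findSimilar is bound by whichever deadline arrives first.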
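One way to make the propagation explicit is to hand the caller's deadline down and take the earlier of the two instants. This is a minimal sketch under that assumption: the Deadlines helper and the idea of a parentDeadline parameter are hypothetical and do not appear in the platform code.

```java
import java.time.Duration;
import java.time.Instant;

class Deadlines {

    /**
     * Returns the earlier of the caller's deadline and a local budget
     * measured from 'now'. The current time is passed in so the
     * calculation is easy to test.
     */
    static Instant narrow(Instant parentDeadline, Instant now, Duration localBudget) {
        Instant localDeadline = now.plus(localBudget);
        return localDeadline.isBefore(parentDeadline) ? localDeadline : parentDeadline;
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        Instant parentDeadline = start.plusMillis(500);

        // Called immediately: the 200ms local budget is the tighter limit.
        System.out.println(narrow(parentDeadline, start, Duration.ofMillis(200)));

        // Called 400ms in: only 100ms of the parent's budget remains.
        System.out.println(
            narrow(parentDeadline, start.plusMillis(400), Duration.ofMillis(200)));
    }
}
```

A findSimilar variant that accepted the parent deadline could pass the narrowed instant to joinUntil, so it would time out cleanly instead of relying on being interrupted by the parent.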

Error Handling Strategies

Partial Results

Not all fan-out operations require all-or-nothing semantics. The recommendation engine should return results even if one data source fails:

public List<Article> recommendWithPartialResults(String userId,
                                                  String currentArticleId)
        throws InterruptedException {
    try (var scope = new StructuredTaskScope<Object>()) {

        var historyTask = scope.fork(
            () -> historyService.getReadArticleIds(userId));
        var trendingTask = scope.fork(
            () -> trendingService.getTrending(50));
        var similarTask = scope.fork(
            () -> similarityService.findSimilar(currentArticleId, 20));

        scope.join(); // Wait for all, do not throw on failure

        List<String> history = safeGet(historyTask, List.of());
        List<Article> trending = safeGet(trendingTask, List.of());
        List<Article> similar = safeGet(similarTask, List.of());

        if (history.isEmpty() && trending.isEmpty() && similar.isEmpty()) {
            logger.error("All recommendation sources failed");
            return List.of();
        }

        return mergeAndRank(history, trending, similar);
    }
}

private <T> T safeGet(StructuredTaskScope.Subtask<T> task, T fallback) {
    return switch (task.state()) {
        case SUCCESS -> task.get();
        case FAILED -> {
            logger.warn("Subtask failed: {}", task.exception().getMessage());
            yield fallback;
        }
        case UNAVAILABLE -> fallback;
    };
}

The base StructuredTaskScope (without ShutdownOnFailure or ShutdownOnSuccess) waits for all subtasks and applies no shutdown policy. You inspect each subtask individually: state() reports SUCCESS, FAILED, or UNAVAILABLE, get() returns the result of a successful subtask, and exception() returns the cause of a failed one.

This pattern degrades gracefully. If the history service is down, recommendations still include trending and similar articles. The user gets a useful result, not an error page.

Custom Scope Policies

For advanced use cases, extend StructuredTaskScope with a custom policy:

// Custom scope: succeed if at least 2 of 3 sources respond
public class QuorumScope<T> extends StructuredTaskScope<T> {

    private final int quorum;
    private final List<T> results = new CopyOnWriteArrayList<>();
    private final List<Throwable> errors = new CopyOnWriteArrayList<>();

    public QuorumScope(int quorum) {
        this.quorum = quorum;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        switch (subtask.state()) {
            case SUCCESS -> {
                results.add(subtask.get());
                if (results.size() >= quorum) {
                    shutdown(); // Cancel remaining tasks
                }
            }
            case FAILED -> {
                errors.add(subtask.exception());
            }
            case UNAVAILABLE -> { }
        }
    }

    public List<T> results() {
        ensureOwnerAndJoined();
        if (results.size() < quorum) {
            throw new IllegalStateException(
                "Quorum not reached: " + results.size() + "/" + quorum);
        }
        return List.copyOf(results);
    }
}

Usage:

public List<Article> recommendWithQuorum(String userId,
                                          String currentArticleId)
        throws InterruptedException {
    try (var scope = new QuorumScope<List<Article>>(2)) {

        scope.fork(() -> historyBasedRecommendations(userId));
        scope.fork(() -> trendingRecommendations());
        scope.fork(() -> similarityRecommendations(currentArticleId));

        scope.join();

        return scope.results().stream()
            .flatMap(List::stream)
            .distinct()
            .limit(20)
            .toList();
    }
}

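The quorum scope shuts down as soon as 2 of the 3 sources succeed, cancelling whichever is still running. When all sources are healthy, this bounds latency to the second-fastest source instead of the slowest. If fewer than 2 sources succeed, results() throws IllegalStateException, which the caller must be prepared to handle.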
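As a worked example of the latency claim, the helper below computes when a quorum would be reached for a given set of per-source latencies. It is a back-of-the-envelope sketch: the class name and the latency figures are invented, and it assumes every source succeeds.

```java
import java.util.List;

class QuorumLatency {

    /**
     * With every source succeeding, a quorum of k is reached when the
     * k-th fastest source responds.
     */
    static long completionMillis(List<Long> latenciesMillis, int quorum) {
        if (quorum < 1 || quorum > latenciesMillis.size()) {
            throw new IllegalArgumentException("quorum out of range: " + quorum);
        }
        return latenciesMillis.stream()
            .sorted()
            .skip(quorum - 1)
            .findFirst()
            .orElseThrow();
    }

    public static void main(String[] args) {
        List<Long> latencies = List.of(300L, 15L, 40L);
        System.out.println(completionMillis(latencies, 2)); // quorum of 2
        System.out.println(completionMillis(latencies, 3)); // wait for all
    }
}
```

With latencies of 15ms, 40ms, and 300ms, a quorum of 2 is reached at 40ms, while waiting for all three takes 300ms. If one source fails instead of succeeding, the scope has to wait for both of the others, so the bound only holds on the healthy path.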

Fan-Out at Scale: Content Ingestion Pipeline

The content platform ingests articles from 200 RSS feeds every 15 minutes. Each feed fetch involves DNS resolution, TLS handshake, HTTP GET, XML parsing, and content enrichment. With structured concurrency:

public class ContentIngestionPipeline {

    private final FeedParser feedParser;
    private final ContentEnricher enricher;
    private final ArticleStore articleStore;
    private final Semaphore concurrencyLimit;

    public ContentIngestionPipeline(FeedParser feedParser,
                                     ContentEnricher enricher,
                                     ArticleStore articleStore,
                                     int maxConcurrentFeeds) {
        this.feedParser = feedParser;
        this.enricher = enricher;
        this.articleStore = articleStore;
        this.concurrencyLimit = new Semaphore(maxConcurrentFeeds);
    }

    public IngestionResult ingestAll(List<String> feedUrls)
            throws InterruptedException {
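        Instant deadline = Instant.now().plusSeconds(120);
        // Subtasks append from their own threads, so the lists must be thread-safe.
        List<FeedResult> results = new CopyOnWriteArrayList<>();
        List<String> failures = new CopyOnWriteArrayList<>();

        try (var scope = new StructuredTaskScope<FeedResult>()) {
            for (String url : feedUrls) {
                scope.fork(() -> {
                    try {
                        FeedResult result = ingestFeed(url);
                        results.add(result);
                        return result;
                    } catch (Exception e) {
                        // Includes feeds interrupted when the deadline expires
                        failures.add(url);
                        throw e;
                    }
                });
            }

            scope.joinUntil(deadline);
        } catch (TimeoutException e) {
            // The scope is already closed here: unfinished feeds were
            // interrupted and, if they responded to the interrupt,
            // recorded themselves as failures.
            logger.warn("Ingestion timed out, returning partial results");
        }

        return new IngestionResult(results, failures);
    }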

    private FeedResult ingestFeed(String feedUrl) throws Exception {
        concurrencyLimit.acquire(); // Rate limit concurrent connections
        try {
            List<RawArticle> rawArticles = feedParser.parse(feedUrl);

            // Nested fan-out: enrich all articles from this feed
            try (var enrichScope =
                     new StructuredTaskScope.ShutdownOnFailure()) {

                List<StructuredTaskScope.Subtask<Article>> enrichTasks =
                    rawArticles.stream()
                        .map(raw -> enrichScope.fork(
                            () -> enricher.enrich(raw)))
                        .toList();

                enrichScope.join().throwIfFailed();

                List<Article> enriched = enrichTasks.stream()
                    .map(StructuredTaskScope.Subtask::get)
                    .toList();

                articleStore.saveAll(enriched);
                return new FeedResult(feedUrl, enriched.size(), List.of());
            }
        } finally {
            concurrencyLimit.release();
        }
    }
}

This is nested structured concurrency. The outer scope manages per-feed ingestion. Each feed’s inner scope manages per-article enrichment. If the outer scope times out, all inner scopes are cancelled. If an inner scope fails, only that feed’s enrichment is aborted.

The Semaphore limits concurrent connections to avoid overwhelming upstream servers. Semaphore.acquire() is virtual-thread-friendly: the virtual thread unmounts while waiting for a permit.
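The effect of the permit count can be checked in isolation. The sketch below is illustrative: the class name, task count, and sleep duration are made up, and it uses a cached thread pool so that it runs on older JDKs. It launches more tasks than there are permits and records the highest number that were ever inside the guarded section at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreCapDemo {

    /** Runs 'tasks' concurrent jobs and returns the peak number holding a permit. */
    static int peakConcurrency(int tasks, int permits) throws InterruptedException {
        Semaphore limit = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        // On JDK 21+, Executors.newVirtualThreadPerTaskExecutor() can be
        // substituted here to run each job on its own virtual thread.
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                limit.acquire(); // blocks until a permit is free
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // stands in for fetching one feed
                } finally {
                    active.decrementAndGet();
                    limit.release();
                }
                return null;
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak = " + peakConcurrency(20, 4));
    }
}
```

All 20 jobs are started at once, but the peak never exceeds the 4 permits. In the ingestion pipeline the same mechanism caps open connections while the number of forked subtasks stays unbounded.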

Structured Concurrency vs CompletableFuture

| Concern | CompletableFuture | StructuredTaskScope |
|---|---|---|
| Cancellation on failure | Manual, unreliable | Automatic with ShutdownOnFailure |
| Thread lifecycle | Unstructured, can leak | Bound to scope, cannot leak |
| Timeout | Per-future with .get(timeout) | Per-scope with joinUntil |
| Error propagation | Chained with .exceptionally | Checked exception from throwIfFailed |
| Partial results | Complex with .allOf + checks | Inspect individual Subtask.state() |
| Nesting | Flat composition only | Hierarchical; cancellation reaches nested scopes |
| Debugging | Thread dump shows detached tasks | Thread dump shows parent-child tree |
| Virtual thread support | Works but lifecycle unmanaged | Designed for virtual threads |

CompletableFuture remains the right choice for purely asynchronous pipelines where you chain transformations without waiting. For fan-out-and-gather patterns, where you launch N tasks and need results from all (or some) of them before proceeding, StructuredTaskScope is both safer and more readable.
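For contrast, here is the kind of pipeline where CompletableFuture still fits well. The sketch is hypothetical: the class, the render method, and the placeholder suppliers are invented for illustration. It chains a transformation and a combination without blocking at any intermediate step, and returns the future to the caller instead of waiting on it.

```java
import java.util.concurrent.CompletableFuture;

class PipelineDemo {

    /** Builds a display line from two independent lookups, without blocking. */
    static CompletableFuture<String> render(String articleId) {
        // Placeholder suppliers standing in for real asynchronous lookups
        CompletableFuture<String> title =
            CompletableFuture.supplyAsync(() -> "Title of " + articleId);
        CompletableFuture<Integer> views =
            CompletableFuture.supplyAsync(() -> articleId.length() * 100);

        return title
            .thenApply(t -> "[" + t + "]")                          // transform
            .thenCombine(views, (t, v) -> t + " " + v + " views");  // combine
    }

    public static void main(String[] args) {
        // join() appears only at the edge of the program, to print the result
        System.out.println(render("a42").join());
    }
}
```

Nothing in render waits for a result, so there is no scope whose lifetime the tasks could be tied to. That is the situation in which chaining stages is the more natural model.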

The content platform migrated from CompletableFuture to StructuredTaskScope for the recommendation engine, the search fallback chain, and the content ingestion pipeline. The migration eliminated three classes of bugs: leaked threads from abandoned futures, missing timeout handling on individual futures, and partial cancellation failures where 2 of 3 tasks continued running after the first failed.