Skip to main content
kotlin in depth advanced patterns for java engineers

Reactive Streams with Flows and Channels

5 min read Chapter 19 of 21

The Reactive Landscape You’re Leaving Behind

If you’ve worked in the Java ecosystem for any meaningful length of time, you’ve almost certainly encountered reactive streams. RxJava arrived first, bringing the Observer pattern and a staggering API surface — over 400 operators by RxJava 2. Then Project Reactor came along, tightly integrated with Spring WebFlux. Java 9 added its own Flow API (the java.util.concurrent.Flow interfaces), which was an attempt to standardize the reactive-streams specification into the JDK itself.

All of these share a common DNA: the Reactive Streams specification with its Publisher, Subscriber, Subscription, and Processor interfaces. And all of them share common pain points that you’ve likely felt:

// RxJava: Where does this execute? Where do results land?
Observable.fromCallable(() -> fetchFromNetwork())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(data -> Observable.fromIterable(data.getItems()))
    .filter(item -> item.isActive())
    .subscribe(
        item -> updateUI(item),
        error -> handleError(error),  // Did you remember this?
        () -> onComplete()
    );
// Don't forget to dispose, or you leak

The pain is real: explicit thread scheduling with subscribeOn/observeOn, manual lifecycle management through Disposable, error channels that silently swallow exceptions if you forget the error callback, and a learning curve that makes senior developers feel like juniors again.

Kotlin Flows took a different path. Instead of building yet another reactive framework, the Kotlin team asked: what if structured concurrency handled the hard parts?

Cold Streams: From Sequences to Flows

You already understand lazy evaluation from CH4. A Sequence<T> evaluates elements on demand, one at a time, without creating intermediate collections:

val result = sequenceOf(1, 2, 3, 4, 5)
    .filter { it % 2 == 0 }
    .map { it * it }
    .toList() // evaluation happens here

A Flow<T> is the asynchronous counterpart. Where a Sequence builds on Iterator (with its blocking next() call), a Flow builds on suspend functions. This is the critical difference — a Flow can suspend between emissions without blocking a thread.

fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100) // suspends, doesn't block
        emit(i)
    }
}

The word “cold” means the same thing it did with Sequences: nothing executes until a terminal operator is called. Each call to collect starts the flow from scratch, just like each call to toList() iterates a Sequence from the beginning.

val myFlow = numbersFlow()
// Nothing has happened yet. No coroutine launched, no delay started.

myFlow.collect { value -> println(value) } // Execution starts here
myFlow.collect { value -> println(value) } // Runs again, independently

This is a fundamental departure from RxJava, where creating a Subject or a ConnectableObservable immediately starts producing values. In Kotlin, you get cold-by-default, hot-by-choice.

Flow Types Comparison

The Flow API at a Glance

A Flow is defined by a single interface:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

Two interfaces. Two methods. Compare that with RxJava’s Observable, Single, Maybe, Completable, and Flowable — five types before you’ve written a line of business logic.

Building Flows

// From a builder (most common)
val flow1 = flow {
    emit(1)
    emit(2)
    emit(3)
}

// From fixed values
val flow2 = flowOf(1, 2, 3)

// From existing collections
val flow3 = listOf(1, 2, 3).asFlow()

// From a suspend function (single value)
val flow4 = flow { emit(fetchUser(id)) }

Terminal Operators

Terminal operators are suspend functions that trigger flow collection:

val numbers = flowOf(1, 2, 3, 4, 5)

numbers.collect { println(it) }        // process each element
val list = numbers.toList()             // collect into list
val first = numbers.first()             // first element, then cancel
val single = numbers.single()           // exactly one element, or throw
val count = numbers.count()             // count elements
val reduced = numbers.reduce { a, b -> a + b } // fold without initial value

Context and Threading: flowOn Replaces Everything

Here’s where the RxJava contrast sharpens. In RxJava, you juggle subscribeOn (affects upstream), observeOn (affects downstream), and you need to reason about which one wins if both are present. In Kotlin:

val userData = flow {
    // This runs on Dispatchers.IO
    val user = fetchUserFromDb()
    emit(user)
}
.flowOn(Dispatchers.IO) // Changes the upstream context
.map { user ->
    // This runs on the collector's context (e.g., Main)
    formatForDisplay(user)
}

flowOn changes the context of everything upstream of it. The collector’s context stays unchanged. There’s no observeOn. If you collected this flow from Dispatchers.Main, the map runs on Main, and the flow { } body runs on IO. One operator, one rule, no ambiguity.

No Disposables, No Leaks

In RxJava:

Disposable d = observable.subscribe(item -> process(item));
// You must call d.dispose() or use CompositeDisposable
// Forget this, and you leak memory and possibly the subscription

In Kotlin, collect is a suspend function. It respects structured concurrency. When the coroutine scope cancels, collection stops. No cleanup code, no onCleared() callbacks, no CompositeDisposable:

viewModelScope.launch {
    userFlow.collect { user ->
        _uiState.value = user
    }
    // When viewModelScope cancels, collection stops automatically
}

What’s Ahead

This chapter splits into two sections. The first covers the full spectrum of flow types — from cold Flow through hot SharedFlow and StateFlow — along with the operators you’ll use daily: transformation, error handling, and the critical shareIn/stateIn conversion functions. The second section moves to Channel, the lower-level primitive for direct coroutine-to-coroutine communication, and builds up to the Actor pattern for lock-free state management.

By the end, you’ll have a clear mental model for choosing the right tool: Flow for data streams, SharedFlow for events, StateFlow for state, and Channel for coordination.