Kotlin Coroutines by Tutorials

Third Edition · Android 12 · Kotlin 1.6 · Android Studio Bumblebee

Section I: Introduction to Coroutines


11. Beginning With Coroutine Flow
Written by Filip Babić


Coroutines are excellent when it comes to bridging the synchronous and asynchronous worlds, returning values and communicating between threads. Usually, that’s what you want and need. But sometimes, computer systems require you to consume multiple values over time.

And there are two ways you can do this: using sequences and streams. However, there are certain limitations to both approaches. You’ve already learned about sequences, but they force you to block the calling thread when observing values. So let’s see what streams have to offer and how they behave in code.

Streams of Data

A key similarity between sequences and streams is that both constructs can generate an infinite number of elements. Sequences usually do this by defining an operation that runs behind the scenes to build each value.
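For instance, here’s a minimal sketch of a sequence that can go on forever, computing each value lazily from the previous one:

val naturals = generateSequence(0) { it + 1 } // conceptually infinite

println(naturals.take(5).toList()) // [0, 1, 2, 3, 4]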

The key difference between streams and sequences is in how you build them: you usually create a stream using a builder function or a constructor. You then have an interface between the producer of values and a consumer, exposing a different part of that interface to each side.

Take this snippet, for example, which uses the Reactive Extensions version of observable streams of data:

val subject = BehaviorSubject.create<String>()

subject.subscribe(observer)
subject.onNext("one")
subject.onNext("two")
subject.onNext("three")

You create a Subject, which implements both sides of the stream interface. The provider can use functions such as offer, onNext, and send to fill the queue for the stream with values to consume. In this case, it uses onNext from Rx.

Every Observer that subscribes to this stream receives all its events from the moment it subscribes until it unsubscribes or the stream closes. The observer in Rx looks like this:

val observer = object: Observer<String> {
  
  override fun onNext(value: String) {
    // consume the value
  }
  
  override fun onError(throwable: Throwable) {
    // handle the error
  }
  
  override fun onComplete() {
    // the stream completed
  }
}

When you send any of these events to the Subject’s Observer side, the Subject relays them to all its Observers. It acts as a data relay from a central point to multiple observing nodes. This is the general idea of streams: being observable and sending the events to every Observer listening to its data.

But, depending on the implementation, streams can be set up differently. What every stream mechanism and implementation shares is a distinction between types of streams, based on when their values are computed and propagated. As such, there are hot and cold streams of data. Let’s go over them one at a time.

Hot Streams

Hot streams behave like TV channels or radio stations: they keep sending events and emitting their data even though no one may be listening or watching the show. That’s why they’re called hot. Because they don’t care whether there are any observers, they keep working and computing no matter what, from the moment you create them until they close.
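For instance, here’s a minimal sketch using Rx’s PublishSubject, another hot construct (the exact import path depends on your RxJava version). Values pushed before anyone subscribes are simply gone:

val channel = PublishSubject.create<String>() // hot: emits whether or not anyone listens

channel.onNext("missed") // no subscribers yet, so nobody ever sees this value

val subscription = channel.subscribe { value ->
  println("Received: $value")
}

channel.onNext("seen") // prints: Received: seen
subscription.dispose()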

Cold Streams

It makes sense that if hot streams are computed right away and work even without observers, cold streams do the opposite. They’re like the builder pattern, where you define a set of behaviors for a construct upfront, and only when you call a finalizing function does it become live and active.
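Here’s a minimal sketch of that cold behavior, using a Kotlin sequence as the construct: nothing in the builder block runs until a terminal operation asks for values.

val coldNumbers = sequence {
  println("Producer started") // only runs once the sequence is consumed
  yield(1)
  yield(2)
}

println("Nothing produced yet")
println(coldNumbers.toList()) // prints "Producer started", then [1, 2]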

Limitations of Streams

In everyday programming, there are certain limitations to the way things should operate for optimal use. You don’t want to waste resources, freeze the UI, lose data, etc. Some of these concepts apply to streams, as well. These limitations revolve around the same problem: the speed of producing and consuming the values.

Supporting Backpressure

As you’ve learned, if one side of the producer-consumer pair is too fast or slow, you lose data or block the non-bottlenecked side unless you add backpressure support.

A New Approach to Streams

The Flow API offers the best of both worlds: it supports cold, asynchronously built streams of values, with thread communication and backpressure support implemented through coroutines. It’s the perfect combination.
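Here’s a minimal sketch of what that backpressure looks like in practice, assuming the flow builder you’ll meet in the next section. emit simply suspends until the collector finishes processing the previous value, so a slow consumer naturally slows down the producer:

val pressure = flow {
  (1..3).forEach { number ->
    println("Producing $number")
    emit(number) // suspends until the collector below is done with this value
  }
}

runBlocking {
  pressure.collect { number ->
    delay(100) // a slow consumer
    println("Consumed $number")
  }
}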

Building Flows

To create a Flow, just like with standard coroutines, you have to use a builder. But first, open Main.kt in the starter project. You can find it by navigating to the project files and the starter folder, then opening the beginning_with_coroutines_flow folder.
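The snippet below builds a Flow of strings and then collects it from a coroutine launched in GlobalScope: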

val flowOfStrings = flow {
  for (number in 0..100) {
    emit("Emitting: $number")
  }
}
GlobalScope.launch {
  flowOfStrings.collect { value ->
    println(value)
  }
}

Thread.sleep(1000)
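Run the code, and you should see output like this: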
Emitting: 0
Emitting: 1
Emitting: 2
Emitting: 3
....
Emitting: 99
Emitting: 100
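Under the hood, the flow builder itself is a tiny function in kotlinx.coroutines that simply wraps your block: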
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> 
  = SafeFlow(block)

Collecting and Transforming Values

Once you build a Flow, you can do many things with the stream before the values reach the FlowCollector. Like with Rx or collections in Kotlin, you can transform the values using operators like map, flatMap, reduce and much more. Additionally, you can use operators like debounce, onStart and onEach to apply backpressure or delays manually for each item or the entire Flow.
Take the following snippet, for example:

GlobalScope.launch {
  flowOfStrings
      .map { it.split(" ") }
      .map { it.last() }
      .onEach { delay(100) }
      .collect { value ->
        println(value)
      }
}
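The same idea extends to the other operators mentioned above. As a quick sketch, assuming the same flowOfStrings as before, you could filter the emitted numbers and gather them with the terminal toList operator:

GlobalScope.launch {
  val evenNumbers = flowOfStrings
      .map { it.split(" ").last().toInt() } // pull the number out of "Emitting: x"
      .filter { it % 2 == 0 }               // keep only the even values
      .toList()                             // terminal operator: suspends until the flow completes

  println(evenNumbers.take(5)) // [0, 2, 4, 6, 8]
}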

Switching the Context

Another thing you can do with Flow events is switch the context in which you’ll consume them. To do that, you have to call flowOn(context: CoroutineContext) like this:

GlobalScope.launch {
  flowOfStrings
    .map { it.split(" ") }
    .map { it.last() }
    .flowOn(Dispatchers.IO)
    .onEach { delay(100) }
    .flowOn(Dispatchers.Default)
    .collect { value ->
      println(value)
    }
}
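The documentation for flowOn describes this behavior: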
/**
 * This operator retains a sequential nature of flow if changing the context does
 * not call for changing the dispatcher. Otherwise, if changing dispatcher is required,
 * it collects flow emissions in one coroutine that's run using a specified context
 * and emits them from another coroutine with the original collector's context...
 **/
flow.map { ... } // Will use Dispatchers.IO in the end
    .flowOn(Dispatchers.IO) // The first operator's context takes precedence
    .flowOn(Dispatchers.Default + customContext) // Contexts are merged with upstream

Flow Constraints

Because Flow is easy to use, there have to be some constraints to keep people from abusing or breaking the API. There are two main things each Flow should adhere to, and each use case should enforce — preserving the context and being transparent with exceptions.

Preserving the Flow Context

As mentioned above, you have to be careful when using CoroutineContexts with the Flow API. The context that produces the values has to be the same context that collects them. This effectively means you can’t produce values concurrently from other coroutines, because the Flow itself is not thread-safe and doesn’t allow such emissions.
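The snippet below breaks that rule by emitting from a separate coroutine. Collecting it fails at runtime with an error telling you the Flow invariant has been violated: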

val flowOfStrings = flow {
  for (number in 0..100) {

    GlobalScope.launch {
      emit("Emitting: $number")
    }
  }
}

GlobalScope.launch {
  flowOfStrings.collect { println(it) }
}
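If you genuinely need to produce values from another context, or from several coroutines at once, use channelFlow instead. It’s backed by a channel, so you send values with trySend rather than emit, and concurrent production is allowed: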
val flowOfStrings = channelFlow {
  for (number in 0..100) {

    withContext(Dispatchers.IO) {
      trySend("Emitting: $number")
    }
  }
}

GlobalScope.launch {
  flowOfStrings.collect { println(it) }
}

Being Transparent With Exceptions

It’s relatively easy to bury exceptions in coroutines. For example, when using async, an exception might occur, but if you never call await, it’s never rethrown anywhere your code can catch it. Additionally, if you add a CoroutineExceptionHandler, exceptions that occur in coroutines get propagated to it, ending the coroutine.
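Flow keeps failures explicit through the catch operator, which intercepts anything thrown upstream of it in the chain: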

flowOfStrings
  .map { it.split(" ") }
  .map { it[1] }
  .catch { it.printStackTrace() }
  .flowOn(Dispatchers.Default)
  .collect { println(it) }
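The snippet below redefines flowOfStrings so it starts with an empty string. Splitting that value produces a single-element list, so it[1] throws an IndexOutOfBoundsException once the stream is collected: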
val flowOfStrings = flow {
  emit("")

  for (number in 0..100) {
    emit("Emitting: $number")
  }
}
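Because catch handles the exception, collect returns normally and the line after it still runs: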
println("The code still works!")
flowOfStrings
  .map { it.split(" ") }
  .map { it[1] }
  .catch {
    it.printStackTrace()
    // send the fallback value or values
    emit("Fallback")
  }
  .flowOn(Dispatchers.Default)
  .collect { println(it) }

println("The code still works!")

Key Points

Where to Go From Here?

This chapter introduced you to the fundamentals of the Kotlin Coroutines Flow API. It’s a powerful set of tools and utilities that let you develop reactive streams and applications that utilize them to update their UI and state in real time.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
