Chapters

Hide chapters

Kotlin Coroutines by Tutorials

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 3.5

Section I: Introduction to Coroutines

Section 1: 9 chapters
Show chapters Hide chapters

11. Channels
Written by Nishant Srivastava

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

From the previous chapters, you already learned how to deal with sending a request for receiving a single value. This approach works perfectly when you just need to get a value once and show it to the user, e.g., fetching a user profile or downloading an image. In this chapter, you will learn how to send and receive streams of values.

Streams are convenient when you need to continuously get updates of data or handle a potentially infinite sequence of items. Kotlin isn’t the first one to offer a solution to these problems. Observable from ReactiveX and Queue from Java solve them as well. How Channels compare with Observable and Queue, as well as their benefits and disadvantages, will be covered further in this book.

Note: A stream is a source or repository of data that can be read or written only sequentially while a thread is a unit of execution, lighter in weight than a process, generally expected to share memory and other resources with other threads executed concurrently.

Getting started with channels

Channels are conceptually similar to reactive streams. It is a simple abstraction that you can use to transfer a stream of values between coroutines. Consider a source that sends content to a destination that receives it; i.e., elements are sent into the channel by producer coroutines and are received by consumer coroutines. Essentially, channels are like blocking queues that send and operate on data asynchronously.

Channel
Channel

A fundamental property — and an important concept to understand — of a channel is its capacity, which defines the maximum number of elements that a channel can contain in a buffer. Suppose you have a channel with capacity N. A producer can send values into the channel but, when the channel reaches its N capacity, the producer suspends until a consumer starts to read data from the same channel. You can think of the capacity like the size of the buffer for a specific channel; it’s a way to optimize performances in the case producing and consuming are operations, which take different amounts of time.

You can change the default capacity of a channel by passing it as an argument to its factory method. Take a look at the following method signature:

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E>

You will notice that the default capacity is set to RENDEZVOUS, which corresponds to 0 as per the source code:

public const val RENDEZVOUS = 0

What does it mean in practice? It means that the producer channel won’t produce anything until there is a consumer channel that needs data; essentially, there is no buffer.

An element is transferred from producer to consumer only when the producer’s send and consumer’s receive invocations meet in time (rendezvous). Because of this, the send function suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send. This is the reason for the RENDEZVOUS name.

Note: The same happens in Java with the SynchronousQueue class.

Creating a channel is pretty straightforward. Write the following:

val kotlinChannel = Channel<Int>()

Consuming its values can be done via the usual for loop:

for (x in kotlinChannel){
  println(x)
}

Channels implement the SendChannel and ReceiveChannel interfaces.

public interface SendChannel<in E> {
  @ExperimentalCoroutinesApi
  public val isClosedForSend: Boolean

  public suspend fun send(element: E)
  public fun offer(element: E)
  public fun close(cause: Throwable? = null): Boolean
  ...
}

public interface ReceiveChannel<out E> {
  @ExperimentalCoroutinesApi
  public val isClosedForReceive: Boolean

  public suspend fun receive(): E
  public fun cancel(): Unit
  ...
}

Notice that SendChannel exposes the operation close, which is used — surprise — for closing the channel. As soon as the sender calls close() on the channel, the value of isClosedForSend becomes true.

Note: close() is an idempotent operation; repeated invocations of this function have no effect and return false.

You can’t send any message into a closed channel. Closing a channel conceptually works by sending a special close token over it. You close a channel when you have a finite sequence of elements to be processed by consumers. You must then signal to the consumers that this sequence is over. The iteration stops as soon as this close token is received, so there is a guarantee that all previously sent elements before the close are received. You don’t have to close a channel otherwise.

On the other hand, ReceiveChannel exposes the cancel operation, which cancels the reception of remaining elements from the channel. Once finished, this function closes the channel and removes all messages in the buffer, if any. After cancel() completes, isClosedForReceive starts returning true. If the producer has already closed the channel invoking the close() function, then isClosedForReceive returns true only after all previously sent elements are received.

The isClosedForReceive property can be used along with channel.receive() to iterate and get items from a channel one at a time:

while (!kotlinChannel.isClosedForReceive) {
    val value = kotlinChannel.receive()
    println(value)
}

Channels are not tied to any native resource and they don’t have to be closed to release their memory; hence, simply dropping all the references to a channel is fine. When the garbage collector runs, it will clean out those references.

Other important methods are send and receive. You can send items to the channel with the method send(element: E) and receive from it with receive():E.

This is typical usage for a channel:

fun main() {

  // 1
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")

  // 2
  val kotlinChannel = Channel<String>()

  runBlocking {

    // 3
    GlobalScope.launch {
      for (fruit in fruitArray) {
        // 4
        kotlinChannel.send(fruit)

        // 5
        if (fruit == "Pear") {
          // 6
          kotlinChannel.close()
        }
      }
    }

    // 7 
    for (fruit in kotlinChannel) {
      println(fruit)
    }

    // 8
    println("Done!")
  }
}

Output:

Apple
Banana
Pear
Done!

Breaking down each part of the above code snippet, which you can find the executable version of the above snippet of code in the starter project in the file called ChannelsIntro.kt:

  1. An array of string items.
  2. Create a channel with default — i.e., 0 capacity.
  3. Set up the producer.
  4. Send data in the channel.
  5. Conditional check, if the current item is equal to value Pear.
  6. Signal the closure of the channel via calling close() on the channel.
  7. Set up the consumer that is printing the received values using for loop (until the channel is closed).
  8. Print the final Done status.

In the previous example, you create a Channel of String objects. Then, into the body of the launch coroutine builder, you iterate over an Array<String> and put each element into the channel using the send function. While iterating, you check if the current value equals to Pear, in which case you close the channel invoking the close method. This is an example of a condition for the closing of the channel.

On the receiving side, you use a normal iteration with a for cycle in order to consume all the elements available in the channel. The for cycle is smart enough to understand when the channel is closed because it uses the underlying Iterator.

The for loop solution is excellent because it allows you to use channels in the normal pattern that you’d use for iterating over a normal collection. If you want more control over what you’re doing, you can consume the channel using code like this:

while (!kotlinChannel.isClosedForReceive) {
    val value = kotlinChannel.receive()
    println(value)
}

However, there is yet another way to iterate over the channel values, via using repeat() Kotlin construct:

// Another way to iterate over the channel values
// You use channel.receive() to
// get the messages one by one
repeat(3){
  val fruit = kotlinChannel.receive()
  println(fruit)
}

Here, you explicitly use the receive method but you have to know exactly how many elements you’re getting from the channel, which is not always possible. If you try to put 4 instead of 3 as argument of the repeat function, you’ll have a ClosedReceiveChannelException exception like this:

Apple
Banana
Pear
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
	at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1070)

It’s interesting to note that the exception is not thrown on the receive function but on the close one. This happens because the close function is a suspend function, which actually completes only when the receiver consumes all the items in the channel. If the receiver requests more items that the one available, the producer tries to provide some new data. But, in this, case the channel is closed and this is not possible. This is the reason for the ClosedReceiveChannelException. In the case that you put a value smaller than the number of available objects, on the other hand, you’re going to miss some data.

Understanding closed channels

In order to understand the state of the channel, you can use two handy properties: isClosedForReceive and isClosedForSend.

while (!kotlinChannel.isClosedForReceive) {
  val fruit = kotlinChannel.receive()
  println(fruit)
}
while (!kotlinChannel.isClosedForReceive) {
  val fruit = kotlinChannel.receive()
  delay(10)
  println(fruit)
}
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")

  fun produceFruits() = GlobalScope.produce<String> {
    for (fruit in fruitArray) {
      send(fruit)

      // Conditional close
      if (fruit == "Pear") {
        // Signal that closure of channel
        close()
      }
    }
  }

  runBlocking {
    val fruits = produceFruits()
    fruits.consumeEach { println(it) }
    println("Done!")
  }
}

Pipelines

With channels, you always have a producer and a consumer. Sometimes, a consumer receives the data from a channel, applies some transformations and becomes the producer of a new channel. When a consumer of a channel becomes the producer of another channel, you create a Pipelines. The source channel might be infinite and the pipeline might contain different steps.

Pipeline
Huqexide

data class Fruit(override val name: String, override val color: String) : Item
data class Vegetable(override val name: String, override val color: String) : Item

@ExperimentalCoroutinesApi
fun main() {


  // ------------ Helper Methods ------------
  fun isFruit(item: Item): Boolean = item is Fruit

  fun isRed(item: Item): Boolean = (item.color == "Red")


  // ------------ Pipeline ------------
  // 1
  fun produceItems() = GlobalScope.produce {
    val itemsArray = ArrayList<Item>()
    itemsArray.add(Fruit("Apple", "Red"))
    itemsArray.add(Vegetable("Zucchini", "Green"))
    itemsArray.add(Fruit("Grapes", "Green"))
    itemsArray.add(Vegetable("Radishes", "Red"))
    itemsArray.add(Fruit("Banana", "Yellow"))
    itemsArray.add(Fruit("Cherries", "Red"))
    itemsArray.add(Vegetable("Broccoli ", "Green"))
    itemsArray.add(Fruit("Strawberry", "Red"))

    // Send each item in the channel
    itemsArray.forEach {
      send(it)
    }
  }

  // 2
  fun isFruit(items: ReceiveChannel<Item>) = GlobalScope.produce {
    for (item in items) {
      // Send each item in the channel only if it is a fruit
      if (isFruit(item)) {
        send(item)
      }
    }
  }

  // 3
  fun isRed(items: ReceiveChannel<Item>) = GlobalScope.produce {
    for (item in items) {
      // Send each item in the channel only if it is red in color
      if (isRed(item)) {
        send(item)
      }
    }
  }

  runBlocking {
    // 4
    val itemsChannel = produceItems()
    // 5
    val fruitsChannel = isFruit(itemsChannel)
    // 6
    val redChannel = isRed(fruitsChannel)

    // 7
    for (item in redChannel) {
      print("${item.name}, ")
    }

    // 8
    redChannel.cancel()
    fruitsChannel.cancel()
    itemsChannel.cancel()

    // 9
    println("Done!")
  }
}
interface Item {
  val name: String
  val color: String
}
Apple, Cherries, Strawberry, Done!

Fan out

In the previous example, you created a pipeline as a sequence of channels, each one with a single producer and a single consumer. Coroutines were consuming the data from a channel and testing if that data satisfied certain conditions. In the case of success, the items were put into the new channel; otherwise, they were discarded.

Fan-out
Yep-aom

typealias Predicate<E> = (E) -> Boolean

typealias Rule<E> = Pair<Channel<E>, Predicate<E>>
class Demultiplexer<E>(vararg val rules: Rule<E>) {

  suspend fun consume(recv: ReceiveChannel<E>) {
    for (item in recv) {
      // 1
      for (rule in rules) {
        // 2            
        if (rule.second(item)) {
          // 3
          rule.first.send(item)
        }
      }
    }
    // 4
    closeAll()
  }

  // Closes all the demultiplexed channels
  private fun closeAll() {
    rules.forEach { it.first.close() }
  }
}
@ExperimentalCoroutinesApi
fun main() {

  data class Fruit(override val name: String, override val color: String) : Item
  data class Vegetable(override val name: String, override val color: String) : Item

  // ------------ Helper Methods ------------
  fun isFruit(item: Item) = item is Fruit

  fun isVegetable(item: Item) = item is Vegetable

  // 1
  fun produceItems(): ArrayList<Item> {
    val itemsArray = ArrayList<Item>()
    itemsArray.add(Fruit("Apple", "Red"))
    itemsArray.add(Vegetable("Zucchini", "Green"))
    itemsArray.add(Fruit("Grapes", "Green"))
    itemsArray.add(Vegetable("Radishes", "Red"))
    itemsArray.add(Fruit("Banana", "Yellow"))
    itemsArray.add(Fruit("Cherries", "Red"))
    itemsArray.add(Vegetable("Broccoli", "Green"))
    itemsArray.add(Fruit("Strawberry", "Red"))
    itemsArray.add(Vegetable("Red bell pepper", "Red"))
    return itemsArray
  }

  runBlocking {

    // 2
    val kotlinChannel = Channel<Item>()

    // 3
    val fruitsChannel = Channel<Item>()
    val vegetablesChannel = Channel<Item>()

    // 4
    launch {
      produceItems().forEach {
        kotlinChannel.send(it)
      }
      // 5
      kotlinChannel.close()
    }

    // 6
    val typeDemultiplexer = Demultiplexer(
        fruitsChannel to { item: Item -> isFruit(item) },
        vegetablesChannel to { item: Item -> isVegetable(item) }
    )

    // 7
    launch {
      typeDemultiplexer.consume(kotlinChannel)
    }

    // 8
    launch {
      for (item in fruitsChannel) {
        // Consume fruitsChannel
        println("${item.name} is a fruit")
      }
    }

    // 9
    launch {
      for (item in vegetablesChannel) {
        // Consume vegetablesChannel
        println("${item.name}  is a vegetable")
      }
    }
  }
}
Apple is a fruit
Zucchini  is a vegetable
Grapes is a fruit
Radishes  is a vegetable
Banana is a fruit
Cherries is a fruit
Broccoli  is a vegetable
Strawberry is a fruit
Red bell pepper  is a vegetable

Fan in

In the previous example, you created a coroutine that was able to demultiplex the items into different channels based on certain criteria. That was a way to simulate the case in which you have one producer and many consumers.

Fan-in
Wup-ej

@ExperimentalCoroutinesApi
fun main() {

  data class Fruit(override val name: String, override val color: String) : Item
  data class Vegetable(override val name: String, override val color: String) : Item

  // ------------ Helper Methods ------------
  fun isFruit(item: Item) = item is Fruit

  fun isVegetable(item: Item) = item is Vegetable


  // 1
  fun produceItems(): ArrayList<Item> {
    val itemsArray = ArrayList<Item>()
    itemsArray.add(Fruit("Apple", "Red"))
    itemsArray.add(Vegetable("Zucchini", "Green"))
    itemsArray.add(Fruit("Grapes", "Green"))
    itemsArray.add(Vegetable("Radishes", "Red"))
    itemsArray.add(Fruit("Banana", "Yellow"))
    itemsArray.add(Fruit("Cherries", "Red"))
    itemsArray.add(Vegetable("Broccoli", "Green"))
    itemsArray.add(Fruit("Strawberry", "Red"))
    itemsArray.add(Vegetable("Red bell pepper", "Red"))
    return itemsArray
  }


  runBlocking {

    // 2
    val destinationChannel = Channel<Item>()

    // 3
    val fruitsChannel = Channel<Item>()
    val vegetablesChannel = Channel<Item>()

    // 4
    launch {
      produceItems().forEach {
        if (isFruit(it)) {
          fruitsChannel.send(it)
        }
      }
    }

    // 5
    launch {
      produceItems().forEach {
        if (isVegetable(it)) {
          vegetablesChannel.send(it)
        }
      }
    }


    // 6
    launch {
      for (item in fruitsChannel) {
        destinationChannel.send(item)
      }
    }

    // 7
    launch {
      for (item in vegetablesChannel) {
        destinationChannel.send(item)
      }
    }

    // 8
    destinationChannel.consumeEach {
      if (isFruit(it)) {
        println("${it.name} is a fruit")
      } else if (isVegetable(it)) {
        println("${it.name} is a vegetable")
      }
    }


    // 9
    coroutineContext.cancelChildren()
  }
}
Apple is a fruit
Zucchini is a vegetable
Grapes is a fruit
Banana is a fruit
Radishes is a vegetable
Cherries is a fruit
Broccoli is a vegetable
Strawberry is a fruit
Red bell pepper is a vegetable

Buffered channel

As you might have noticed above, the channel examples demonstrated previously used a default value for the capacity, called RENDEZVOUS. These kinds of channels are called unbuffered channels because the producer produces only if there’s a consumer ready to consume.

// Channel of capacity 2
val kotlinBufferedChannel = Channel<String>(2) 
fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")

  val kotlinBufferedChannel = Channel<String>(2)

  runBlocking {
    launch {
      for (fruit in fruitArray) {
        kotlinBufferedChannel.send(fruit)
        println("Produced: $fruit")        
      }
      kotlinBufferedChannel.close()
    }

    launch {
      for (fruit in kotlinBufferedChannel) {
        println("Consumed: $fruit")
        delay(1000)
      }
    }
  }
}
Produced: Apple
Produced: Banana
Consumed: Apple
Produced: Pear
Consumed: Banana
Produced: Grapes
Consumed: Pear
Produced: Strawberry
Consumed: Grapes
Consumed: Strawberry

Comparing send and offer

In the previous examples, you sent values into a channel using the send function. Depending on the channel’s capacity, send is a function that can suspend. This is happening when the channel’s buffer is full or, in case of RENDEZVOUS, when there’s not receiver ready to consume.

abstract fun offer(element: E): Boolean
fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")

  val kotlinChannel = Channel<String>()

  runBlocking {
    launch {
      for (fruit in fruitArray) {
        val wasSent = kotlinChannel.offer(fruit)
        if (wasSent) {
          println("Sent: $fruit")
        } else {
          println("$fruit wasn’t sent")
        }
      }
      kotlinChannel.close()
    }

    for (fruit in kotlinChannel) {
      println("Received: $fruit")
    }
    println("Done!")
  }
}
Sent: Apple
Banana wasn’t sent
Pear wasn’t sent
Grapes wasn’t sent
Strawberry wasn’t sent
Received: Apple

Comparing receive and poll

In the previous section, you’ve seen that a producer can use offer as a not suspending version of the send function. What about the consumer? In this case, the version of receive without suspending is the poll function whose signature is:

abstract fun poll(): E?
fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")

  val kotlinChannel = Channel<String>()

  runBlocking {

    launch {
      for (fruit in fruitArray) {
        if (fruit == "Pear") {
          break
        }
        kotlinChannel.send(fruit)
        println("Sent: $fruit")
      }
    }

    launch {
      repeat(fruitArray.size) {
        val fruit = kotlinChannel.poll()
        if (fruit != null) {
          println("Received: $fruit")
        } else {
          println("Channel is empty")
        }

        delay(500)
      }

      println("Done!")
    }
  }
}
Received: Apple
Sent: Apple
Received: Banana
Sent: Banana
Channel is empty
Channel is empty
Channel is empty
Done!

Error handling

As you have seen in the previous examples, exceptions play an important role in the way you can use a channel. It’s crucial to understand what the main exceptions are and what you should do when they happen. You have to consider two main use cases, depending on if you’re on the producer side or on the consumer side of the channel.

fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")

  val kotlinChannel = Channel<String>()

  runBlocking {
    launch {
      for (fruit in fruitArray) {
        // Conditional close
        if (fruit == "Grapes") {
          // Signal that closure of channel
          kotlinChannel.close()
        }

        kotlinChannel.send(fruit)
      }
    }

    repeat(fruitArray.size) {
      try {
        val fruit = kotlinChannel.receive()
        println(fruit)
      } catch (e: Exception) {
        println("Exception raised: ${e.javaClass.simpleName}")
      }
    }
    println("Done!")
  }
}
Apple
Banana
Pear
Exception raised: ClosedReceiveChannelException
Exception raised: ClosedReceiveChannelException
Done!
fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")

  val kotlinChannel = Channel<String>()

  runBlocking {
    launch {
      for (fruit in fruitArray) {
        try {
          kotlinChannel.send(fruit)
        } catch (e: Exception) {
          println("Exception raised: ${e.javaClass.simpleName}")
        }
      }

      println("Done!")
    }

    repeat(fruitArray.size - 1) {
      val fruit = kotlinChannel.receive()
      // Conditional close
      if (fruit == "Grapes") {
        // Signal that closure of channel
        kotlinChannel.close()
      }
      println(fruit)
    }
  }
}
Apple
Banana
Pear
Grapes
Exception raised: ClosedSendChannelException
Done!

Comparing Channels to Java Queues

As mentioned, Java offers a similar solution for handling streams, called Queue<E>, which is an interface and has several implementations. Take a look at an implementation of the BlockingQueue<E> interface, as it supports a similar behavior as Channel of waiting until a queue has space before inserting an element.

public class BlockingQueueExample {

  public static void main(String[] args) {

    BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    System.out.println("Beginning:");
    try {
      System.out.println("Let’s put in basket: Apple");
      queue.put("Apple");
      System.out.println("Let’s put in basket: Banana");
      queue.put("Banana");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    System.out.println("Done!");
  }
}
Beginning:
Let’s put in basket: Apple
Let’s put in basket: Banana
Done!
fun main(args: Array<String>) {
  // 1
  val queue = LinkedBlockingQueue<Int>()
  runBlocking {

    // 2
    launch {
      (1..5).forEach {
        queue.put(it)
        yield()
        println("Produced ${it}")
      }
    }

    // 3
    launch {
      while (true) {
        println("Consumed ${queue.take()}")
        yield()
      }
    }

    println("Done!")
  }
}
Consumed 1
Produced 1
Consumed 2
Produced 2
Consumed 3
Produced 3
Consumed 4
Produced 4
Consumed 5
Produced 5

Key points

  1. Channels provide the functionality for sending and receiving streams of values.
  2. Channel implements both SendChannel and ReceiveChannel interfaces; therefore, it could be used for sending and receiving streams of values.
  3. A Channel can be closed. When that happens, you can’t send or receive an element from it.
  4. The send() method either adds the value to a channel or suspends the coroutine until there is space in the channel.
  5. The receive() method returns a value from a channel if it is available, or it suspends the coroutine until some value is available otherwise.
  6. The offer() method can be used as an alternative to send(). Unlike the send() method, offer() doesn’t suspend the coroutine, it returns false instead. It returns true in case of a successful operation.
  7. poll() similarly to offer() doesn’t suspend the running, but returns null if a channel is empty.
  8. Java BlockingQueue has a similar to Kotlin Channel behavior, the main difference is that the current thread gets blocked if the operation of inserting or retrieving is unavailable at the moment.
Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now