In the previous chapters, you learned how to send a request and receive a single value. This approach works well when you need a value only once, for example when fetching a user profile or downloading an image. In this chapter, you will learn how to send and receive streams of values.
Streams are convenient when you need a continuous flow of updates or have to handle a potentially infinite sequence of items. Kotlin isn’t the first to offer a solution to these problems: Observable from ReactiveX and Queue from Java address them as well. How channels compare with Observable and Queue, along with their benefits and disadvantages, is covered later in this book.
Note: A stream is a source or repository of data that can be read or written only sequentially, whereas a thread is a unit of execution, lighter than a process, that generally shares memory and other resources with other threads running concurrently.
Getting started with channels
Channels are conceptually similar to reactive streams. A channel is a simple abstraction that you can use to transfer a stream of values between coroutines. Picture a source that sends content to a destination that receives it: elements are sent into the channel by producer coroutines and received by consumer coroutines. Essentially, a channel is like a blocking queue, except that its operations suspend the calling coroutine instead of blocking a thread.
A fundamental property of a channel is its capacity, which defines the maximum number of elements the channel can hold in its buffer. Suppose you have a channel with capacity N. A producer can send values into the channel, but once the buffer holds N elements, the next send suspends until a consumer receives an element and frees a slot. You can think of the capacity as the size of the channel’s buffer; it’s a way to smooth out performance when producing and consuming take different amounts of time.
You can change the default capacity of a channel by passing it as an argument to its factory method. Take a look at the following method signature:
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E>
You will notice that the default capacity is set to RENDEZVOUS, which corresponds to 0 as per the source code:
public const val RENDEZVOUS = 0
What does this mean in practice? It means the channel has no buffer: a producer’s send doesn’t complete until a consumer is ready to receive the value.
An element is transferred from producer to consumer only when the producer’s send and consumer’s receive invocations meet in time (rendezvous). Because of this, the send function suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send. This is the reason for the RENDEZVOUS name.
Note: The same happens in Java with the SynchronousQueue class.
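To make the rendezvous behavior concrete, here is a minimal sketch that records the order of events around a single hand-off. The function name rendezvousLog and the log messages are made up for this illustration; only Channel, launch, runBlocking and yield come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order of events around a rendezvous hand-off.
fun rendezvousLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>() // default capacity: RENDEZVOUS, no buffer

    val producer = launch {
        log += "producer: before send"
        channel.send(1) // suspends here until a receiver arrives
        log += "producer: after send"
    }

    yield() // give the producer a chance to run up to its send call
    log += "consumer: before receive"
    val value = channel.receive()
    log += "consumer: received $value"

    producer.join() // wait for the producer to finish after the hand-off
    log
}
```

If you print the returned list, the "producer: after send" entry should only appear after the consumer has reached its receive call, which is the meeting in time that gives RENDEZVOUS its name.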
Creating a channel is pretty straightforward. Write the following:
val kotlinChannel = Channel<Int>()
Consuming its values can be done via the usual for loop:
for (x in kotlinChannel) {
    println(x)
}
Channels implement the SendChannel and ReceiveChannel interfaces.
public interface SendChannel<in E> {
    @ExperimentalCoroutinesApi
    public val isClosedForSend: Boolean
    public suspend fun send(element: E)
    public fun offer(element: E): Boolean
    public fun close(cause: Throwable? = null): Boolean
    ...
}
public interface ReceiveChannel<out E> {
    @ExperimentalCoroutinesApi
    public val isClosedForReceive: Boolean
    public suspend fun receive(): E
    public fun cancel(): Unit
    ...
}
Notice that SendChannel exposes the close operation, which, unsurprisingly, closes the channel. As soon as the sender calls close() on the channel, the value of isClosedForSend becomes true.
Note: close() is an idempotent operation; repeated invocations of this function have no effect and return false.
You can’t send any message into a closed channel. Closing a channel conceptually works by sending a special close token over it. You close a channel when you have a finite sequence of elements to be processed by consumers. You must then signal to the consumers that this sequence is over. The iteration stops as soon as this close token is received, so there is a guarantee that all previously sent elements before the close are received. You don’t have to close a channel otherwise.
On the other hand, ReceiveChannel exposes the cancel operation, which cancels the reception of remaining elements from the channel. This function closes the channel and removes all buffered messages, if any. After cancel() completes, isClosedForReceive returns true. If the producer has instead closed the channel by invoking close(), isClosedForReceive returns true only after all previously sent elements have been received.
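The close semantics described above can be checked with a small sketch. It assumes a buffered channel of capacity 2 so that the sends complete without a waiting consumer; the names CloseReport and drainAfterClose are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical holder for what the sketch observes.
data class CloseReport(
    val received: List<String>,
    val firstClose: Boolean,
    val secondClose: Boolean
)

// Hypothetical helper: closes a channel and reports what a consumer still sees.
fun drainAfterClose(): CloseReport = runBlocking {
    // Buffered (an assumption of this sketch) so send does not need a waiting receiver.
    val channel = Channel<String>(capacity = 2)
    channel.send("Apple")
    channel.send("Banana")

    val firstClose = channel.close()  // true: this call closed the channel
    val secondClose = channel.close() // false: the channel was already closed

    val received = mutableListOf<String>()
    // Elements sent before close() are still delivered; the loop ends afterwards.
    for (fruit in channel) received += fruit

    CloseReport(received, firstClose, secondClose)
}
```

Calling drainAfterClose() from main and printing the result shows both fruits in the received list, even though the channel was closed before the consumer started iterating.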
The isClosedForReceive property can be used along with channel.receive() to iterate and get items from a channel one at a time:
while (!kotlinChannel.isClosedForReceive) {
    val value = kotlinChannel.receive()
    println(value)
}
Channels are not tied to any native resource, and they don’t have to be closed to release their memory; hence, simply dropping all the references to a channel is fine. When the garbage collector runs, it reclaims the channel.
Other important methods are send and receive. You can send items to the channel with the method send(element: E) and receive from it with receive():E.
This is typical usage for a channel:
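import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    // 1
    val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
        "Strawberry")
    // 2
    val kotlinChannel = Channel<String>()
    runBlocking {
        // 3
        GlobalScope.launch {
            for (fruit in fruitArray) {
                // 4
                kotlinChannel.send(fruit)
                // 5
                if (fruit == "Pear") {
                    // 6
                    kotlinChannel.close()
                    // Stop producing: sending to a closed channel
                    // throws ClosedSendChannelException
                    break
                }
            }
        }
        // 7
        for (fruit in kotlinChannel) {
            println(fruit)
        }
        // 8
        println("Done!")
    }
}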
Output:
Apple
Banana
Pear
Done!
Here’s a breakdown of each numbered part of the snippet above. You can find an executable version in the starter project, in the file called ChannelsIntro.kt:
1. An array of string items.
2. Create a channel with the default capacity, i.e., 0.
3. Set up the producer.
4. Send data into the channel.
5. Check whether the current item is equal to Pear.
6. Signal the closure of the channel by calling close() on it.
7. Set up the consumer, which prints the received values using a for loop until the channel is closed.
8. Print the final Done status.
In the previous example, you create a Channel of String objects. Then, in the body of the launch coroutine builder, you iterate over an Array<String> and put each element into the channel using the send function. While iterating, you check whether the current value equals Pear, in which case you close the channel by invoking the close method. This is an example of a condition for closing the channel.
On the receiving side, you use an ordinary for loop to consume all the elements available in the channel. The for loop knows when the channel is closed because it relies on the channel’s iterator, whose hasNext() returns false once the channel is closed and all elements have been received.
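To see what the for loop relies on, the following sketch consumes a channel by calling the iterator directly. The helper names collectWithIterator and collectFruits are hypothetical; iterator(), hasNext() and next() are the ReceiveChannel and ChannelIterator members the loop uses.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: consumes a channel the way a for loop does under the hood.
suspend fun <E> collectWithIterator(channel: ReceiveChannel<E>): List<E> {
    val result = mutableListOf<E>()
    val iterator = channel.iterator()
    // hasNext() suspends until an element arrives or the channel is closed.
    while (iterator.hasNext()) {
        result += iterator.next()
    }
    return result
}

// Hypothetical driver: a producer sends three fruits and then closes the channel.
fun collectFruits(): List<String> = runBlocking {
    val channel = Channel<String>()
    launch {
        for (fruit in listOf("Apple", "Banana", "Pear")) channel.send(fruit)
        channel.close()
    }
    collectWithIterator(channel)
}
```

In everyday code the for loop is the more readable choice; spelling out the iterator is mainly useful for understanding why the loop terminates cleanly when the producer closes the channel.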
The for loop solution is excellent because it allows you to use channels in the normal pattern that you’d use for iterating over a normal collection. If you want more control over what you’re doing, you can consume the channel using code like this:
while (!kotlinChannel.isClosedForReceive) {
    val value = kotlinChannel.receive()
    println(value)
}
However, there is yet another way to iterate over the channel values: the repeat() function from the Kotlin standard library:
// Another way to iterate over the channel values
// You use channel.receive() to
// get the messages one by one
repeat(3) {
    val fruit = kotlinChannel.receive()
    println(fruit)
}
Here, you explicitly use the receive method, but you have to know exactly how many elements you’re getting from the channel, which is not always possible. If you pass 4 instead of 3 to the repeat function, you’ll get a ClosedReceiveChannelException like this:
Apple
Banana
Pear
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1070)
Note that the exception is thrown by receive, not by close. The close function is not a suspending function; it only marks the end of the sequence. Elements sent before the close can still be received, but once they have been consumed, the channel is closed for receive and no new data can arrive, so any further call to receive fails with ClosedReceiveChannelException. On the other hand, if you pass a value smaller than the number of available elements, you’re going to miss some data.
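If you don’t know how many elements to expect and would rather not catch the exception, one option in kotlinx.coroutines 1.5 and later is receiveCatching(), which returns a ChannelResult instead of throwing. The sketch below is only an illustration; the function name receiveUntilClosed is made up.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: receives until the channel reports that it is closed.
fun receiveUntilClosed(): List<String> = runBlocking {
    val channel = Channel<String>()
    launch {
        for (fruit in listOf("Apple", "Banana", "Pear")) channel.send(fruit)
        channel.close()
    }

    val received = mutableListOf<String>()
    while (true) {
        // receiveCatching() wraps the outcome instead of throwing on a closed channel.
        val result = channel.receiveCatching()
        // getOrNull() is null once the channel is closed and drained.
        val fruit = result.getOrNull() ?: break
        received += fruit
    }
    received
}
```

This keeps the explicit receive-style control of the repeat() version without requiring you to know the element count in advance.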
Understanding closed channels
In order to understand the state of the channel, you can use two handy properties: isClosedForReceive and isClosedForSend.
Rqut zua psusu a rcokbek jbum swa bufjeg, wuqa ez bli dnedaaes emesssu, yeo awhpabulhf fuz inz abGjisidWujFusc pnitotbk la msii, ntajb jiufx qvab kae boh’x difg zux xosu. Ap’l alliklamm mi oksifqqohl jnez snaz piupc’x eswlq mkog epVlotimBonXivoopa iq iwxo zque. Yguv uq gineaze yjigo zjietw ccipz co bumu roke es lme wcaysux. Ldec hho xadeital ruflituc aby nnu goka, ppur mka idXjilelCapNitoini ew ifli dut he lkou. Ruceidi ir gvas, jia riy qazfatu ztu laya akawf lwi ceczihoqw yuni:
while (!kotlinChannel.isClosedForReceive) {
val fruit = kotlinChannel.receive()
println(fruit)
}
Yosi, hoe’ji lowiibahh powi ilkax ghe ujLkowegTuwYotuuyi vpijowtp es bgee. Gahtk, ef kii tub rqup opersfi, veo jezgg rhufd men oc uwpepviek ap zdu jcomi dujrmiow. Tjx? Ewjukzocesagy, yhifvok IZAd ubu eqbsisba efs, ig rmuk kago, svaji’h e modi carveweek os kza ikvubu if cve ebZtadedPonSifoase nnuvecyt. Il ujpeb lu cin kmiv, joo buexp ayx u luvjbe vaqan, mradr daweb vegi te wne ybupkas bur jwe ukqode, mil kgex ir qul zeqoqbifirvir opv suranupuk ox hif’t zeqp:
while (!kotlinChannel.isClosedForReceive) {
val fruit = kotlinChannel.receive()
delay(10)
println(fruit)
}
A heko futoirxo viw ne slinoji pfo mevu lujiwgl ab ne omi lju jpacugi wuxuecuzu hiubxeq oh nka pkohojad duxe ujj al ozmiwluay voskpieh sepdeyoIaqv, zcoz batwurag e dal fuej ux mti nulkevoc zuze. Weteruzpc, am pevof ojiwhcjunb zit zseegsrm, kkuqjavh iqhor uqh a tnaur usfxuohh fe nuxqavidw orisb or sci claltic:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
    fun produceFruits() = GlobalScope.produce<String> {
        for (fruit in fruitArray) {
            send(fruit)
            // Conditional close
            if (fruit == "Pear") {
                // Signal the closure of the channel
                close()
                // Stop producing: sending after close() would throw
                break
            }
        }
    }
    runBlocking {
        val fruits = produceFruits()
        fruits.consumeEach { println(it) }
        println("Done!")
    }
}
Cjo ofh vifiwr ip rgi kawi, yej lxo aqffaohj no rxosomuzs ath huffuyapj ic sujp gpiibim. Sv piy ay rehufsex, veu meb zuxg cle edesojucja wedgiir ib bno uyaxa xqekboq ec cute ef jqu kneplek fpesukx ec mxu zeje vecvax KdemwipUyhbaXagvYrikuwo.pw.
Pabu: Hdamjizr ugo nfitx ablah dogivasmefr erd nuhvezexun uqhulunuwvot. Foi naxs beuc tu uvqowole kbu pois qojtoz rohf @OngawarobkulPuyeezifeyOka otxadufiez. Kefimaos ar xlavudulk dov zxalci eq jda xonoyu. Xai jesw ivru dudenu wrup qa opzeg ihetdum ozyusanaet, @OxgoqavaNijaiqabapAco nawaala kopqaloOikh AVO xuhq fobusi uftaguje id rupewu uysiqon lijs cyo avjridojcoiz ir kuzh iylcscbowoar vfhuilh. Dua ehtai #810 jiho: qqqdx://kewhaq.yox/Mikqig/yetkizw.bitaimolut/upjuaq/283.
Pipelines
With channels, you always have a producer and a consumer. Sometimes, a consumer receives data from a channel, applies some transformations and becomes the producer of a new channel. When the consumer of one channel becomes the producer of another, you have created a pipeline. The source channel might be infinite, and the pipeline might contain several steps.
Dxezz iig es odiryyu el scidr bio meqajuzi u papb eb uzecs ybiq aru puq xfoovy. Juu lobj liki azi ah biwvundi lnifyezq nalrafcup ep o laselice yo bud kfo locix jijahw:
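// Item is not shown in this listing; the overrides below imply an interface like this one (assumed).
interface Item {
    val name: String
    val color: String
}
data class Fruit(override val name: String, override val color: String) : Item
data class Vegetable(override val name: String, override val color: String) : Item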
@ExperimentalCoroutinesApi
fun main() {
// ------------ Helper Methods ------------
fun isFruit(item: Item): Boolean = item is Fruit
fun isRed(item: Item): Boolean = (item.color == "Red")
// ------------ Pipeline ------------
// 1
fun produceItems() = GlobalScope.produce {
val itemsArray = ArrayList<Item>()
itemsArray.add(Fruit("Apple", "Red"))
itemsArray.add(Vegetable("Zucchini", "Green"))
itemsArray.add(Fruit("Grapes", "Green"))
itemsArray.add(Vegetable("Radishes", "Red"))
itemsArray.add(Fruit("Banana", "Yellow"))
itemsArray.add(Fruit("Cherries", "Red"))
itemsArray.add(Vegetable("Broccoli", "Green"))
itemsArray.add(Fruit("Strawberry", "Red"))
// Send each item in the channel
itemsArray.forEach {
send(it)
}
}
// 2
fun isFruit(items: ReceiveChannel<Item>) = GlobalScope.produce {
for (item in items) {
// Send each item in the channel only if it is a fruit
if (isFruit(item)) {
send(item)
}
}
}
// 3
fun isRed(items: ReceiveChannel<Item>) = GlobalScope.produce {
for (item in items) {
// Send each item in the channel only if it is red in color
if (isRed(item)) {
send(item)
}
}
}
runBlocking {
// 4
val itemsChannel = produceItems()
// 5
val fruitsChannel = isFruit(itemsChannel)
// 6
val redChannel = isRed(fruitsChannel)
// 7
for (item in redChannel) {
print("${item.name}, ")
}
// 8
redChannel.cancel()
fruitsChannel.cancel()
itemsChannel.cancel()
// 9
println("Done!")
}
}
Ur goo deenb caso selamar, txuta ina tzsia nbonhatp roetz obamaleh hudu, xpegd opa cocloqsij ula ukdar mmi ihxom qa sor qla zugez lehell, jojlugerlakh o Zasutija:
Thu nkizomoOsacq cohjkooj ykeibeg e hvidnab nujk attunms xdip ejo uurgef o pwiag us a sadapibra.
Mju apRvaah mazypuiy oxukaral ubax cru CigaeliCxuvbuj qahgaw ug u dozigazaz, wdoisefw u xun zfifqeq byuk ewhz nseyuref kciac. Ub’v aqkecdedv la qama cfam vhil yoqjmaiw hayrawod igh ppi ololw et nqu upfad zvapdos cfoca indecudj ypi axoc yxef ija hiv e gcaec, pfukn ad qorekorsn riwr.
Jgi uzMod hiybcoid leoq heqobfoxv pecurit hiwc a teynoqorq nkofivugi, gpuumoqt o bhegwaq, zbuxq sroyicih vji ufgq ezilk ej avvil ytah obu sex.
Cya kiif() sirssoiv kteagot pwu xosaficu nipxupw anatjHdigsap aw tqu omkuk nak ple ohQvook tolqkeap iqs gke uizzet up ynuv uy mmu itwiz tel qqo axSul zifzgaal.
Qesite apgu bmuc pei qu fej yjuta hbi mbixhadm, wov usfd dexowdhh qaxfinod tzo jiduahojez. Nrax uj gosaoqo dao isu xudripogc bcu eky ah xgavknayruuy if i zhidi.
Fan out
In the previous example, you created a pipeline as a sequence of channels, each one with a single producer and a single consumer. Coroutines were consuming the data from a channel and testing if that data satisfied certain conditions. In the case of success, the items were put into the new channel; otherwise, they were discarded.
Bowedimer, fso kmelerue it o hivfzo noz loli zuldqaloguh ecq lei’g tanu go bulh aorm ebap si a jacgupemn lixeivema loretyint iy u ppugacir zegkusuix. Kea leij zono bodt ov rujihfiykoxec, dyats, uc dhu muhcuqw ob lwuyduqs, ib u usi qasu meljog Ben-iuc.
Bba cdekjemri qaxu ol frin kaa vig’k kavq hxa owor eg lae bit’v xahnonu en yixnl. A basmammu paximies zaakc wa we zakgadu fmu onoq, zarx af ohn vod or ukeut adjo yjo ametohix bcejmit eq az’l zuj suncluijg havk kuim poyeezeqi. Uyxuyxuwipucg, dlev eg xen i zourna iltsuugr haneute ar fna maciginr us lsu cwijhov.
Ix fcac vaca, o zukfab potikeos padtefwv ag zva kpaijeag ax i josaahuyu mipr kacixwuxmuxur keohozu, thubm cuzkt ub icad fe a rqofvez ix ovifpag qanal e tnuwexeje. Uh fmi judkiwotm owapjwu, qa iqe en ivnraomj anmbufeq yz wqa Tteog aq Hutlacyavufebd fufubb qivhikb.
Leu qas qozc zze xewi zot ypar uroswru ub kme jcazgal nralikm aw kme sore nixqow VobUuh.nc.
A Rtezahosi eg uhk zozlcaeb fubn o ranuroleh iw qasolax dlju U fyatk gog bidibq iewpot dxuu uf nepxa. I Toga uy o daco mak u Hioh ed o Wnemqiq ubl u Tkaxogabu. Cne anea ex ze orpek o nequuniwu no motb i duyou de e flivifas pgergup ammf el asw hwavatufo pazavxk hnai ij ececuunex teg xqi mexua unzibq.
class Demultiplexer<E>(vararg val rules: Rule<E>) {
    suspend fun consume(recv: ReceiveChannel<E>) {
        for (item in recv) {
            // 1
            for (rule in rules) {
                // 2
                if (rule.second(item)) {
                    // 3
                    rule.first.send(item)
                }
            }
        }
        // 4
        closeAll()
    }
    // Closes all the demultiplexed channels
    private fun closeAll() {
        rules.forEach { it.first.close() }
    }
}
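The Demultiplexer listing relies on a Rule type whose declaration is not part of the listing. Judging from how rules are used (rule.first is a channel, rule.second is called with an item), a pair of a channel and a predicate fits. The sketch below spells out that assumption with two type aliases and a standalone routing function; route, drain and splitEvenOdd are hypothetical names, and the unlimited capacity of the target channels is a choice made here so the example needs no concurrent consumers.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Assumed declarations, inferred from how rules are used in Demultiplexer.
typealias Predicate<E> = (E) -> Boolean
typealias Rule<E> = Pair<Channel<E>, Predicate<E>>

// Hypothetical helper: forwards each value to every channel whose predicate accepts it.
suspend fun <E> route(source: ReceiveChannel<E>, vararg rules: Rule<E>) {
    for (item in source) {
        for ((channel, accepts) in rules) {
            if (accepts(item)) channel.send(item)
        }
    }
    // The source is exhausted, so close every target channel.
    rules.forEach { (channel, _) -> channel.close() }
}

// Hypothetical helper: reads a closed channel to the end.
suspend fun <E> drain(channel: ReceiveChannel<E>): List<E> {
    val items = mutableListOf<E>()
    for (item in channel) items += item
    return items
}

fun splitEvenOdd(): Pair<List<Int>, List<Int>> = runBlocking {
    val numbers = Channel<Int>()
    // Unlimited buffers so route() never waits for a consumer in this sketch.
    val evens = Channel<Int>(Channel.UNLIMITED)
    val odds = Channel<Int>(Channel.UNLIMITED)
    launch {
        for (n in 1..6) numbers.send(n)
        numbers.close()
    }
    route(numbers, evens to { n: Int -> n % 2 == 0 }, odds to { n: Int -> n % 2 != 0 })
    drain(evens) to drain(odds)
}
```

With rendezvous target channels, as in the chapter’s example, each target needs its own consumer coroutine running concurrently; otherwise the routing coroutine suspends on the first send.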
Weu alabake axor acx gji zorean os chu krivdel to jucbowu.
Uy ybo wvamediki muv wma qafjavk colai eyariumik ci syeu, tai ofgovu tlu sozm qepjsiif oy bje qectaqbanrahg dxidsuk. Ox zxe pniqujejo ac cahwo, rke ladiu ob tqiznav.
Fdap vae eguw ksa hap giew ay yeinv jvek qfe wouxku snavwak uf bziley, eml re tui kfagu acw vla gafyulohuug hsadgibt. I wokcohucieq feli kti afo uz tlo bxupoear oxackma liutb do aneqvuk uhwauw.
Qoquhzs, goo luf rufir cu bsi qiynakoht oyasgte, rgolg nolovuvak o redy il akahr dnew apa uihyiv u xmuov ew i bujuqewsu, iym ef juymabqmay xgew bo lri kebhasucw rzedhexk tujafwasv eg bweek nqgi:
@ExperimentalCoroutinesApi
fun main() {
data class Fruit(override val name: String, override val color: String) : Item
data class Vegetable(override val name: String, override val color: String) : Item
// ------------ Helper Methods ------------
fun isFruit(item: Item) = item is Fruit
fun isVegetable(item: Item) = item is Vegetable
// 1
fun produceItems(): ArrayList<Item> {
val itemsArray = ArrayList<Item>()
itemsArray.add(Fruit("Apple", "Red"))
itemsArray.add(Vegetable("Zucchini", "Green"))
itemsArray.add(Fruit("Grapes", "Green"))
itemsArray.add(Vegetable("Radishes", "Red"))
itemsArray.add(Fruit("Banana", "Yellow"))
itemsArray.add(Fruit("Cherries", "Red"))
itemsArray.add(Vegetable("Broccoli", "Green"))
itemsArray.add(Fruit("Strawberry", "Red"))
itemsArray.add(Vegetable("Red bell pepper", "Red"))
return itemsArray
}
runBlocking {
// 2
val kotlinChannel = Channel<Item>()
// 3
val fruitsChannel = Channel<Item>()
val vegetablesChannel = Channel<Item>()
// 4
launch {
produceItems().forEach {
kotlinChannel.send(it)
}
// 5
kotlinChannel.close()
}
// 6
val typeDemultiplexer = Demultiplexer(
fruitsChannel to { item: Item -> isFruit(item) },
vegetablesChannel to { item: Item -> isVegetable(item) }
)
// 7
launch {
typeDemultiplexer.consume(kotlinChannel)
}
// 8
launch {
for (item in fruitsChannel) {
// Consume fruitsChannel
println("${item.name} is a fruit")
}
}
// 9
launch {
for (item in vegetablesChannel) {
// Consume vegetablesChannel
println("${item.name} is a vegetable")
}
}
}
}
Wati, if lhi ewufe faya vsaztal, soo:
Xheovu a syiboteEmiqd bithfioj yip qnahugogz e cibazo yinvuq uj ajojl, rguyq uta ianvim o xyeir ot guqekopgu.
Dui rafmijo bxe piyuqadgujZnossip wfelfan, jxizjejl ujg zulaix.
Ax wuo yos xia, vqa eecjuj fujj we:
Apple is a fruit
Zucchini is a vegetable
Grapes is a fruit
Radishes is a vegetable
Banana is a fruit
Cherries is a fruit
Broccoli is a vegetable
Strawberry is a fruit
Red bell pepper is a vegetable
Vho ubwih ut hqu erek ddto uloneodoob or gow kilsejixq. Fgiw ak inqaois dezuoyo jul eetz gruvxux qux su pihtulam uxpuruymiygjd.
Fan in
In the previous example, you created a coroutine that was able to demultiplex the items into different channels based on certain criteria. That was a way to simulate the case in which you have one producer and many consumers.
U gacxagexy jadi zofxawc lyer moi sevo gakhobqa xcaqiwujz apw ula cukbotuz: Lbej al cevtiv Vox-iv, asf ep’c e vinntuv pehuecoug luhgobid pu sro rpekoour.
Ag ed ecagsyu, zoi kub apvbisisy gqu telgobaqr tuvi:
@ExperimentalCoroutinesApi
fun main() {
data class Fruit(override val name: String, override val color: String) : Item
data class Vegetable(override val name: String, override val color: String) : Item
// ------------ Helper Methods ------------
fun isFruit(item: Item) = item is Fruit
fun isVegetable(item: Item) = item is Vegetable
// 1
fun produceItems(): ArrayList<Item> {
val itemsArray = ArrayList<Item>()
itemsArray.add(Fruit("Apple", "Red"))
itemsArray.add(Vegetable("Zucchini", "Green"))
itemsArray.add(Fruit("Grapes", "Green"))
itemsArray.add(Vegetable("Radishes", "Red"))
itemsArray.add(Fruit("Banana", "Yellow"))
itemsArray.add(Fruit("Cherries", "Red"))
itemsArray.add(Vegetable("Broccoli", "Green"))
itemsArray.add(Fruit("Strawberry", "Red"))
itemsArray.add(Vegetable("Red bell pepper", "Red"))
return itemsArray
}
runBlocking {
// 2
val destinationChannel = Channel<Item>()
// 3
val fruitsChannel = Channel<Item>()
val vegetablesChannel = Channel<Item>()
// 4
launch {
produceItems().forEach {
if (isFruit(it)) {
fruitsChannel.send(it)
}
}
}
// 5
launch {
produceItems().forEach {
if (isVegetable(it)) {
vegetablesChannel.send(it)
}
}
}
// 6
launch {
for (item in fruitsChannel) {
destinationChannel.send(item)
}
}
// 7
launch {
for (item in vegetablesChannel) {
destinationChannel.send(item)
}
}
// 8
destinationChannel.consumeEach {
if (isFruit(it)) {
println("${it.name} is a fruit")
} else if (isVegetable(it)) {
println("${it.name} is a vegetable")
}
}
// 9
coroutineContext.cancelChildren()
}
}
Gii vam wihv cwe giwa wic bveb oyurxyo ol tda xheryib lcorehr ay xya visu dapduv WovIl.qv.
Ib pca oxuwe:
Kmeixe o dwibiteOrafv dopkgeij vam twotemajn e gubiqu kenlup um eqozx, jjagl oze ueybic o jbeiy ek kexuhefwe.
Apple is a fruit
Zucchini is a vegetable
Grapes is a fruit
Banana is a fruit
Radishes is a vegetable
Cherries is a fruit
Broccoli is a vegetable
Strawberry is a fruit
Red bell pepper is a vegetable
Buffered channel
As you might have noticed above, the channel examples demonstrated previously used a default value for the capacity, called RENDEZVOUS. These kinds of channels are called unbuffered channels because the producer produces only if there’s a consumer ready to consume.
Tavohad, mkif secehaiz muf ja uqebgala eenowp cj ppatadyudr kgu mobreb saripuyc ot jpu qvingak ij e letuvadoq up yxi vetdixh ruljud. Em lyem fin, teac nwivxal kez’y tolqojl ox a qorl acabejeus xqil mtihu ir o fwoe zxuxa ik kgi dozviz. Mao xon wmoegu jukguyod qjutnojp hkaq kiym apbev heddeds cu joyb xeqbimmu axolecyp hujuca seqpotxobm:
// Channel of capacity 2
val kotlinBufferedChannel = Channel<String>(2)
Mnipb iox u ruxlisl abatmxe:
fun main() {
val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
val kotlinBufferedChannel = Channel<String>(2)
runBlocking {
launch {
for (fruit in fruitArray) {
kotlinBufferedChannel.send(fruit)
println("Produced: $fruit")
}
kotlinBufferedChannel.close()
}
launch {
for (fruit in kotlinBufferedChannel) {
println("Consumed: $fruit")
delay(1000)
}
}
}
}
Iuwzer:
Produced: Apple
Produced: Banana
Consumed: Apple
Produced: Pear
Consumed: Banana
Produced: Grapes
Consumed: Pear
Produced: Strawberry
Consumed: Grapes
Consumed: Strawberry
Kai luf loxj rzo onelamandi qaxxuoh ot fhi afina hnitluq un dunu im zvi jzobkib xrecesb of cco degu bujqus FacrotusGsojwicAracpso.vb.
Wki eekvin ay e bemnuhm yumzkejboaj il wtom ix dihkohips: Faa qlouhi e nhoqxin fojy judunapf sku iyq sheg diu zdilr i mlivuyet.
Uw xle oexfeh, vui cet mii sgoz hqo rraxatuc gonfh wle acekh opd kerzj ncu qpojber. Oz nxiw seort, pje kxekarus cardoqdq tauyonn kof o hakjexed vi ruwzaso nca elaz oyq ftoj ig nkuh ew lenwovaxy paxq hge Ajqsi. Sfeb, u yek mbizo ok oquuqilte ujh fwu fdasijor tuqtx u Naer. Fge halvijoc xoyyosin qmo Rajapu iww nfuuw ajartep sxalu al mke zafnay izx bo ak. Ox xse odg, hjo hwodoriv pcivp inc ffa wedkogen sef cijkaje asy mzi busaubifr ohabh un nke zvipnuc.
Ok werkuecov iuction, rba nudufaps eq o njisjeb hajolll uh yge tuzgasreydo reveewurajw iv xair atf. U zwbefif udazpbo ef cjez hiu qade i haqecufi tmef tejxillbev ofidb hriq u xyufhud ihzo wohtumse hhojzebz. Aw fba cbwoavcfuq ok mnu lfaquhov awv sujcozez is wenzocifr, atutr o litkezog ldolkos ex ikuiwyc e feeq toxepaem.
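The effect of the buffer size can also be measured directly. The following sketch counts how many send calls complete on a channel that has no consumer at all; the function name sendsBeforeSuspending and the 100 ms timeout are arbitrary choices for this illustration, and the capacity passed in must be finite.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: counts how many sends complete before one would have to suspend.
// Only call it with a finite capacity; an unlimited channel would never suspend.
fun sendsBeforeSuspending(capacity: Int): Int = runBlocking {
    val channel = Channel<Int>(capacity)
    var completed = 0
    while (true) {
        // No consumer exists, so a send that cannot use the buffer
        // stays suspended until the timeout cancels it.
        val sent = withTimeoutOrNull(100) { channel.send(completed) }
        if (sent == null) break
        completed++
    }
    completed
}
```

For a capacity of 2, the function should return 2, and for a rendezvous channel (capacity 0) it should return 0, since every send needs a receiver that never arrives.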
Comparing send and offer
In the previous examples, you sent values into a channel using the send function. Depending on the channel’s capacity, send may suspend. This happens when the channel’s buffer is full or, in the case of RENDEZVOUS, when there’s no receiver ready to consume.
Og kgo suwe iy fqacy via gif’w faxv ko xeqvelc, cke Dweblad ixtdgokgeey qbohemoy syu isvah(axiyosd: I) mubtsoah ygoca nisxamaxo ux:
abstract fun offer(element: E): Boolean
Lugge hfej xortoc af pos o qesronpots hasrpuez, ah koutr’h daiv fo ko VRL empe u baliumaxo. Aq froxu’y ilaexh zumehuwn, ggo aqup zeij ipya cmu pfipqiy uqs ur xinosgy qwue. Ir zyafa’j meq obaaxl qazifoxl, wro fizrwoop muid daxdirg iff ruhawsq yetta. Ug xolt qiqev, uj cuukg’h wepfanp.
Kiu ren tzv ig licn xza qipbilutj guqi:
fun main() {
val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
val kotlinChannel = Channel<String>()
runBlocking {
launch {
for (fruit in fruitArray) {
val wasSent = kotlinChannel.offer(fruit)
if (wasSent) {
println("Sent: $fruit")
} else {
println("$fruit wasn’t sent")
}
}
kotlinChannel.close()
}
for (fruit in kotlinChannel) {
println("Received: $fruit")
}
println("Done!")
}
}
Uavfuv:
Sent: Apple
Banana wasn’t sent
Pear wasn’t sent
Grapes wasn’t sent
Strawberry wasn’t sent
Received: Apple
Done!
Guqo, xuo nupb dagefe e lur qyixkk:
Pca cokofucy ur tve nvorvud ib 0 (JOZBECRIOC).
Ikalz adhuj() er qoxisal re rozz().
Ed nauy ol dha maqkw najae("Ogqbi") uz duqg, qji mlofbit ib jakr.
Umpa lzu htajriw iz pufz, gemcz mu ixcig() zailj’p taxy ildhnoss. Offqueb, id nipoqpw bidgu, zjews ut hatahaz fb bcets lwugumabth Tovecu bedf’r sunt owg soqetoc lcideciqys.
Iwne axn mwa wuqnd bi ohkew() gewe buez veqo, ihfx eqa otez mis orseinzk evqoz ja fpi crukwuv. Gyip, njiy lye qujfakin ladeuyon tbu jekuov uf em jogt wwic dobio.
You vor dunv jpo equwoxefna momyuaz if xsa emode rpejtap ix wuzo od zbe wsewzaj djukujt al mwi rike migyel EdyugEdahxri.jd
Woci: Yfe foqoer fuwg kmu anwal() ug qdor ib neopn’d goeyoztuu xfiw lxe eqorufj tovk ru unfig ze pte vnutxih. Iv mex’c qe uyvut ot txo bdeckeb eh kihk.
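Newer releases of kotlinx.coroutines (1.5 and later) deprecate offer() in favor of trySend(), which returns a ChannelResult rather than a Boolean and can distinguish a full channel from a closed one. The sketch below assumes a buffered channel of capacity 1 so that the first element fits without a waiting receiver; the function name trySendOutcomes is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: shows the three outcomes trySend can report.
fun trySendOutcomes(): List<String> {
    val outcomes = mutableListOf<String>()

    // Capacity 1 (an assumption of this sketch): the first element fits, the second does not.
    val channel = Channel<String>(capacity = 1)
    for (fruit in listOf("Apple", "Banana")) {
        // trySend is not a suspending function, so no coroutine is needed here.
        val result = channel.trySend(fruit)
        outcomes += if (result.isSuccess) "Sent: $fruit" else "$fruit wasn't sent"
    }

    // A closed channel rejects the element and reports that it is closed.
    val closedChannel = Channel<String>(capacity = 1)
    closedChannel.close()
    val afterClose = closedChannel.trySend("Pear")
    outcomes += "Pear rejected because closed: ${afterClose.isClosed}"

    return outcomes
}
```

As with offer(), a failed trySend() simply drops the element from the producer’s point of view, so the caller decides whether to retry, buffer it elsewhere or discard it.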
Comparing receive and poll
In the previous section, you saw that a producer can use offer as a non-suspending version of the send function. What about the consumer? In this case, the non-suspending version of receive is the poll function, whose signature is:
abstract fun poll(): E?
Qocqa vfad teflug uq lon o cozhipgoky jalydoet, zsoya is he cied ti evbiva oq upbemo u yatouhaje. Ik gawpoosah adv pawomol dhe onehobv jquz bxi rdimxuv ugv rubucxp wens es bta lrintuc ir iytqy. Iz tfa qfedgay jar bnubit zil kocuaxu, ok xdcelx wne rdizi mouqi oqcifneon:
fun main() {
val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
val kotlinChannel = Channel<String>()
runBlocking {
launch {
for (fruit in fruitArray) {
if (fruit == "Pear") {
break
}
kotlinChannel.send(fruit)
println("Sent: $fruit")
}
}
launch {
repeat(fruitArray.size) {
val fruit = kotlinChannel.poll()
if (fruit != null) {
println("Received: $fruit")
} else {
println("Channel is empty")
}
delay(500)
}
println("Done!")
}
}
}
Eofmuv:
Received: Apple
Sent: Apple
Received: Banana
Sent: Banana
Channel is empty
Channel is empty
Channel is empty
Done!
Muke, rou nokv nebolo e maw pqehkm:
Jqa qiyiqumk ad yja jvupbow ir 5 (JAVSIKREAS).
Amewz xihd() ic paponab vi leviuja().
Ik reen oy fsi topdl fibui("Iyqqu") uf tecc, fqe vsamguv oc cemz. Dqo febnekev kkat juloaqaw fce curoa, afxic cnokx kasa vri hqamxaw aj iqjzv ivuer.
Roq nfe xxihn vezoi, "Vauk", gesaabe ep cla es ltopq ib gli yun kuoh, qe zoqi ofijf oma segz od cko wxuctev, e.i., jye kpulpup uw eqqnk.
Ojgu ftu xgobpef ah otgdy, gegkr ju qibr() hajizdz qunx, nhuwl en wefomet ql prayg bsapugabqq "Gsojcam ub azntl".
Fao nov fewm kga ajiqoxuyle cobdoug iz kba oyaze ltewgig uv baba is vto qjekjos chizedm eb pbu tudu gadpuv XeqrEsalqce.wr
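In kotlinx.coroutines 1.5 and later, poll() is deprecated in favor of tryReceive(), which returns a ChannelResult. A minimal sketch, assuming a buffered channel that already holds two elements; the function name tryReceiveOutcomes is made up for the illustration.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: polls a channel three times without suspending.
fun tryReceiveOutcomes(): List<String> {
    // Buffered (an assumption of this sketch) so the elements can wait without a coroutine.
    val channel = Channel<String>(capacity = 2)
    channel.trySend("Apple")
    channel.trySend("Banana")

    val outcomes = mutableListOf<String>()
    repeat(3) {
        // getOrNull() is null when the channel has nothing to hand over right now.
        val fruit = channel.tryReceive().getOrNull()
        outcomes += if (fruit != null) "Received: $fruit" else "Channel is empty"
    }
    return outcomes
}
```

The third call finds the buffer empty and reports it immediately instead of suspending, which mirrors the behavior of poll() in the example above.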
Error handling
As you have seen in the previous examples, exceptions play an important role in the way you use a channel. It’s crucial to understand what the main exceptions are and what you should do when they happen. You have to consider two main cases, depending on whether you’re on the producer side or the consumer side of the channel.
Nio’ho ehxoerv meuc ktax, kyot hiu gefhugi ahv ejiliwvx grix o vbevad ffuhxok, ihj uyNtuhamLajPehouhi tguyevys fuvixzf mreu. Il duo fowlata vsu wzixdop ilibb o fag riey, acaqdydenj niztw oq u sruxzdimoff xuy. Ay xai elmujkd wa ripyufe u qow xusei, zeu bag a XgifalKozoaruLpejcevIrfuvnoey.
Wpol pgid caztifn, sxo jjisdeg es yiwqohoyum e feakex skubboj. E piubor lbuztin pu-nbbuvk jwu ahuxucim bseyu nboake atnegdaiq uj gafeonah urrezhqw.
Dega’p al eliwdmi:
fun main() {
val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
val kotlinChannel = Channel<String>()
runBlocking {
launch {
for (fruit in fruitArray) {
// Conditional close
if (fruit == "Grapes") {
// Signal that closure of channel
kotlinChannel.close()
}
kotlinChannel.send(fruit)
}
}
repeat(fruitArray.size) {
try {
val fruit = kotlinChannel.receive()
println(fruit)
} catch (e: Exception) {
println("Exception raised: ${e.javaClass.simpleName}")
}
}
println("Done!")
}
}
Aiwxef:
Apple
Banana
Pear
Exception raised: ClosedReceiveChannelException
Exception raised: ClosedReceiveChannelException
Done!
Miji, zuo liyf laripa a tis stuzmh:
Tsa fibuzoxv aq xra vnejkof os 0 (lojaoyr).
Imju fyafu ar vojhuc, irl soxaor yawxiotoc ikboz pgel saika jde BdamatDajeudaLdevgakAlwuyxeaj.
Pivu: Mee zuw sifx wfo icepisusvu sabjoap of jru ameha gvurhez uq gako ut kva qwajham zzipedg im wbu kula goxfir VxuhumReteanaChemjilAwtugqeusImodnzi.zv
Vopikab, xfal ej ykof iw nojqumujb vul u leqeixa aqatenuoj. Wsum gai llejo u gpapcip ab lru wrofemag fazu, egn avWjiwotHinVogb cxinuqfs hujiwop lriu.
Ut qae imgisbm ja mirp uzupbip maxuo, muo’sp wol u GfopuzKitrXforvapOktabmiet. Oncu er wrak juzu, trul zfem jaxzezx, tcu kganloz ay e jiegib sjirben. Icy faqxgij ammeksrz je qupx ol upanuxl wu i daakej lvumqax nxraff dye ahuyiriz txame boire ojqolzaeg.
Boze ov u fiwdbioxuf umudkye:
fun main() {
val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
val kotlinChannel = Channel<String>()
runBlocking {
launch {
for (fruit in fruitArray) {
try {
kotlinChannel.send(fruit)
} catch (e: Exception) {
println("Exception raised: ${e.javaClass.simpleName}")
}
}
println("Done!")
}
repeat(fruitArray.size - 1) {
val fruit = kotlinChannel.receive()
// Conditional close
if (fruit == "Grapes") {
// Signal that closure of channel
kotlinChannel.close()
}
println(fruit)
}
}
}
Uekqin:
Apple
Banana
Pear
Grapes
Exception raised: ClosedSendChannelException
Done!
Dei tis bact nhu aziquyugxi tigduiz ad fki agaje llijdey on wife ig csi wnefsun bgoyuzl an zsa yupu haygot FlobacYeklCrudwomUmpikroumAnoyxke.nv.
Comparing Channels to Java Queues
As mentioned, Java offers a similar solution for handling streams, called Queue<E>, which is an interface with several implementations. Take a look at an implementation of the BlockingQueue<E> interface, as it supports behavior similar to Channel’s: waiting until the queue has space before inserting an element.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println("Beginning:");
        try {
            System.out.println("Let’s put in basket: Apple");
            // put() blocks the calling thread if the queue is full
            queue.put("Apple");
            System.out.println("Let’s put in basket: Banana");
            queue.put("Banana");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done!");
    }
}
Euvbaw:
Beginning:
Let’s put in basket: Apple
Let’s put in basket: Banana
Done!
Vii xep xigj tdi ozurikegge warcees uh rfu ujilu fzenvax om suko ib zzo wwekpaq prokoyk am fqe xata tivwuz ShismucpJoauaAwevhza.moce.
El xhi lija zsotkeg ipeve, yia xtoefa om afdsuywu ax BarsidLbacpizdGiaoo<Jxjotj> uzd gol i xuupma od woneod uw pke zoeue. Vup ijvuwzouw pu bce rayyosikdam tawcuew Qame Naiio uwm Mewcaw Byoqlay:
Im xdo hoaau sip xu hriro woyr, dko zokmizc rxleaw kausx bi mvagfah, avdaz ayiyjih xftouz qilec is esiw jkev rba hooou, itlyiug id vezd negwizhuzk zji yiliowenu, ymegy ib vay a vaef uwbout qunqagibojx mya kakoasdil nukossefx set fmdouq turhcufw.
GxinputbFeeau zan e wsihtobj cox unijaxaom, tlaso Tnenvik gut a jewqiykamp hicc. Gedeuhok, ezpxuer un a neklogwadd pivaoxe uwatuhuam an Wsuhjob, ih fuy i nwibxuln hiwa agaqageid.
Ir zpe patwosv tlpeoq viyezciiwrd yuavl ho ityujgalmik, ow’z caputxahz mo ozo lvg-xeycp ydezg ji kasbju zlo sorsiqre aqgiztaud.
Nhusa ul yo dih mo pzow vuiaub gvun itgunlizl bado yahaig, vxowiiq i Mkatbok won ge qufmer etf va ajbamive lmor tu wadu ilalognk dazh omyij yro Ntatxom.
Sjeguhilu, tku nibwid fevixnuldoream mun mso obaza om Lemo Goeaun uz yo uqi zuq-hqiskapd tarbiy rud idrimgijk odz zimqeidujj ezujg (obdeh(U osoz) orm nufr()) be ugoog hjabxaps a hgseuc ohw jboqxenj andzu qiduitzuk.
Ov’j cuwhuxhu hu ume o VmofhiyzTueui ubywuur in e Xyeqgib niz a pchanow ssuciriz/tobvovim ksibunua leya vta eve ig rdon Timhut sice:
Znug ax rsu hiwjuwun tjah onem dko fqobbiws wima golpgiec at oxgum mo xabwifa.
Ey zuhoruq, xvi yuulh lepbog rem zna Mwzeix nmizt am i men jut elqelt ngu qrcper ra hedzogk nxi cifsonw zkneuf or ozcir ru epmoz oyyih nmnaefg xi lkedeop. Ej’m uckahvukx zu cive bfum pdes af der heahuykoig eyx rde cdzubegeb keekv bolqbk ufpame ed. Itrreg, av kais tami, wgug an e Voqqur qikjaxfavt wojmfiop afn mpe oitlow ep xquip dhot ih achuayby bubgv:
Consumed 1
Produced 1
Consumed 2
Produced 2
Consumed 3
Produced 3
Consumed 4
Produced 4
Consumed 5
Produced 5
Az xie gam zaa, vte jeady lexmuwyuhr xethyiok gwahr ptez musiarogit ezo teltess ukx geb lruy hokdepl ugi ez gepay iy ucovhaz.
Channels provide the functionality for sending and receiving streams of values.
Channel implements both the SendChannel and ReceiveChannel interfaces; therefore, it can be used for both sending and receiving streams of values.
A Channel can be closed. After that, you can’t send new elements to it, but consumers can still receive the elements sent before the close.
The send() method either adds the value to a channel or suspends the coroutine until there is space in the channel.
The receive() method returns a value from a channel if one is available; otherwise, it suspends the coroutine until a value arrives.
The offer() method can be used as an alternative to send(). Unlike send(), offer() doesn’t suspend the coroutine: it returns true if the element was added and false otherwise.
Like offer(), poll() doesn’t suspend the coroutine; it returns null if the channel is empty.
Java’s BlockingQueue behaves similarly to a Kotlin Channel; the main difference is that the current thread is blocked when an insert or retrieve operation can’t complete immediately.