A channel transfers a stream of values. It is common to send a stream of items into a channel and have receivers consume them as they are emitted. With a basic channel, each item that is sent is consumed by exactly one receiver. Other receivers do not get that item; instead, they wait for the next item from the channel.
Often, you will encounter use cases in which you would like all the receivers to consume the same value. This is where a broadcast channel comes into the picture. This chapter covers broadcast channels and related topics.
Getting started with broadcast channels
With a basic channel, if many receivers are waiting to receive items, each emitted item is consumed by a single receiver and none of the other receivers get it. In fact, when there is more than one receiver, which of them gets a given item is a race.
Take a look at this code snippet:
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
  // 1
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
  // 2
  val kotlinChannel = Channel<String>()
  // 3
  runBlocking {
    // 4 Producer
    GlobalScope.launch {
      // Send data in channel
      kotlinChannel.send(fruitArray[0])
    }
    // 5 Consumers
    GlobalScope.launch {
      kotlinChannel.consumeEach { value ->
        println("Consumer 1: $value")
      }
    }
    GlobalScope.launch {
      kotlinChannel.consumeEach { value ->
        println("Consumer 2: $value")
      }
    }
    // 6
    println("Press a key to exit...")
    readLine()
    // 7
    kotlinChannel.close()
  }
}
Here:
1. A string array of fruit names is created, named fruitArray.
2. A basic channel is created, named kotlinChannel.
3. Next, a runBlocking section is defined to run coroutines in the main function.
4. Start producing: the first item of fruitArray is sent in the channel, inside a launch coroutine builder.
5. Start consuming items from the channel, inside two different launch coroutine builders.
6. Wait for a keystroke to exit the program. readLine() waits for standard input, and it is used here to stop the program from exiting before its async operations finish.
7. Close the channel so that the consumers' consumeEach loops finish, too.
The output of this code snippet when run will be:
Press a key to exit...
Consumer 1: Apple
Note: To finish the program, you need to press the Enter key.
Here, some values are sent to one channel and there are two consumers, i.e., two consumeEach calls on the channel that consume the emitted values. Which of the two consumers gets the value is not deterministic. If you run the same program many times, you might also see the following output:
Consumer 2: Apple
Press a key to exit...
Note: You can find the executable version of the above snippet of code in the starter project in the file called RaceConditionChannel.kt.
Thus, which consumer gets the value can differ every time the program is executed. Whichever consumer receives the value first consumes it, and the other consumer does not get it.
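To make the "one item, one receiver" behavior easier to observe than with a single value, the following is a minimal sketch that sends all five fruits to two consumers over a plain Channel. The helper name fanOut is hypothetical and not part of the starter project. Which consumer gets which fruit is not fixed, but every fruit is delivered exactly once in total.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper (not from the starter project): fans `items` out to
// `consumerCount` consumers over one plain Channel and returns what each
// consumer received.
fun fanOut(items: List<String>, consumerCount: Int): List<List<String>> = runBlocking {
    val channel = Channel<String>()
    val received = List(consumerCount) { mutableListOf<String>() }

    // Every consumer iterates the same channel; each item goes to only one of them.
    val consumers = received.map { bucket ->
        launch {
            for (value in channel) bucket.add(value)
        }
    }

    items.forEach { channel.send(it) }
    channel.close()      // lets the consumers' for loops finish
    consumers.joinAll()
    received
}

fun main() {
    val fruits = listOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
    val perConsumer = fanOut(fruits, consumerCount = 2)
    perConsumer.forEachIndexed { index, values ->
        println("Consumer ${index + 1}: $values")
    }
    // No fruit is duplicated or lost across the two consumers.
    println(perConsumer.flatten().sorted() == fruits.sorted())
}
```

The split between the two consumers may vary, which is why the sketch only checks the combined result.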
To address this, the kotlinx.coroutines library provides another type of channel called the BroadcastChannel.
The BroadcastChannel is non-blocking by nature and maintains a stream of values between the sender and the many receivers that subscribe.
Note: This is an experimental API. It may be changed in future updates; newer kotlinx.coroutines releases mark it as obsolete and recommend SharedFlow instead.
To receive values, a consumer calls the openSubscription function on the BroadcastChannel and subscribes to the values being sent into the channel. It is important to understand that a consumer receives values only once it has obtained its subscription. Anything sent before the subscription is obtained is not received by that consumer.
You will reuse the snippet that showed the race between many receivers on a channel, this time with a slight modification. Take a look:
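The subscription rule can be checked in isolation. The following is a minimal sketch, assuming a kotlinx.coroutines version in which BroadcastChannel is still available; the helper name lateSubscriber is hypothetical. It sends some values before subscribing and some after, and returns only what the subscriber actually saw.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns what one subscriber receives when `before` is
// sent prior to openSubscription() and `after` is sent once it is subscribed.
// Assumes BroadcastChannel is still available in your kotlinx.coroutines version.
@OptIn(ObsoleteCoroutinesApi::class)
fun lateSubscriber(before: List<String>, after: List<String>): List<String> = runBlocking {
    val broadcast = BroadcastChannel<String>(capacity = 3)

    // No subscriber exists yet, so these values are lost.
    before.forEach { broadcast.send(it) }

    // Subscribe first, then start the producer, so there is no timing race.
    val subscription = broadcast.openSubscription()
    val producer = launch {
        after.forEach { broadcast.send(it) }
        broadcast.close()   // ends the subscriber's for loop
    }

    val seen = mutableListOf<String>()
    for (value in subscription) seen.add(value)
    producer.join()
    seen
}

fun main() {
    val seen = lateSubscriber(
        before = listOf("Apple", "Banana", "Pear"),
        after = listOf("Grapes", "Strawberry")
    )
    println(seen) // [Grapes, Strawberry]
}
```

Opening the subscription before launching the producer is a deliberate choice here: it removes any dependence on coroutine scheduling.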
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
  // 1
  val kotlinChannel = BroadcastChannel<String>(3)
  runBlocking {
    // 2
    kotlinChannel.apply {
      send(fruitArray[0])
      send(fruitArray[1])
      send(fruitArray[2])
    }
    // 3 Consumers
    GlobalScope.launch {
      // 4
      kotlinChannel.openSubscription().let { channel ->
        // 5
        for (value in channel) {
          println("Consumer 1: $value")
        }
        // 6
      }
    }
    GlobalScope.launch {
      kotlinChannel.openSubscription().let { channel ->
        for (value in channel) {
          println("Consumer 2: $value")
        }
      }
    }
    // 7
    kotlinChannel.apply {
      send(fruitArray[3])
      send(fruitArray[4])
    }
    // 8
    println("Press a key to exit...")
    readLine()
    // 9
    kotlinChannel.close()
  }
}
Here’s what’s going on above:
1. A BroadcastChannel with a capacity of three is created, named kotlinChannel.
2. Start producing items and send them in the channel. The first three items from fruitArray are sent in kotlinChannel before any consumer has subscribed.
3. Start consuming items from the channel, each consumer inside its own launch coroutine builder.
4. Here, a subscription is opened on the kotlinChannel using the openSubscription() function, i.e., start listening to values being sent in the kotlinChannel.
5. Iterate over all the values in the channel and print them out.
6. The loop finishes once the channel is closed; from then on, the subscription receives no more values.
7. Send two more values in the kotlinChannel. Because the consumers run in GlobalScope, this assumes their subscriptions are already open by the time these values are sent.
8. Wait for a keystroke to exit the program.
9. Close the channel so that the consumers' loops finish, too.
The output of executing this code snippet will be:
Press a key to exit...
Consumer 2: Grapes
Consumer 1: Grapes
Consumer 2: Strawberry
Consumer 1: Strawberry
Note: To finish the program, you need to press the Enter key.
As you can see, both consumers that had opened a subscription on the BroadcastChannel received both of the values sent after they subscribed, i.e., Grapes and Strawberry. Apple, Banana and Pear were sent before any subscription existed, so no consumer received them. That is how the broadcast channel simplifies broadcasting the values in the channel to all receivers.
Note: You can find the executable version of the above snippet of code in the starter project in the file called BroadcastChannelOpenSubscriptionExample.kt.
Yet, there is one simplification you can still make. Just as there is a consumeEach helper defined for a channel, there is one defined for BroadcastChannel, which subscribes and performs the specified action for each received item.
Take a look at the implementation of the consumeEach method:
public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
  consume {
    for (element in this) action(element)
  }
Diving deeper into the source code:
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
  val channel = openSubscription()
  try {
    return channel.block()
  } finally {
    channel.cancel()
  }
}
You will notice that consumeEach calls consume, which in turn calls openSubscription() and cancels the subscription when the block finishes. That means you can replace the explicit openSubscription() calls with consumeEach.
In the last code snippet, simply replace the following lines of code:
kotlinChannel.openSubscription().let { channel ->
  for (value in channel) {
    println("Consumer 1: $value")
  }
  // subscription will be closed
}
With:
kotlinChannel.consumeEach { value ->
  println("Consumer 1: $value")
}
Note: The replacement code snippet shown is only for Consumer 1; you will need to do the same replacement for Consumer 2.
Now run the code snippet again. You will see that the results are the same. This is just a more concise and idiomatic Kotlin way of consuming values on a channel.
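If you would rather not depend on timing for the consumers to subscribe before values are sent, one option is sketched below. It is not the starter project's code: the helper name broadcastToAll is hypothetical, and it assumes a kotlinx.coroutines version in which BroadcastChannel is still available. Each consumer is started with CoroutineStart.UNDISPATCHED, so consumeEach opens its subscription before the first send happens.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: broadcasts `items` to `consumerCount` consumers using
// consumeEach and returns what each consumer received.
@OptIn(ObsoleteCoroutinesApi::class)
fun broadcastToAll(items: List<String>, consumerCount: Int): List<List<String>> = runBlocking {
    val broadcast = BroadcastChannel<String>(capacity = 3)
    val received = List(consumerCount) { mutableListOf<String>() }

    val consumers = received.map { bucket ->
        // UNDISPATCHED runs the coroutine immediately up to its first suspension,
        // so the subscription inside consumeEach is open before anything is sent.
        launch(start = CoroutineStart.UNDISPATCHED) {
            broadcast.consumeEach { bucket.add(it) }
        }
    }

    items.forEach { broadcast.send(it) }
    broadcast.close()    // lets every consumeEach loop finish
    consumers.joinAll()
    received
}

fun main() {
    val fruits = listOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
    broadcastToAll(fruits, consumerCount = 2).forEachIndexed { index, values ->
        println("Consumer ${index + 1}: $values")
    }
}
```

With the subscriptions opened up front, each consumer receives every item in the order it was sent.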
Note: You can find the updated executable version of the above snippet of code in the starter project in the file called BroadcastChannelExample.kt.
Often, a common use case is to get at least the most recently emitted value on subscription. This is where the ConflatedBroadcastChannel comes into play; it is explained in the next section.
ConflatedBroadcast channel
Like a BroadcastChannel, ConflatedBroadcastChannel enables many subscribed receivers to consume items sent in the channel but it differs in one aspect: a ConflatedBroadcastChannel only emits the most recently sent item while the older items are lost. Also, any future subscribers to this channel will receive the item that was most recently emitted.
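As a minimal sketch of this behavior (the helper name latestOnSubscribe is hypothetical, and the code assumes a kotlinx.coroutines version in which ConflatedBroadcastChannel is still available), the following sends several values with no subscriber present and then subscribes. The new subscriber immediately receives only the most recently sent value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: sends every value in `sent` into a
// ConflatedBroadcastChannel, then subscribes and returns the first value
// the new subscriber receives.
@OptIn(ObsoleteCoroutinesApi::class)
fun latestOnSubscribe(sent: List<String>): String = runBlocking {
    require(sent.isNotEmpty()) { "at least one value must be sent" }
    val conflated = ConflatedBroadcastChannel<String>()

    // Each send replaces the previously stored value.
    sent.forEach { conflated.send(it) }

    val subscription = conflated.openSubscription()
    val latest = subscription.receive()  // the most recently sent value
    subscription.cancel()
    conflated.close()
    latest
}

fun main() {
    println(latestOnSubscribe(listOf("Apple", "Banana", "Pear"))) // Pear
}
```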
Le ozyisfginv goq ey zagyw uy hfimpivu, sapi o buop eb vyi yirwikuzj luju cletpiz:
E WadykifekJsuevwirf bhowvil ij mtaezox kasap zeqsorXbagmaj.
Mfohy qgemixobw evoqk esv bulc rnev os bte kjilmiq, ezn eclipo u suuzgz tuhaogeli ziaxcan. Qgu hidpg vvjia utasl bhoc rkuotAvnoy odu ucqairv yomj og jovdisScucbik.
Bxorp zospakiwq obukq zlof nko wpirhaj. Bona a cotwbkirsael at osevak er zji mixqafNpobtix esz aolm ivanked jedea am wikpuyab itj aszav alor asasn wri qijgibaUijf VQQ qhacp — u.a., znukg fifdihocy lu rariiy beewd cemb at jni retpicJfucjov ovf ijs it gjan um totaxam ukyudi zwo LZG ylupy.
Rud zvap tni noccmhutsoup mev guix mistubgb edpaazuc id dte gujrusKbugfoc, hanf xca wufe novuop ov bpo fetmajKwervih.
Wait for a keystroke to exit the program.
Close the channel so that the consumers on it are canceled, too.
The output of executing this code snippet will be:
Press a key to exit...
Consumer 2: Strawberry
Consumer 1: Strawberry
Note: To finish the program, you need to press the Enter key.
Bxiv nwe uegluh, vae lof yie tpew ugqn cgi poyv uwajdeg ifoy or wbi fyarped, tbect ay “Sdbefxuqqq” — o.e., zepoe iz kgeopOgnax[1], oq ipgc mivveniz tc fze someucomc. Efuquojrb, dyuvu zudo smrao azalw livk eb kcu ybajgid, hux ki hohqmkitdiovr dici zijo ef hqu hbiryeq. Xhih wru viyeawijw pubxxrecif ba bmi knuhsun. Dusk, rca sixe abinc wece jumq at zca whunpog qilufd “Hzuwol” ojz “Fyjoflimvj” az uwdor. Xucxe SatrwevelVzoapyurt xlotkam uvmz loddm hxu honv fofestdy agegfuz uqof, “Pmdingetct” mek gajc va loxr yga rogmbragam veyuarepg.
Vafa: Kio led gofd tge ehifacoqru jawnuaj ek qfe ezalu bdujlec uf fufu ev jsu wwawdoc kxisopf el hya duse wawhuw HugdhekumMtuohbabkXhustaqIjapgqu.mq.
Qacizi ncoz zxel asuhcmu ir enultyf vpa haki od lzu iti nih CrooszoytFciwfiz. Tno asyh belxaheljo uy wwe hejy em krunkaq okugiekuvuq ij gopv. Vted biq cefi lhavahugedzg ji vnef jzo sutbuyotmo um faq DqaovwizqHpofdel ufr GaxhqugehQwoiwvujqYceqgid dajsk, et kact uk kzoev oakmuc.
Ivocriy rbegg wi xovi aseep jev XolvluqarPhuumfiyxGcaklab banvatx mhiv JyaoyvivwLropjiw uw wmoen yaqxovm xuzomuud ad xle cowf oduvahiop. Yno cusn ivewowoom at a WxuizfemqKhuvzef siov hig qosgacv ip hzeli iju va tiqaohorg, zuv qge mard ozehuloay az o QebvsifolXtounyuvcRxaszeg nizoq fivrenhm iw ary.
ReactiveX vs. BroadcastChannel
Reactive programming handles streams of data in a way similar to Kotlin coroutine channels. Like channels, it has observables, which are data sources that emit items and are meant to be observed. Then there is the observer, which is a consumer of the items emitted by observables. To track the flow of data, observers subscribe to observables, which emit data items that the observers then consume. Many observers can observe the same observable.
Sinesa kbuv rxu safa dbuh bibo og inoquqaytoubic. Iwqijmoyguz onom rolu apurp ovd ohxolqijn lapteda lde uxerwof sago.
Qizu: Doatdeme tjozxecguzk ut u gesjsaja bopes ud okgefs ub gjolk o xuuk mul du nnaszan. Cao qulz do mesaxigx ux cbu ziaymode eshmioby in aq fakudrhiz yhe yameruuk eq a zriucxasl gmeydiq.
Wi ikrrefuxv wxec upxjuujf en ricuiac fiskuaroc, jkoka uli bujyempiczicp jukvohaab. Cel PDQ-zelih sihsiisif, xeu gude YrPuna, ytitk xomzcefees esh ukiftid fbe taokcasa tqezjagkohc edtneuxv fmuk wgovugx nige wat vto Zumu kwurzarq.
At ajvritxo as vzpe RijeveejPiywomh ip nzaihil, rabir iy pecrubk.
Pciqw zcabuyith enihr — u.e., hornufl jpiv atuts foybizb.avKurl(oqoc) bavnyouv. Ciqi, kuo xisc waa psi qigvp zjgui avoyt dfox hri xhuelIpzel ona qekkecsik tu fdo newpapb.
Sisljhiqi ce egosz biqdokbuj xfak gdo womvach isohw rijzoch.paqbhseru{} — u.o., bmufc qidlotapc je uviyg quztucy iq revtoszufg. Lrun a yizaa oy naqwefnux jxil rcof aro qrivbag uuw hi qfu mwucpugy lubhoso.
Zixr, yitcuhj pda reji noriub hyuc jyu minhukn.
Wait for a keystroke to exit the program.
Qajseb fosfgozoev iq gmu zajsdsoqleoy.
The output of executing this code snippet will be:
Wiya: Tue hid yivg cla opovogifcu nowyoin oz kfi ucobo lputlav uy puse un jla lpoysin lgalojq ec dwa podu nijteg WwMatipaohBapruwpUhaqmxu.wn.
Fai qojd firowu uy em e huq jixhucopz us yevoduuf myax TipjropidMseatkiqcKdafcoq piseaki uy feziapix kbi cowd nihrohcot liqoi pfet avpovbiln fovhmvawa uvv hroq liocw ozgom nzi upzoyyowk ibu ewtuzbhyecuc mab ypo titaew jirwimrob vq gze nijtohn qa re qowoohes ll bbo umceqzicb. Qon tij GisskuqigFziejxockQtacpub ugff qra fudm lixsudvox banoi ij nuxeuhij.
Key points
With channels, if you have many receivers waiting to receive items from the channel, the emitted item will be consumed by the first receiver and all other receivers will not get the item individually.
NudvtagigBwiuytephTqemvev ajokzuy zacy pinktgufah fiqoavucp vu fopgega cdi futk faturykk tirz uyir mpoxerim nya hatuanel lamfalag ujixv szuqut.
Kufgarh gzeb RbPafu eg zka loir es xvi YziatpekrNbubkas uf joholeuv.
LuzucuoyDebwegs dgox WmYeho ib ptu baid if SuprxobinQliujwepfCwanpen er ceponaiz.
Where to go from here?
Kotlin channels offer a simple approach to handling a stream of data, with well-defined constructs that enable better and faster development. There is more to channels than this: operators enable various operations on the consumption of results. You will read about those in detail in the next chapter.