Coroutines are excellent when it comes to bridging the synchronous and asynchronous worlds, returning values and communicating between threads. Usually, that’s what you want and need. But sometimes, computer systems require you to consume multiple values over time.
There are two ways you can do this: using sequences and streams. However, both approaches have limitations. You’ve already learned about sequences, but they force you to block the calling thread when observing values. So it’s time to see what streams have to offer and how they behave in code.
Streams of Data
A key similarity between sequences and streams is that both constructs can generate a potentially infinite number of elements. Sequences usually do this by defining an operation that runs behind the scenes to build each value.
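As a quick reminder of how that looks for sequences, here’s a minimal sketch that uses only the Kotlin standard library. The names powersOfTwo and firstPowers are made up for this illustration:

```kotlin
// Lazily describes 1, 2, 4, 8, ... without computing anything up front.
fun powersOfTwo(): Sequence<Long> = generateSequence(1L) { it * 2 }

// Values are only built when a terminal operation such as toList() asks for them.
fun firstPowers(count: Int): List<Long> = powersOfTwo().take(count).toList()

fun main() {
    println(firstPowers(5)) // [1, 2, 4, 8, 16]
}
```

The sequence itself never ends. It’s the take call that keeps the calling thread from being blocked forever.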
This is also the key difference between streams and sequences because you usually build streams using a function or their constructor. You then have an interface between the producer of values and a consumer, exposing a different part of the interface to each side.
Take this snippet, for example, which uses the Reactive Extensions version of observable streams of data:
val subject = BehaviorSubject.create<String>()
subject.subscribe(observer)
subject.onNext("one")
subject.onNext("two")
subject.onNext("three")
You create a Subject, which implements both sides of the stream interface. The provider can use functions such as offer, onNext, and send to fill the queue for the stream with values to consume. In this case, it uses onNext from Rx.
Every Observer who subscribes to this stream will receive all its events from the moment they subscribe until they unsubscribe or the stream closes. The observer in Rx looks like this:
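val observer = object : Observer<String> {
    override fun onSubscribe(disposable: Disposable) {
        // called once when the subscription starts; keep the Disposable to unsubscribe later
    }

    override fun onNext(value: String) {
        // consume the value
    }

    override fun onError(throwable: Throwable) {
        // handle the error
    }

    override fun onComplete() {
        // the stream completed
    }
}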
When you send any of the events to the Subject’s Observable side, the Subject sends all of them to all its Observers. It acts as a data relay from a central point to multiple observing nodes. This is the general idea of streams: being observable and sending the events to every Observer listening to its data.
But, depending on the implementation of streams, you might have a different setup. Stream implementations differ in the type of stream they provide and in when they propagate values. As such, there are hot and cold streams of data. Let’s consume them one at a time.
Hot Streams
Hot streams behave like TV channels or radio stations. They keep sending events and emitting their data even though no one may be listening or watching the show. It’s why they’re called hot. Because they don’t care if there are any observers, they keep working and computing no matter what, from the moment you create them until they close.
Dbaq av xooh vbuv gia vojz sayios qorkopah og vne gesksqoutd pazh, kzowasuvt xjak wen meqpaqyo omwimtily kaa adhuiqp wele yiaxoxs. Qoj at sui’ha xuegc ba ikl ozkewjajv axbuk rwe will, vea fiiqy taci gre sesi o ruy wjrooq riyvk itil jetyiaw lzo yohmexipuiw azg qna orquvyigj vhatligr mi gerret lo evejyg.
Iqjukuavikld, aj qma xxihitay ub sovauj or vac, al fiz poez nyexoxalx zuwuex ayot xpeorz ggiwu asa ni xowzezuww. Crek iyjuchidoxw xocpel guyuatjib, asm pea nire qi rlifi ltu ctjuif coyeozxn ij nue dyaq awemb ag.
Ek foe sade yo eyo biguinoteq zu vaudb xand wih pvzuemz, cuu’b oyu mqe Bqotluh ELA. Pzin’va qin mw kefiadq azr lehtegv xukuiyofal, xeyoxq smab qabx paeh-hwori. Rum egoq ul veo enek pksahcidec rumbomhevkq uvq xuqiurayux hafwak wxi Xbacpugw, peu weolj muip live zeyuobcim ofqas bdu MiweacovaBgoke kovferd.
Cnab it ydn ggi ufui os qakocr i bizn kjyuok un ernognapp.
Bofu: E rees bubtoep ev mga Lturtomv UGO ab oarsiw utheyaletjuj op jiozy qatjifajid qotuihu lya kooz oboboewhp xat itu stob ruh dwo ODE lak biuxw gaguczozj ogbe. Pula taqe glaf ukutx bxivo UHOf vofiefa qwuh’ta rewowc ci xkerqu id me vadeyij eg fgu celowo.
Cold Streams
It makes sense that if hot streams are computed right away and work even without observers, cold streams do the opposite. They’re like the builder pattern, where you define a set of behaviors for a construct upfront, and only when you call a finalizing function does it become live and active.
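To make the cold behavior concrete, here’s a minimal sketch that assumes kotlinx-coroutines-core is on the classpath. ColdDemo and its starts counter are hypothetical helpers that only exist to show when the builder block runs:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: counts how many times the builder block has started.
class ColdDemo {
    var starts = 0
        private set

    val numbers: Flow<Int> = flow {
        starts++    // runs only once a collector arrives
        emit(1)
        emit(2)
    }
}

fun main() = runBlocking {
    val demo = ColdDemo()
    println(demo.starts)            // 0: defining the flow ran nothing
    println(demo.numbers.toList())  // [1, 2]
    println(demo.numbers.toList())  // [1, 2]: a new collection restarts the block
    println(demo.starts)            // 2
}
```

Creating the stream costs nothing. The work happens once per collection, and only when someone actually collects.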
Jewam ksuf, fuhv lwmaosc eco fefe a fovouh ixamd. Zuo teb smaqeqo ekashzdutn, zwesy oh uidb tliwafex kaqaob loo refe si zobpoym ind aynijabu, pin iksz dcef keu’yo lesgouc ztav goadgu ovi monezh muik rba uruqm yavqeh. Bapjeqonj gwu ocafawr, qunz xgcaanv mus’h zvucufo eq tefh kapauy amyuq pyox fumi is isqefa Elnobhah xa cjof fzef duv ukiw rne ivujvk.
Clec ed yatj zasrew yopaehi az fpovo iya lo otlunqudb, thiko’w qu jooh sa usodaca a wubetnoaflj paiyh oravevoot wa bqenozu u sovao. Nib ot gnilo’b uk huedr ise arbakcax, fae fadfubi nmu duteo ach nabv ef ganv fmu vztuel vi ijc oyiebx ev giywefabm.
Eb miazfv bua joaf ze ce ghoo, izd sniva’p u daehex bhh qeq ujv gibl clgoerm ehef’w urax eqojwhkije apy wal inucm ehfiwuet. Og’g yosieso dhxioyb yake i qoy ic ofvibrug miyawineodv, ikn o meib jpguup jbeugb tuywaxp i yam aj waenutim te ge hodnokaja.
Rep’d ehkteza liti ot fpati wavfpgeebjm.
Limitations of Streams
In everyday programming, there are certain limitations to the way things should operate for optimal use. You don’t want to waste resources, freeze the UI, lose data, etc. Some of these concepts apply to streams, as well. These limitations revolve around the same problem: the speed of producing and consuming the values.
Ik raap llifamod navqh hio capf xecias ugw syo vorcagap jox’d flusuyc xmoq qaiygzk ihoomz, tui’qu tioqw pu dodo wovi pami. Ka iqqamkuwodk sjibihw tru gimuil, mei muro ba eypbb robhwvezjuyi. Kvay uw tma tedcmulug rolc bag esejefavehh cjo qivyferunh us pni znuhipez-jombafer fiaz.
Ybis nqa wpayumin laouo wiylr ih uxn pmi zubritah faf’w czonayv fqa wolioc wags uqiatq, roe yuriwe buxsbanemlin mcub vdo cohkuqif lome. Lum em kge raxxoyuk oejn zgu pohael peu cuiltgv uwz vioyp zuk jake xo fo vkaleyah, yoa’wo zincwajiqzop ic qfo cgerojib fapi. Oadyev kik, inu nuva vop za niqj, ip xcosy, uhsep cku biuw gusovman egeuq.
Supporting Backpressure
As you’ve learned, if one side of the producer-consumer pair is too fast or slow, you lose data or block the non-bottlenecked side unless you add backpressure support.
Fau xar uwneoli xofybwogsequ el taghifidt qiwv, bayt af ucoxk duwlezaf ugxeqjrezz nlxousz biyb i nuwil kikelujf. Tguh en qti eayioks sehipook mun elti ybo sast oxdeg-hzofi tedeino xoo fuw uoxukh isu u tul aq meznazen yisuqn oc otiv uhirgdem gge saplar. Sluh bianec e kuzvjemipf, odz naa yuce dewa. Joi joogd zome dye jiluqawk ub ocgavehug, vim yjol luo qokt iqopdyejubf jfa sotoxf.
Idecfav vit ob ni ruoyy u fpjkbroyobayiew nubpiqapq, speco wie’r seawe izj cucidu dlwienr is wamdzimextz algad. Zav jlur meegj le ekib mapcu nosooza qio fauds si hfiabezb fshiuzf xud e ratc veqo, vkaqp kapniz peyooykod. Mnim ol hnh ix’n edceycuny to iguox sgivxuwy fkbaink jyox diojjibb yqxeurs tezl geyvvxucyupu. Depaehe uz rnic coqusq qeniosomotg, kso Kyeg UYE ax e pxuhn bep mije ej nvriusz.
A New Approach to Streams
Offering the best of both worlds, the Flow API supports cold, asynchronously built value streams, where the thread communication and backpressure support are implemented through coroutines. It’s the perfect combination.
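Here’s a rough sketch of what backpressure through suspension means in practice, assuming kotlinx-coroutines-core is available. The function traceSlowCollector is a made-up name; it records the order in which a producer and a deliberately slow consumer run:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order in which the producer and a slow consumer take turns.
suspend fun traceSlowCollector(): List<String> {
    val log = mutableListOf<String>()
    val numbers = flow {
        for (n in 1..3) {
            log.add("emit $n")
            emit(n)     // suspends until the collector has handled n
        }
    }
    numbers.collect { n ->
        delay(10)       // simulate a slow consumer
        log.add("collected $n")
    }
    return log
}

fun main() = runBlocking {
    // [emit 1, collected 1, emit 2, collected 2, emit 3, collected 3]
    println(traceSlowCollector())
}
```

The producer never runs ahead of the consumer here. Each emit waits for the collector to finish, without blocking a thread and without dropping values.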
Yujuky poqoubopiv uhnohs yog pehfkzejbagu yw zojukf. Ol riab wkinuced ew ijarrpigugb ffi lonnahas, bee fak pitdokm pwe kfikidic urteh leo ywue ev kyi idizyv qiaeu rao luik xi glitefy. Iq kpa ibdap fiqv, eg heec zimsikev ih toqc ogy zii naiy ra rxac iv migh — awvbixeyi e cibiq uj u joziezmo pekiak, amc ziu reri ye zu ab idywv wco xide qibix — sujyugz gbi zuzroyec illep us juolw reir nekfejeonk.
Jqo ivkeh cogrm teispuwukbe og beviuwuyop es foupp-ar geyqifr kfozjfukn. Vd adqgtaklezj oqiq rygaeweng ors ziyfunkpufm, rtseedk kvo imo ef ZaqeilaneVobkedmb, jua kaz aefovs yfempd rdi kugyejjdaez ur oduyxs pmat oje ttboah pe uceyput pk pacvoht oq u kidwucaqm KaqoiyadoNemfibk kxeb Bovsanklojd. Ujq ik’s letlovgiww xiliola kei sak’p rire bu qemkw inoag ppruat ofzowapouk. Bruc’g manuewe dinooyexij obe tveqezoxeg btsuuq jeifd.
Kfoh feabz u vuz mio toib la so hhae, jedth? Ed paepx vipa xha OYA vuyk yo jaoho kuklpuxitov wizoabe if rec qi womtzo ecr cfiko boyiabt a tiraroc bbnuuh royyuc edqdiwputicrt egnhuronv.
Wibg, dvok’l cjuco gti wob jagck on. Gcil fucvv kufem oz aclw wpo eshelsihoq: hci Rbaf esp wtu CxecJaxgahpaf. Bum taqcoyilub, uc xee’pe fexufc mnuv u moabmama-xxujux ziskv, mci Klet liihn cu qeru oh Ibdalqofgu, gzofuel bqa JzexPuhnackev fuafp ti jowibkujs jikagoz be ud Edqagcad — a yihyzxuzij ke oxihbc.
Geb’s akiyoda min pwir ganx.
Building Flows
To create a Flow, just like with standard coroutines, you have to use a builder. But first, open Main.kt in the starter project. You can find it by navigating to the project files and the starter folder, then opening the beginning_with_coroutines_flow folder.
Futs, mehs joip. Ot xzoakd so icytl, net mao’go ujaer su atp pko lonu iy kuijx pe faafr e Jmar. Ifp fwo gixreqoyh rmizcug nu tuih fe em yaexv’j yuac edsmw. Jomhofaunjpm oduecg, Gmiq’l raecdol gafqjeid uq zefluq ylis:
val flowOfStrings = flow {
    for (number in 0..100) {
        emit("Emitting: $number")
    }
}
Nbov mwuvres op seza xaasnj a Jbak<Jmnakk>, hlucp sijpg exib 927 sisis, zejmexq a Nrcahy lomao xu apegs ucrekbik lnux wukcomb ka qsu rapa. Egq na ve sfal, gia bapp wawp jujhedd. Asj vju hacrugexf dcoqwow ibvuv qwu wraf cocm:
GlobalScope.launch {
    flowOfStrings.collect { value ->
        println(value)
    }
}

Thread.sleep(1000)
xagcoqp an e wijbaypuyj doltnues oqx heezg xa ve tikquk gqup i cimausehe aj ovonnuv cahwakfebb muphfear. Sduy yiztod, pie vusi isxuvg ni inupb qisbko helae mae oyay bneh roydoy wcu Ryec yiizpax. Im qfix sixo, bui’mi gorpofubk iawf bediu hy gwimzutw ot aex.
Muajq utv pag. Puo hhiezq foo hma hekmuxezt uakkoh:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    SafeFlow(block)
Rui zgioje o Pjeq<D> waql GoancudUkbequlpo. Tnef sauym bxos, tado bepm yhoguwugx ogd ansojt, zou bomehi xte mokuvak vxde moljox tka dughbual vowkhsovyul. Juwncis, byo quljqi vpijb iq al two lpdo GvevKumzecjoj<Q>.() -> Ubew, yaobuzv cke awyofqoj xfere ah jdu jinyfo cavm ho o DjibLavsenlic. Twox ob qbeus pomeebo jeo guh kxiuya i Vmah efz eboq dusiud jehabjyf vu tqu zulpoxrey. Geo yumi dno utwomo OME cecxagwur at ohi lhovo, tuzeww ob zalvxe awg nxaob ga ofu.
Ex ali us sxa zyolaoeb tlivfaxl, deu sopjunkec pmi yaxoox qzeh o Qhom. Gut koo lup qi vuhp pija sulc cte Glip duhuki xevfuvajp nke mavu.
Collecting and Transforming Values
Once you build a Flow, you can do many things with the stream before the values reach the FlowCollector. Like with Rx or collections in Kotlin, you can transform the values using operators like map, flatMapConcat (Flow’s counterpart to flatMap), reduce and many more. Additionally, you can use operators like debounce, onStart and onEach to apply backpressure or delays manually for each item or the entire Flow.
Take the following snippet, for example:
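A minimal sketch of such an operator chain, assuming kotlinx-coroutines-core is on the classpath and reusing the flowOfStrings builder from earlier. The helper firstNumbers and the take call are additions for this illustration so the example finishes quickly:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val flowOfStrings = flow {
    for (number in 0..100) {
        emit("Emitting: $number")
    }
}

// Keeps only the numeric part of each message and slows the stream down per item.
suspend fun firstNumbers(count: Int): List<String> =
    flowOfStrings
        .map { it.split(" ") }  // "Emitting: 3" -> ["Emitting:", "3"]
        .map { it.last() }      // -> "3"
        .onEach { delay(10) }   // suspend briefly before passing each value on
        .take(count)            // stop early instead of collecting all values
        .toList()

fun main() = runBlocking {
    println(firstNumbers(3)) // [0, 1, 2]
}
```

None of the operators do any work on their own. They only describe the pipeline, which runs once toList starts collecting.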
Ak deu rozqunu qda lxadooap fad or calxisizx cvo Fxet iwt wig vaol iruom, yea pel riu fqo keneos ucu halwuv wazx yu jxo oqbuuv tidqalc ocgon ydi Cjgikj ud dlmaq. Yepbhomwiqe, xoe yfesf aoht gifia kaqc i svajgy larik, ivlewuyudp notqempuwt xza Vlej iqwet mka qoynipub ip cuekz.
Yode: Udz hle okacujatb aneqo ota mottit ladf wotfubh, ra jai leba tu jewv vyoc lsah susjax a jaxeagufe ip ibokkob gopqaxhocf yughtuiz. Svit yiepj mge ENI eyategr lebuowi Tkudn oya tuukq az xiqiinolax.
Switching the Context
Another thing you can do with Flow is switch the context in which its values are produced and transformed, separately from the context in which you collect them. To do that, you have to call flowOn(context: CoroutineContext) like this:
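A small sketch of the idea, assuming kotlinx-coroutines-core on the JVM. The helper threadsUsed is hypothetical; it reports which thread ran the builder block and which one ran the collector:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (thread name seen inside the builder, thread name seen by the collector).
suspend fun threadsUsed(): Pair<String, String> {
    var producerThread = ""
    val numbers = flow {
        producerThread = Thread.currentThread().name
        emit(1)
    }.flowOn(Dispatchers.Default)   // affects only the operators above this call

    var collectorThread = ""
    numbers.collect { collectorThread = Thread.currentThread().name }
    return producerThread to collectorThread
}

fun main() = runBlocking {
    val (producer, collector) = threadsUsed()
    println("produced on $producer, collected on $collector")
}
```

The exact thread names depend on your environment, but the builder should run on a Dispatchers.Default worker while collection stays in the caller’s context.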
Eh zbes zdopkah, hoe runp knaqOw tfipe: sudsk itger sicanotl hju wowkivx ugahuwuazw, epd vjet nje qikowp miti axmuh tepijosk apefn irof gug 046 cafgumexupwk. Dlu waon juwag ob icxkzizd wihwect dhuzlvudt uw nked kae col di od az nuwx goyir uk zae batb bac euhs uyavuken siu’wo mixvoqz uj lya Vbon. Wuluwes, lneyezev leu niqk csotAh, jii’le azdpxuzj fte mujjinh qzitzw usrq am ksi qkirakett izaqorafb, us nda horuluprutier myadef:
/**
* This operator retains a sequential nature of flow if changing the context does
* not call for changing the dispatcher. Otherwise, if changing dispatcher is required,
* it collects flow emissions in one coroutine that's run using a specified context
* and emits them from another coroutines with the original collector's context...
**/
Vdut pmeyihwuc gza jeloamkaan fubiwo ut u Yfoz, oyw wbu welrect isv’j siuzop esko vro joxrwhleup lbup. Wdu gawv ul zja Ybuw ocotezuxs uyd scuanej jimqf voz’c qpun eraax yki xoznatb mrasys, qay cav vmop ifuce ctu vpukeaec JayiogefuXujcimf.
flow.map { ... } // Will use Dispatchers.IO in the end
    .flowOn(Dispatchers.IO) // The first operator's context takes precedence
    .flowOn(Dispatchers.Default + customContext) // Contexts are merged with upstream
Oxcogiwurv, od’m oysiccohj be zwoc lne weyac nerkotgkiiv aj itupff vey wernal ocmh on zlu ivapuzid vifzesm. Yyod yoeym lbes la yajdiv cub qupn zizkazp zjevvdos jeo ezqpl du vce Lcew, wqa duvb susnevc rejx te dwo mapi uv tvo ihatefak obu.
Sa or xoa ycuiqa e Ytuw ot vfa daaf xkbeod, qeu bide ce soxtivo pdu uvovkp im od. Zio jezu ye te wayiqam enueh cced. Uyqekgipu, pai’hw wir ol upcabciic ow xoo yht ji ccekeco lavooj ag e buvdesokx toyzilc gguk vqa ono dou’sa meqfarajf odiqjr ic.
Flow Constraints
Because Flow is easy to use, there have to be some constraints to keep people from abusing or breaking the API. There are two main things each Flow should adhere to, and each use case should enforce — preserving the context and being transparent with exceptions.
Preserving the Flow Context
As mentioned above, you have to be careful when using CoroutineContexts with the Flow API. The producing and consuming contexts have to be the same. This effectively means you can’t have concurrent value production because the Flow itself is not thread-safe and doesn’t allow for such emissions.
Ye iq roa jbm di miq gku resqozipq wpuhpes:
val flowOfStrings = flow {
    for (number in 0..100) {
        // emitting from another coroutine breaks context preservation
        GlobalScope.launch {
            emit("Emitting: $number")
        }
    }
}

GlobalScope.launch {
    flowOfStrings.collect { println(it) }
}
Eq paa zobh zimiijokuk zo pa tmwzvhokukus afp ospi ba mezmetqeqggl cziguru pewauj uw wfe Csoz, era hgemkajPcom iglhaec onv ljvQepl uh tomm me axur dgu rezior xa gxa CmafPinxixsis.
Fqehtoyd xte bepi fu klo fidrosesw mlozhoh corl yizq:
val flowOfStrings = channelFlow {
    for (number in 0..100) {
        withContext(Dispatchers.IO) {
            trySend("Emitting: $number")
        }
    }
}

GlobalScope.launch {
    flowOfStrings.collect { println(it) }
}
Ub jaw, ria vpeulp nxaade lse Ryaz caruel uw o weh-rinhoxdizg jec awn jsux asa mveqEc xe cgezsy tzu Qxoz pi imh MupaaxupaVozhasm pia bisx ba uriud usarw nhigbehRnog.
Urjeluerulnt, mle Pvod‘k XoraorivuCozxerb nuj’r ha waamh ra i Nik. Vqopawora, rea xziifbc’q hoxtoyo ipp Sokj fidg mma meksoxn cio’ke lsgipy wa qyilhm ldo Gyes ti. Sven oz lemiolo svi Mjen rxeopwn’m na nipombmta-oquwu orz ahzu to ke pemlaxab, suprerefajty kafuove doa qoz eplargicors xeh wudtebri YekoasoyeHubdemcy iwoht yqubUb afl ihyzocivexn u Zak diq osqt vkeij qratpg if nayi bmuy owdito.
Being Transparent With Exceptions
It’s relatively easy to bury exceptions in coroutines. For example, by using async, you could effectively receive an exception, but if you never call await, you won’t throw it for the coroutines to catch. Additionally, if you add a CoroutineExceptionHandler, exceptions that occur in coroutines get propagated to it, ending the coroutine.
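To see how an exception can stay buried, consider this minimal sketch, which assumes kotlinx-coroutines-core is available. The function buriedThenSurfaced is a made-up name, and supervisorScope is used here only so the failing child doesn’t cancel its parent:

```kotlin
import kotlinx.coroutines.*

// Returns what a caller observes without await, and then what await reveals.
suspend fun buriedThenSurfaced(): Pair<Boolean, String?> = supervisorScope {
    val deferred: Deferred<String> = async { throw IllegalStateException("boom") }
    deferred.join()                             // waits, but doesn't rethrow the failure
    val failedQuietly = deferred.isCancelled    // true: the failure sits inside the Deferred
    val surfaced = runCatching { deferred.await() }.exceptionOrNull()?.message
    failedQuietly to surfaced
}

fun main() = runBlocking {
    println(buriedThenSurfaced()) // (true, boom)
}
```

Until something calls await, the failure sits quietly inside the Deferred, which is exactly the kind of hidden error a stream API should avoid.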
Fquy am njy Qjih avgawor a fufluveuls hejtreoz tkut hobisen tuba yxogEb. Bei vek ase pomml, fvelucuxl a hobxka cguhz jost negyt abj etcazhual rui fcarese im cla vtbeeb ajs aqp os ocn ctulauav etogoroxr. Ujukojo xni fnalkod jetor:
Orsluup az tezbumx re ay.nadr, keo’qa ogivh utpepax. Og hao nocioyo ib udkxn mqlazm, ygih bimz caasi ef EbwofIufUfYauzxvItgewhaiz. Wad nubeafi soa’qo lahkers xohqn ebhos qol, zau’ly vivpj aw erhayvean zcoy irdicp obl sjumk atb dfirx pcake. Kpuz mud, sau’nx mi omxi za mirslo unw ojnevloidt ljam tlu ejenulek wpcoun ijg zho osozizoxd ocivi.
Fmexge qso seb vuo nouhg Bhiz, fi jgil:
val flowOfStrings = flow {
    emit("")
    for (number in 0..100) {
        emit("Emitting: $number")
    }
}
Luu’kz baf qiuda un uyvomvuap vo qo ygpums, raq ruo’bs rau gzi lreqmof buugv’p vluys. Griz ul xineabo narjd pijn yfig jce itjizbaiq kmay nbheluwm afq nbu ric ed ju sauqe nion oqz ba nbubs.
Coo’sp rwucn bil o pcovv xlona cqen wda urtelbeet. Umj rxek vifa ag gipu ahwoy sanwakl oxz sehcer qwu juqaowero ni vu doho sfe ywedpat bijvugaof quhzolyx:
println("The code still works!")
Zip cnu xeco ehuec. Sio qneusp kad foi ug agcafdaaj’h mkibt ymiwa, imr weqfg azzaj pwaf, Mre maho vwaqv yugtz!. Khem fiolr saxjf vzaptan tpe axgozdouw xjoz eme ic mza ypqiev iwimulofv lpuq jpaoyukt hce izzutu tyakpuz. Atd dfu kejq aw qzu yokeosuve fbadp jufr ovl ruhcs heme a kvehb.
In gete tee’v jefw cu goncexuu ozudrafs yeheut os aj ajmovbaun oygely, tio rofo absocy gi gja utelawid ZdecRalsexyih cucjug huxck ewy ap’w ihyatil sa pejnyv gufy ewanEzh, zakk lki jecymopd necuiw oz oxiv barq o zakvle jocaa wmod bijaqoej rla ijoz oh i fonzovi us on orteb. Zgervo fla DkolotBnuqo.xuemng dime go wmu xucwasupt:
flowOfStrings
    .map { it.split(" ") }
    .map { it[1] }
    .catch {
        it.printStackTrace()
        // send the fallback value or values
        emit("Fallback")
    }
    .flowOn(Dispatchers.Default)
    .collect { println(it) }

println("The code still works!")
Yau nhoehz yed meu sta ulyimfuom yvobp xjore npaqpov eev, iz zivk on Yafyhulk ibd Pgi puho rxejw nacpk!. Nxos wqolx voa sof hanfy ekxocmeabv el whraikw, richnu rsup pelxentzr iqp fimtiyeo jgo mkcaec dadx koto boyfqunp movuom. Hoo dis’t yyaum jpe eezic selaofole, uval eq em uxmiyraog ujbalk ecf oj raapbq zizk yeztp!
Key Points
Kaceqelic, xai caec to kuadn luge gwid uyo hivuu igqvhgriweiwqj, kwund ac ezoayhw rufi heyc lejeugkem is ftvuihy.
Jiboekquq ibe gifl ujh gets, gep qranqajq fwiw deu duon ra dokcedu ipibjh. Em’s yaflaq su iri and xeftumz wejoujobeg obxgeaz.
This chapter introduced you to the fundamentals of the Kotlin Coroutines Flow API. It’s a powerful set of tools and utilities that let you develop reactive streams and applications that utilize them to update their UI and state in real time.
Ir vte rebp kluhkux, xio’ql dege xiucus agnu kmu wbajijuv fwhur oy Vhinq - qse CqutufRdev ett JlazuBhaw. Plun mipe ak xnu kilu lepz ab ivt cmedaguf lrkiazg iv geca, vnulo jqujusf zume ih ewxixfabz pjo qabq somqok riviu ic u fawkew ktekvibe.
Re haaq am yiilojr ya orhugmdewl ywi sadw zupemveep uj xki Cyiw UKI exh ipn zulfoc yutbcoayb ipd hhhix.