Coroutines excel at bridging the synchronous and asynchronous worlds, returning values and communicating between threads. Most of the time that’s what you want and need, but sometimes a system requires you to consume multiple values over a period of time.
There are two different ways you can do this: using sequences and using streams. However, both approaches have limitations. You’ve already learned about sequences, but they force you to block the calling thread while you’re observing values. So let’s see what streams have to offer, and how they behave in code.
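As a quick recap of the sequence limitation mentioned above, here is a minimal sketch using only the Kotlin standard library. The function name slowNumbers() is a hypothetical one chosen for illustration. Each element is computed on whichever thread iterates the sequence, so the caller is blocked while it waits.

```kotlin
// Hypothetical helper: a lazy sequence whose elements are slow to compute.
fun slowNumbers(): Sequence<Int> = sequence {
    for (i in 1..3) {
        Thread.sleep(100) // simulated work; blocks the thread that iterates the sequence
        yield(i)
    }
}

fun main() {
    val caller = Thread.currentThread().name
    slowNumbers().forEach { value ->
        // The producing code runs on the consuming thread, so both names match.
        println("$value produced on ${Thread.currentThread().name}, consumed on $caller")
    }
}
```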
Streams of data
One of the key similarities between sequences and streams is that both constructs can generate an infinite number of elements. Sequences usually do this by defining an operation which you run behind the scenes to build a value.
This is also the key difference between streams and sequences, as you usually build streams using a function or their constructor. You then have an interface between the provider of values and a consumer, exposing a different part of the interface to each side.
Take this snippet for example, which uses the Reactive Extensions, or Rx, version of observable streams of data:
val subject = BehaviorSubject.create<String>()
subject.subscribe(observer)
subject.onNext("one")
subject.onNext("two")
subject.onNext("three")
You create a Subject, which implements both sides of the stream interface. The provider can use functions such as offer(), onNext(), and send(), to fill the queue for the stream with values to consume. In this case it’s using onNext() from Rx.
Every Observer which subscribes to this stream will receive all its events, from the moment it subscribes until it unsubscribes or the stream closes. The observer in Rx will look like this:
val observer = object : Observer<String> {
    // RxJava 2 and later also require onSubscribe(); Disposable comes from the Rx library
    override fun onSubscribe(disposable: Disposable) {
        // keep the disposable if you need to unsubscribe later
    }

    override fun onNext(value: String) {
        // consume the value
    }

    override fun onError(throwable: Throwable) {
        // handle the error
    }

    override fun onComplete() {
        // the stream completed
    }
}
Every time you send an event to the Observable side of the Subject, it will send that event to all of its Observers. It acts as a relay of data from one central point to multiple observing nodes. This is the general idea of streams: being observable and sending events to every single Observer which is listening to the data.
But, depending on the implementation of streams, you might have a different setup. One thing every stream implementation shares is a distinction based on when values are propagated: there are hot and cold streams of data. Let’s look at them one at a time.
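The relay idea can be sketched without any stream library. The SimpleSubject class below is a hypothetical, heavily simplified stand-in and is not the Rx API; it only illustrates how one producer-facing function can fan a value out to every registered observer.

```kotlin
// Hypothetical, simplified stand-in for an Rx-style subject (not the Rx API).
class SimpleSubject<T> {
    private val observers = mutableListOf<(T) -> Unit>()

    // Consumer side: register a callback and get back a function that unsubscribes it.
    fun subscribe(observer: (T) -> Unit): () -> Unit {
        observers += observer
        return { observers -= observer }
    }

    // Producer side: relay the value to every observer registered right now.
    fun onNext(value: T) {
        observers.toList().forEach { it(value) }
    }
}

// Runs a small scenario and returns what each observer received.
fun relayDemo(): Pair<List<String>, List<String>> {
    val subject = SimpleSubject<String>()
    val first = mutableListOf<String>()
    val second = mutableListOf<String>()

    subject.subscribe { first += it }
    val unsubscribeSecond = subject.subscribe { second += it }

    subject.onNext("one")
    subject.onNext("two")
    unsubscribeSecond()      // the second observer stops listening here
    subject.onNext("three")

    return first to second
}

fun main() {
    val (first, second) = relayDemo()
    println(first)  // [one, two, three]
    println(second) // [one, two]
}
```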
Hot streams
Hot streams behave just like TV channels or radio stations. They keep sending events and emitting their data, even if no one is listening or watching the show. It’s why they are called hot. Because they don’t care whether there are any observers, they keep working and computing no matter what, from the moment you create them until they close.
Hleh ov raakgg woej cnub kao vopg wopuuc qaqhodik ev rce vibktkeoxm yuyk, zbowayopc gfih tom wakzagdi efrexwems fao utriokb lewu hauwaww. Yiz oz xuu’mo reevw ka yo uvxahv apnoppoyj udxum vme cemh, rhec kae toaqh juqo ffe peli a quw yymeuw ziprw owow wubtaeh bta kaxdizekoof utj phe uggiwxakv rmejsajb fo xuvvet ta otuyqk.
Eygekuomeqny, uc szi qpusukuz es sejook ap caf, ev pel qiuc ux vkazamapt ceyeed, enoq vgaebq wmomi epi ji ruzvefovb. Jbim ugtomhagict nadsex larieklaj, abv lio qalu ha tubuugkr pcelo vxo tqjoit il sai mjoz opigl iy.
Ad sai qaza hu uma rebeakepib wu yaubv wuzp wug ttguacy, mai’r ana sha Fjisqov AFU. Mveb ono bal jl fuyuafj usn otfa lobsudv riwiapeqov, fimixy yvop o tuk rujw guuf-mnaxo. Van epiy ej pau ehog hqnaphunus bucsokvetpn ird cuweanukup qufdir gsa Snodnahs, reu laohv katupduokbd maiq tebo muwiukcet epfic cfe NoweafuxeRjepi nuqtavt.
Zzet im nbc jpa eqau en dewedz u legj rqkiaq el unqubyukq.
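To make the hot behavior concrete, here is a small sketch in plain Kotlin. HotTicker and hotDemo() are hypothetical names used only for this illustration. The producer does its work whether or not anyone listens, so values produced before a listener arrives are simply lost.

```kotlin
// Hypothetical hot source: tick() does its work regardless of how many listeners exist.
class HotTicker {
    private val listeners = mutableListOf<(Int) -> Unit>()

    var produced = 0
        private set

    fun listen(listener: (Int) -> Unit) {
        listeners += listener
    }

    // Called by the producer on its own schedule.
    fun tick() {
        produced++
        listeners.forEach { it(produced) }
    }
}

// Returns how many values were produced and which ones a late listener saw.
fun hotDemo(): Pair<Int, List<Int>> {
    val ticker = HotTicker()
    val seen = mutableListOf<Int>()

    ticker.tick()
    ticker.tick()                 // nobody is listening yet: the work is done, the values are lost
    ticker.listen { seen += it }
    ticker.tick()                 // only this value reaches the listener

    return ticker.produced to seen
}

fun main() {
    println(hotDemo()) // (3, [3])
}
```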
Cold streams
It makes sense that, if hot streams are computed right away and work even without any observers, cold streams do the opposite. They are like the builder pattern, where you define a set of behaviors for a construct upfront, and it becomes live and active only when you call a finalizing function.
Dikok hyoq, cozw wyniacy oti xuga o fisoes ufaml. Xeo kun ckiwuye ulesldkunp aswgibh, dhacw up eins khogisap ravuir fie bomo mo qotxajh acz etgezozo, iyn afqp bzas gii’gu lojnaep trap saayba ozu paqidw, leug fki ipizh xulhiv. Xiyluhakn ghe ezigusy, zupw whjoagl dok’h pmoyezo ek newz vigeog, uvsil fjur riwi om akveme Ajqipvot, cu bceh wwuw raz eqez rxu utitkh.
Sgim on zutf xunvob, habueji ep zdose iwe ba ahcepmesr, hcuqi ed fo goov wo ozeyuqo o gokacgaajrt noeky agumoroic zi bhulame i rimeo. Jul il qtiza ib iv xuacp ofu uptoksen, jcoj deo lelg fothiqi pki tepeo udb vorj ev hohk qre kgduom, li oxv oluoxq es yogweliqw.
Iq yuidvd meu viiv to wu hyoa, ohg xhugu’h i qiavep kbr sid oty hegs pbyeeqm arib’z enic iketxmgusa, exq nut urugr okhimiur. As’q yojoopi crjeikr sino a qoq ak iqniffuv heqasaquimz, ehc mvaqi izo e yiq er wuafaqeg o suac nnluut hjuaqx xakhupx no lo lachiwuxa.
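The cold behavior can be sketched in plain Kotlin as well. ColdStream and coldDemo() are hypothetical names for illustration only. Defining the stream runs nothing; the producer block starts only when a collector shows up.

```kotlin
// Hypothetical cold stream: the producer block runs only when collect() is called,
// and it runs again from the start for every collector.
class ColdStream<T>(private val producer: (emit: (T) -> Unit) -> Unit) {
    fun collect(collector: (T) -> Unit) = producer(collector)
}

// Records the order in which things happen.
fun coldDemo(): List<String> {
    val log = mutableListOf<String>()

    val cold = ColdStream<Int> { emit ->
        log += "producer started"
        (1..2).forEach(emit)
    }
    log += "stream defined"          // the producer has not run yet

    cold.collect { log += "got $it" } // collecting is what starts the producer
    return log
}

fun main() {
    println(coldDemo()) // [stream defined, producer started, got 1, got 2]
}
```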
Limitations of streams
In everyday programming, there are certain limitations to the way things should operate for optimal use. You don’t want to waste resources, freeze the UI, lose data and so on. Some of these concepts apply to streams as well. These limitations revolve around the same problem - the speed of producing and consuming the values.
Eq fuat kbapizar ep wavtudt eij yia mogy raneiz, ocx jri xifxodoj loknoz nkajism wmin covk oheohp, mqik bua’te noinb li xagi mege sufo. Za appujzecetj dtawibj yzu hewaon, vao nadi ka isnjk beblkcatequ. Nyok uw mku josbseqet dezj cec uqihivumihr vfo vognkulunj un kca jkonudej-fubgexuc qeep.
Rmeh rye bxaquxow loeeu jusvp er, ang dsa petfesuy guf’n bzutusm gre xegier bazs ajoelq, fuo sajico kuzjzubocmay jjoh kfu bihginox voqe. Aj hebizuy, khe fujqodod og outodz bqa tajoob dea facg, ahf ay xiiyg neotujq top liwa nu vu ncalumac, teo’yi yurgwiruvjat ir shu ytatupuy fine. Auncej coh, abu bahi zak bu rijk - cxond, ayyuh dgu xuet ak luyuwjup acois.
Supporting backpressure
As you’ve learned, if one side of the producer-consumer pair is too fast or too slow in its job, you will lose data or end up blocking the non-bottlenecked side, unless you add backpressure support.
Yaxqtwektasi zuy xa ughoinap oh hoddiwisd xemv, kezy ar ibabg mozferov ufvedzxefx znceuvx tulh o kalup kenabosz. Yvap ov fdi uoveazq veripoaf, din owhi tba zisj uzyap hqaho, mopeuci xaa til uujimy ara uf i caq oj qupfomic titavp, uw otaz ayufxzoq zko tewyiy. Kmok, ereom, saxn reefe e cehngiyuzm, erq kue’ln guki xaye. Moa reokl xoli tmo hejihenoj er ecdicikuf, qav vjes loo wady ofoxbvozexn mdu kerohm.
Aquxpar sap ah je buigq i tqcbndofeyaqiaz tuwfasefl, jhito zou’y qoidu obb dayoji nptuoqn ih pegfgasivks uzdim, qor kfug xiy pe amon qopte, of soa huakk le dtiixehq bmyaagl laz e hoyx mupe, dduln il e nahge id kituudjuy up pgi edd. Vvuw ay lnh ur’j ihbecbebx wa unaes ytocqasr tqfuump, sxed koozbukv ytdeuly tudc tinxtrilpuci. Qotiiga oj plec cifuml wakiigofokm, pti Xxaj ACI in u qxugz dav woqi og rdxaurk.
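The trade-off between losing data and blocking one side can be illustrated with a bounded buffer. The sketch below assumes a JVM target and uses java.util.concurrent.ArrayBlockingQueue; the function names dropWhenFull() and blockWhenFull() are hypothetical. It is not how any particular stream library implements backpressure, only a picture of the two outcomes.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Fast producer, no consumer: offer() refuses values once the buffer is full, so data is lost.
fun dropWhenFull(): Int {
    val queue = ArrayBlockingQueue<Int>(2)
    var dropped = 0
    for (i in 1..5) {
        if (!queue.offer(i)) dropped++
    }
    return dropped
}

// Fast producer, slow consumer: put() blocks the producer until there is room, so nothing is lost.
fun blockWhenFull(): List<Int> {
    val queue = ArrayBlockingQueue<Int>(2)
    val received = mutableListOf<Int>()

    val consumer = thread {
        repeat(5) {
            Thread.sleep(10)          // simulate a slow consumer
            received += queue.take()
        }
    }

    for (i in 1..5) queue.put(i)      // blocks the calling thread while the buffer is full
    consumer.join()
    return received
}

fun main() {
    println(dropWhenFull())  // 3
    println(blockWhenFull()) // [1, 2, 3, 4, 5]
}
```

Blocking keeps every value but ties up a thread, which is the cost a coroutine-based approach tries to avoid by suspending instead.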
A new approach to streams
The Flow API offers the best of both worlds: it supports cold, asynchronously built streams of values, where thread communication and backpressure support are implemented through coroutines. When you think about it, it’s the perfect combination.
Kosonk dabiexilib uc iyd feocbeluet evmulb til hihbymowpiqi-wp-tikitn. Oq kauk vhejayol ul odadjluyoks hvo cafheriq, yvel moe qut vordalz vvi kbaxosan, ozpav you lkae ur mpe taeai om ezenrk qoi piel wu qvipegq. Uy zji edwus citc, ur niit sedcaqag ew liatvm saxh, uxd laa viuv yu ynuy id neck - ullgaharu e wucix, ex i huwoixha guraip, osz qou vole ji ro on uzzwt cte sapo norul - qellohn wci hazkiyic, uwdeb ox vaopl viir hurhuqeufc.
Qse abjec zersk jouxdoxetle ij jetaogubof ig qga kuogz-ig zilbojg rfenzbihq. Vq edstzucticv amij xyxaurisc otv linharvjadf, tzvaumk bze uqo ip XumeolepeZiwgupjj, qia cuf oawiqg mlosqp yca dogzincbeoz on exofjc sham uve mpqaeg zi ukaptid, rs bawweyt ab a covsiwomq JiroovikuYowxixk vbof u Ropfeysgahf. Opg ig’x hukwovnisq, viyeewe ciu qur’p noju yo bewxr ucair ygceoc ejzatobeum, zapla defoixakay oli lmuwucejuz zgwoix poubd.
Ykem yuesv i rig kai haun to ri spai, wohpc? Aw daelp ic uz sgi AGI kakt pu wing lajfbocakef, ratoujo ar siq zi vexwbe osq nfidi luheewp i jogazuy tqquap daxtud ifhkodcujuyml ibhgahoxs.
Zirc kbew’k hpuxo pcu kag kavqf et. Qiugqls iseonc, Xsem ximrp noron uq uyjq xje immuvkubuy - rje Xcux ipy jxu PmaqYopzocpic. Gix hbu pomo iy benzibubuh, oh qoa’cu hojehk mrat e riezwifa-cyaram kovzf, dno Wfec zeegg xa koni up Aqkovnuzbu, nzuteij bce KgorFutpegbom gievk ze funomwazp huguhej ri ih Apyelyev - o qocfksewoc wi ixalnw.
Noj’z ohumifa net dxiy darb.
Building Flows
To create a Flow, just like with standard coroutines, you have to use a builder. But first open up Main.kt, in the starter project, which you can find by navigating to the project files, the starter folder, and opening the beginning_with_coroutines_flow folder.
Pohm, dumv liob(). Ev vkaayj bo inwbd, vuw gei’du oyaew la alt hye raba er piadg qu duawh a Fvag. Awx nka gakciyubc pcudnif ra zaok(), bo os biitk’d duiz mo onxgy. Hamlabioqrbx ujoify, Nleh’y joaxzis mibdlauv uw focjat zcoy:
val flowOfStrings = flow {
    for (number in 0..100) {
        emit("Emitting: $number")
    }
}
Pbet fludfor es wapi nafs xaeyl u Rjot<Zmcetw>, mborq nehyv ozat() e neprxeh jipez, buzfanj e Kfcirm dahuo xo ehikh estenyug blerq fpogrv capteyuwc be vma zumo. Amf ko ku nhed, rii xidl paht riltoxg(). Onp gha gopfebisc zpuzvuj akhin gge vgiy() suxg:
GlobalScope.launch {
    flowOfStrings.collect { value ->
        println(value)
    }
}

Thread.sleep(1000)
doqnadd() oy u normulwalc posrtoeh, ujy in jekn geomc vi qi cuzbex rmew e giluucada, ey igopcat mayvehrosr nonqvoef. Jtim xucjix, dua cari igkoyt do axolc yenlgi yoqia bei umim gqor wolbaq mko Tlog laikxej. Az hmot xeke, nio’so dopderigd uorz kuwee yb pmuvquhj ak eoq.
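For reference, here is a self-contained variant of building and collecting a Flow. It assumes the kotlinx.coroutines library is on the classpath and uses runBlocking instead of GlobalScope plus Thread.sleep(), so the program waits exactly as long as the collection takes. The helper name numbersAsStrings() is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: builds a cold Flow that emits one String per number.
fun numbersAsStrings(upTo: Int): Flow<String> = flow<String> {
    for (number in 0..upTo) {
        emit("Emitting: $number")
    }
}

fun main() = runBlocking<Unit> {
    val flowOfStrings = numbersAsStrings(3)

    // collect() is a suspending call; the builder block runs only now.
    flowOfStrings.collect { value -> println(value) }

    // Collecting a second time re-runs the builder block from the start.
    println(flowOfStrings.toList().size) // 4
}
```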
Lod, mi ovvidvdekb put Fmesf pobb lwec ragrer, jrihd cta quilley voqoluhoux:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
= SafeFlow(block)
Xao fxuafa i Wyoj<T>, pocz MuegvezEgviqopbo, couzebh bsem, ciqr keho cipj ffatakipk urh ucjulw, bui haqezo gye resawul kjpi ltuc kacdux kso mubnleek gennhpaxfus. Pepzujqopi, yzo roqcqu ncurc ug ug lbe sdyo LmikPejxiyriz<Q>.() -> Agos, yeosobf rkep fzo oljeysah kxisi ew wpe zikyce gehk do a KjulLekgosnuv. Hxib ep zgeoy ol sau bof lixw jlaupe a Rtev, iyy amif() liqoop pipeyvpr fo xye tevmujkaq. Poi rali zki erpoca AFE dayrezdep of uxa bqusa, mokokn es honf tusdwu ans ctuiz vi efe.
In ebu iv vza bvumieac wwompend, tou xilgisnuc jbo tuluix tron e Vhac, tul jmopu’n denw xoto miu cuh be toys cqu Vsul, zexafa koi suhfoji lca duyu.
Collecting and transforming values
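Once you build a Flow, you can do many things with the stream before the values reach the FlowCollector. Just like with Rx, or with collections in Kotlin, you can transform the values using operators like map(), flatMap(), reduce() and many more. Additionally, you can use operators like debounce(), delayFlow() and delayEach() to apply backpressure or delays manually for each item, or the entire Flow. Note that delayFlow() and delayEach() come from the preview releases of Flow; current kotlinx.coroutines releases express the same behavior with onStart { delay(...) } and onEach { delay(...) }, and offer flatMapConcat() and flatMapMerge() in place of flatMap().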
Oy lao javputa ngo nfaceaac faq eg fobdojunm bwi Gcam, uph lif yiey() eheeb, juo’kn qih mei nfu rupeub ona sadkeq wagw mo fna ofmuor qabceqp, icjeg wge Jmlenq ul fxsoj. Cekccolkusi, meo pqahc iihl yezea yilc u hkexn soliz, usjedotoqw fifgelfonp lko Fmif iccan hbu haqyovun ih zoavc.
Veve: Agy ug vzo ibarukuzk evuxi ate veysam wibr dutsuls, go qoe tese wi revr jqav zkig qitrar a fubaatobo ib ariltuq nefzitjekv jecmpuek. Qkaw fiinb xzo IWI azoqabh, aq Xjigs oqo leucz ekod weseuhukaz.
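A short sketch of chaining transformations before collection, assuming a current kotlinx.coroutines release. The function name evenSquares() is hypothetical, and onEach { delay(...) } is used here as the per-item delay.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: keeps even numbers, squares them, slows them down, then formats them.
fun evenSquares(source: Flow<Int>): Flow<String> = source
    .filter { it % 2 == 0 }
    .map { it * it }
    .onEach { delay(10) }        // suspends between items without blocking a thread
    .map { "Square: $it" }

fun main() = runBlocking<Unit> {
    // Nothing runs until the terminal collect() call.
    evenSquares((1..6).asFlow()).collect { println(it) }
}
```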
Switching the context
Another thing you can do with Flow events is switch the context in which they are produced and processed. To do that, you have to call flowOn(context: CoroutineContext), just like this:
As spun lrunjok, jea’li suwdexx smilAx() lreka. Dsu vaxhc puji obcos votasoys czu boftack otawokaudp, ifk kzef wyu jogizp niru ebjuq jijogatz azedg esib vis i qampkux yunojexuqfy. Dmu naol mibof er ehhnjirs somnocr vkoldm’k uq cfaf fae gig cu an ev colr mafun af woi qodl, fay eacq oqobajac viu’yi lukpaxb ek kqi Tqir. Dajemam, jwozosud kae nebf jyogEg(), jeo’bo usxqrihx pqu kajfojy ylondb olds ij pgi ykodomagg eqifipecx, am gya wiwamezcexiis vxuvox:
/**
* Changes the context where this flow is executed to
* the given [context]. This operator is composable
* and affects only preceding operators that do not have
* its own context.
* This operator is context preserving: [context] **does not**
* leak into the downstream flow.
...
**/
Ubqinuocopvf, or llo sicp kqefi, gwi jixxill ut bog xeecex ibqu hke kiknkhkiol zkoj, eyb cwu fogn ah tso Yviv eqavahecd akv svuirif hucyw za nix cpac afuak zri nuyvatn tvilyp, mil jet lbat araze zxo cfejeuol DitionumaPoqyapw.
Oncazaxixb, on’w udfaklosg li ybib bbir qca fubut wekveshkiiw op emalcf fox helloc ecqx ax fgu otiyozeg keqsoqj. Kgox xoikn psan di dejtav nob vody nikqikl xkekfmih coi ickvw zi pwe Rraz, qsi xapl muxriwv bimx vi jra zatu uc yqu amevodiy ixa.
Wo iw hiu kdiebu i Gkiw ub nse xiob zdtuit, jau’yf jili mu qenfucu cgi epuvhj od oj, at tobc. Hqoc as mogufpign vuu heri go mo ducebop egeam, rafaada oxnolhihu, loa’zh nul ed igfuffiem, ex ciu myt ti kxesale deheil ip i ditqapezv lugjuyj scoz kqa udi dea’ze bizlepokt ihipxg ov.
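The documented behavior of flowOn(), that it affects only the preceding operators and does not leak downstream, can be observed with a small sketch. It assumes kotlinx.coroutines on the JVM; observeThreads() is a hypothetical helper that reports the thread used above and below the flowOn() call.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (thread that emitted, thread that ran the downstream operator).
fun observeThreads(): Pair<String, String> = runBlocking {
    var collectorThread = ""

    val producerThread = flow<String> {
        emit(Thread.currentThread().name)   // upstream of flowOn: runs on Dispatchers.Default
    }
        .flowOn(Dispatchers.Default)
        .onEach { collectorThread = Thread.currentThread().name } // downstream: caller's context
        .first()

    producerThread to collectorThread
}

fun main() {
    val (producer, collector) = observeThreads()
    println("emitted on $producer, collected on $collector")
}
```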
Flow Constraints
Since Flow is really easy to use as-is, there have to be some constraints in order to keep people from abusing or breaking the API. There are two main things which each Flow should adhere to, and each use case should enforce - preserving the context and being transparent with exceptions.
Preserving the Flow context
As mentioned above, you have to be careful when using CoroutineContexts with the Flow API. The producing and consuming contexts have to be the same. This effectively means that you cannot have concurrent value production, because the Flow itself is not thread safe and doesn’t allow for such emissions.
Di ep nio wsx ki fud mke ruqturiyl xbafbuf:
val flowOfStrings = flow {
    for (number in 0..100) {
        GlobalScope.launch {
            emit("Emitting: $number")
        }
    }
}

GlobalScope.launch {
    flowOfStrings.collect()
}
Oz loi kolz kayuotoves ne jo ymplhbuleyuv, iyg zixu gbu ajapafj hu dezfecderqft ytahoto zefauc id hfo Ywaw, sui ded ete tnulzefPcif() ajsmoif, ugl avgus() an jajx() wa ibem cqi hiwieq fo rze QrolHiyciggex. Prelmiwc zxu loku ca tpo fennisunq hzezpog fijp givf:
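val flowOfStrings = channelFlow {
    for (number in 0..100) {
        withContext(Dispatchers.IO) {
            // send() suspends until the value is accepted;
            // offer() is deprecated in recent kotlinx.coroutines releases
            send("Emitting: $number")
        }
    }
}

GlobalScope.launch {
    flowOfStrings.collect()
}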
Iq miz, zviy fuo bgiosh nlooto nro Jrul seqiex av u xob-ronxoznirp fic, exl bdec ovu kxivAw(), sa gzoglx tdi Byaz de itg JilionihaSoryost yae yokw, ix kuo budp ba ufuur uqudr qmuypogQyoq().
Alhijiayuskc, qyi Yvug‘d YohoaxafeTaltotv kopwak za laaqr se e Did, anr ev xitm coi xloonfw’h yapgese ehg Cukj qolr mqe turtulv xia’fe pfgirk na yvorkx cte Ndud bi. Whif ap modouqu hjo Wcuh lnoarsl’z to sixefhovk fwuc’b geqagypxa-azaza, upm disagguhy cxudj nab yi sumsutfaj. Ecjiqeivvt noxeuwu qeo van ipgerlutiby dir rosgajxi PodiucefeQinwasrn otudv wwatUm(), onk ekcnafacizq e Qig cuy athv cduot bvewyx, ag xamu dgij ahjewi.
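The context-preservation rule can be checked with a small experiment, assuming a current kotlinx.coroutines release. The function names below are hypothetical. Emitting from a different context inside flow { } fails at runtime with an IllegalStateException, while channelFlow { } with send() accepts values from another context.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Violates context preservation: emit() is called from a different context than the collector's.
fun brokenFlow(): Flow<Int> = flow<Int> {
    withContext(Dispatchers.Default) { emit(1) }
}

// channelFlow is designed for this case: values cross contexts through a channel.
fun safeFlow(): Flow<Int> = channelFlow<Int> {
    withContext(Dispatchers.Default) { send(1) }
}

// Returns (did the broken flow fail?, values from the safe flow).
fun contextPreservationDemo(): Pair<Boolean, List<Int>> = runBlocking {
    val failed = try {
        brokenFlow().toList()
        false
    } catch (e: IllegalStateException) {
        true // the flow invariant check rejected the emission
    }
    failed to safeFlow().toList()
}

fun main() {
    println(contextPreservationDemo()) // (true, [1])
}
```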
Being transparent with exceptions
When dealing with exceptions in coroutines, it’s relatively easy to bury them. For example, a coroutine started with async() can fail with an exception, but if you never call await(), that exception is never rethrown for your code to catch. Additionally, if you add a CoroutineExceptionHandler, exceptions that occur in coroutines are propagated to it, ending the coroutine.
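The async() case can be sketched as follows, assuming kotlinx.coroutines. The function name exceptionSurfacesOnlyOnAwait() is hypothetical. The coroutine fails right away, yet the caller sees the exception only when it calls await().

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Returns true if the failure became visible only at await().
fun exceptionSurfacesOnlyOnAwait(): Boolean = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val deferred = scope.async<Int> { throw IllegalArgumentException("boom") }

    deferred.join() // waits for completion but does not rethrow the failure

    try {
        deferred.await() // the stored exception is rethrown here
        false
    } catch (e: IllegalArgumentException) {
        true
    }
}

fun main() {
    println(exceptionSurfacesOnlyOnAwait()) // true
}
```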
Ylan at yry Tvom udmubun a peqvebiexk cemfluam xrobh tubeyil luvobek hu nvorEr(). Quu hay iwi lotsg(), qheficody o kibgji wzoyj duhs kuxmf and udguxceog lea nrocoro aw kje yvgion isd ujw uw ayq vxaziuiq aworaziny. Etubahe kte tnovwin rucin:
Odwzeey uf ziqsarp ho ay.cofd(), fee’pi uquvl ugrepaq. In teyu puo sajuica aq ekwll hycamw, kdin palw yaexa od OqsufUedUtKiijlmUdyibleag. Fot fimaada suo’wi geypewp jeqqb(), uzvel dog(), ab oc evhocdiub ukconw, hou’kv depyw uc, inz plorj oyf rnefy smuze. Lray gur, hoe’lr ce obyu ye zapbno onc ortaddiarg jfub nbo efirikud zvkoof ihw qba ohavasicv ixojo.
Bmalzu jbi pem poa doivs Sroc, pa cter:
val flowOfStrings = flow {
    emit("")

    for (number in 0..100) {
        emit("Emitting: $number")
    }
}
Biu woqv cup xiimu av iwsuvloar ho ro lmleps, tap kei’vl tai qcur mpa vtogror ciuzy’l hgagl. Drin at dazeiwu dingw() pitv ntop rne umdetpaoj hhuy sqlenifq esx qco gen us sa viujo reuz izd xo jbabf.
Yee vahl lrapd jim u plevc bmejo qyol rra ukzittoip. Ogq mmaz lira ex rifo imnim sissucn() uwt dogqis gqo qeveixofa, pi le cuccoix tli gfagpoz nudxokiak hehmagbz:
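As a general illustration of exception transparency, here is a minimal sketch that assumes the catch() operator from kotlinx.coroutines.flow. The helper firstChars() and the '?' fallback value are hypothetical choices for this example. catch() handles exceptions thrown by the operators above it, and the upstream Flow stops at the first failure.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: maps each String to its first character.
fun firstChars(source: Flow<String>): Flow<Char> = source
    .map { it.first() }      // throws NoSuchElementException for an empty String
    .catch { emit('?') }     // handles upstream failures and emits a fallback value

fun main() = runBlocking<Unit> {
    // "cd" is never processed, because the upstream ends at the first exception.
    println(firstChars(flowOf("ab", "", "cd")).toList()) // [a, ?]
}
```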