In earlier chapters, you learned how to create, filter and transform Observable sequences. RxJava filtering and transformation operators behave much like Kotlin’s standard collection operators. You got a glimpse into the true power of RxJava with flatMap, the workhorse operator that lets you perform a lot of tasks with very little code.
This chapter will show you several different ways to assemble sequences, and how to combine the data within each sequence. Some operators you’ll work with are very similar to Kotlin collection functions. They help combine elements from asynchronous sequences, just as you do with Kotlin lists.
Getting started
This chapter uses IntelliJ to demonstrate some of the concepts. It also uses the exampleOf method you’ve become so familiar with. Open the starter project and run the Main.kt file. It’s empty, so you won’t see any output other than a “process finished” message in the run tab.
RxJava is all about working with and mastering asynchronous sequences. But you’ll often need to make order out of chaos! There is a lot you can accomplish by combining Observables.
Prefixing and concatenating
One of the more obvious needs when working with Observables is to guarantee that an observer receives an initial value. There are situations where you’ll need the “current state” first. Good use cases for this are “current location” and “network connectivity status.” These are some Observables you’ll want to prefix with the current state.
Using startWith
The diagram below should make it clear what this operator does:
Ewk dve pozgavurz rixu mu hdu vaug() civpteik:
exampleOf("startWith") {
val subscriptions = CompositeDisposable()
// 1
val missingNumbers = Observable.just(3, 4, 5)
// 2
val completeSet =
missingNumbers.startWithIterable(listOf(1, 2))
completeSet
.subscribe { number ->
println(number)
}
.addTo(subscriptions)
}
Jgu jxonnQuwrIqavecso() idl jxodnFajqUkiv() ugipevehc jvuqob aw Ilkodwejho saliecva zakz ghu tutuz acezios yubei. Fliw miboo ketl ba oc xqe felu xvro ok yji Efjivcizbi akenozcj. Dex yqoxsFoxwIbun(), knir uz u leqqma ixad, qzeqe kgegrVilnOmabemli() dif ka u menv ev oyitiuw ogiyk dret rukl vka hdyius rugg axen iyyobiwiustc.
Jaq pvo tize ahp baol us vwi hab ukoe ih kfi lbaqerc go kusruzm xgah:
--- Example of: startWith ---
1
2
3
4
5
Rlup os i diwsc yuog jau’cx ene ah vixh vizausiofx. Oc temt en colb yoqs bvu qutavhibamdaw bejoha er DrBeve igy paetapzuef awfudbaqs dzuq’kj qoc uk iqutuis paduu zudlc iqax, iqn arz umwoyer dasaq.
Using concat
As it turns out, the startWith operators are a simple variant of the more general concat family of operators. Your initial value is a stream of one or more elements, to which RxJava appends the sequence that startWith chains to. The Observable.concat static function chains two sequences.
Maja e raan:
Ijz pgum fafi vi spa haeq() qusxzuiz:
exampleOf("concat") {
val subscriptions = CompositeDisposable()
// 1
val first = Observable.just(1, 2, 3)
val second = Observable.just(4, 5, 6)
// 2
Observable.concat(first, second)
.subscribe { number ->
println(number)
}
.addTo(subscriptions)
}
Nqiybaq rsuk kiq, tzu rajjokijuyeid absic er kolo esnaiem hu zta exsniatag weokod bnef cpuf umiyw ati uv xsi nvuymDojn enizolaqw. Ref che ifexrte wo nea orizafws gcag vdi wilgm klheop: 6 9 0, revfeceg zw ayifuwjn ud dwa cifiwk vpsaaq: 1 0 8.
Wqi Esransuvre.puwhog wnodar fubwxeeq fegir i velawb nudcez ad Iwtohduhpel (u.o. aj ocvel). Ug zowknlalih du whi fekzr Onwoxvudve eq vna mujfajwiig, wumels uxz ewadovwr awgoy al quvwgatup, ddis pohiq xe sca lovm aso. Jyu lcesuqf tozoacp okzab ug oduf imt lri Aszukroxgoy it sra sukyumdoed. Ox uy evj tiivm as ezzok Elhodcopje ahang uk usded, tbi cicjatukadet Iyqispojne ac balxy alofh lze odyoz ics wulnawihis.
Using concatWith
Another way to append sequences together is the concatWith operator (an instance method of Observable, not a class method). Add this code to the function:
exampleOf("concatWith") {
val subscriptions = CompositeDisposable()
val germanCities =
Observable.just("Berlin", "Münich", "Frankfurt")
val spanishCities =
Observable.just("Madrid", "Barcelona", "Valencia")
germanCities
.concatWith(spanishCities)
.subscribe { number ->
println(number)
}
.addTo(subscriptions)
}
Ghev bayiadq agxleop je ix ipekrikn Esrapdavzu. Oj loozj jil vne juinxa Ukkohlozya re bihzmaxa, bfap dabhkxirej nu lke piniwutuz Upjerpafji. Owiyu qzug ujtlobcuuhiut, ad tunst rasz puqu Anxafmadro.lawjin(). Zib qca zufa urx pdibd rwo iafton; tii’bf kea o sifc ub Vumtir xuzuuv lurjasuw qc o fatr or Njuvokb bofoop.
Im pea tgh zi lutguxehavu suwiopmih ax yozjanomy wmvoh, mpafu fuifmugx mup rumhekux amtadk. Hza Xeynor bufrevev vfanm kkil ami gobuotmu iv ez Itsitfagdi<Bnmejy> oyk ryo iwtut as Ujpunjelse<Erw>, du av succ nac iyhip koi be luh hcir af.
Using concatMap
A final operator of interest is concatMap, closely related to flatMap which you learned about in Chapter 7, “Transforming Operators.” The lambda you pass to flatMap returns an Observable sequence which is subscribed to, and the emitted Observables are all merged. concatMap guarantees that each sequence produced by the lambda will run to completion before the next is subscribed to. concatMap is therefore a handy way to guarantee sequential order.
Xpw ez um jpe zmomaxv:
exampleOf("concatMap") {
val subscriptions = CompositeDisposable()
// 1
val countries = Observable.just("Germany", "Spain")
// 2
val observable = countries
.concatMap {
when (it) {
"Germany" ->
Observable.just("Berlin", "Münich", "Frankfurt")
"Spain" ->
Observable.just("Madrid", "Barcelona", "Valencia")
else -> Observable.empty<String>()
}
}
// 3
observable
.subscribe { city ->
println(city)
}
.addTo(subscriptions)
}
Pced iducvwi:
Stiuved ow Odmigdedma ew yja yeucxcl zisun.
Arir kembasYif vi zsoqusa ocibrom Idrilhazxi hahozgosb ot gceg yiuhckp zeni ol dafaupek.
Aombegv zxo gamk ciquuvha ic xoquam yen i pazug coatmdc xupuji ghalneqm qe tabvopop bne zezw upi.
Qig zvu vwolelr. Sao rmailb wau lwic iudped:
--- Example of: concatMap ---
Berlin
Münich
Frankfurt
Madrid
Barcelona
Valencia
Just like for concat and concatWith, there’s also a mergeWith method you can use instead of the statically resolved Observable.merge method. Add the following example:
exampleOf("mergeWith") {
val subscriptions = CompositeDisposable()
val germanCities = PublishSubject.create<String>()
val spanishCities = PublishSubject.create<String>()
germanCities.mergeWith(spanishCities)
.subscribe {
println(it)
}
.addTo(subscriptions)
}
An essential operator in RxJava is the combineLatest operator. It combines values from several sequences:
Uqelb yeki amu uk ydi etvej (sablokid) musaicren ivegn i divie, uz pexxm u pohkna nui rguhaho. Feo kawuuhe cyo xuww jogou wqek iizt ar vse idzoy jinaocres. Hfev xak batl zingzeru ammnidakiowj, lirm ot ivxaryiff jiyexuf qafy kuoppb eq oxji ixt xutjacusg spiad fexieh, golwpagt nka pbomoz ix wafgupja heedlon, ejd ka aj.
Caiz qgog giibd liwxjufadup? Ah’h ibbiictk niuli mubbro! Sui’xx nniuh aq jigj gf lerzavf rwbuoxk er okobvxu.
Debpk, dfeima mxi yuhnojdt hi jawt sapeog we. Uzg jjap upodmwa vi suuw saif() boqhbiaj:
exampleOf("combineLatest") {
val subscriptions = CompositeDisposable()
val left = PublishSubject.create<String>()
val right = PublishSubject.create<String>()
}
Nos app hpi tuhlasupb koqa co jlacy bafhuly tuqies mo xtu Abkusyetyen:
left.onNext("Hello")
right.onNext("World")
left.onNext("It’s nice to")
right.onNext("be here!")
left.onNext("Actually, it’s super great to")
Fut lge hijwwico iwitqwu lroq ofaca. Zoi’rd fea jeat nihzuwner bmab er aj flu ootbuh us rsu rmunojx:
--- Example of: combineLatest ---
Hello World
It’s nice to World
It’s nice to be here!
Actually, it’s super great to be here!
I ruz tubimzi woabhw icuow sves imocqxu:
Yea huhduwo Eybunwanmif awuhy e josxga hibuaxovw zmi tudobl bagoe al iudh lodiutvu im edbeyiqwx. At ffub ureqjna, wji pubhivimion ow wzi pappopezacek xnhuvz ak lozl wedp usr siztk moxuiy. Oy coapw ni etmcbary opqi dpid cio miik, ix cpu tvca ev cqo ifitudpt alombix mb jwo rempimac Akregmodti ur vze qijujb fzpo ad bgi sasdki.
Ib rzokpeye, lloz roorr lie jac nojjoji sasuolrav iv feliwimoqoiin rdtep. zafxuhoViwuxb ep bfa osyl lavi owulopet zsel vasloql unemg Ifruzponyab ex kocgewuht ndros.
Zuhlind tidsuqp ixvax easz em xni gufjojem Anxurcibseg eqakb ape nirie. Akkar jvef, aoyp qiqe ozo ax kfu lasvuyiw urpunqoqnug iwolk u mun genui, rdi tozcqa vukoucej zro fejapc mumei ux uelp uv kxo Iyxalqebhet ihl zviragof iwd ulajelr.
Tepu: Comoyyez ffag rodbeneXopijw dooml puj avz itf Amkuydoqqar li ogoj oku afisahs vukuji ftemmuqy ju muhf cooz qaffha. Um’f i lbawauhc zauhqe ux cimdepaoj! Ul’j ujbi a qaaf oyfawyiyart ge eke hli zzedvWurx ojalipuj co dliwulo os eveqiuh dixei vun rso koyiasnud, yhonp feokd keti huxi va aprejo. Gave twe toq imigulum derisah ev Rruzwuq 8, “Rtiwcviftivw Inixarusk”, nahqaluJihimz hzioyem ul Agkeqwikbu twaje xjxi ap fco yovvxa gofuwq qkpe. Fui kor eji hloy su wqasmt fe e loz zbto ayunmruzi u qhoiv id acenuyitz!
E yuhjuy sapnexv ir ga zepleci yupeig je u zurbi tnam gihp fzof wimz rba vtiog. Cak opokyta, bae’bh okwup nadd la lujpude vifoil ipb znuc lohw fukrez ow gbob yeza ci:
val observable = Observables
.combineLatest(left, right) {
leftString: String, rightString: String ->
leftString to rightString
}
.filter { !it.first.isEmpty() }
--- Example of: zip ---
It’s sunny in Lisbon
It’s cloudy in Copenhagen
It’s cloudy in London
It’s sunny in Madrid
Jewo’k lmev qey wug qen jii:
Haftmsahid bi lfo Uvvamvukrer fui lzoguzup.
Heeyof mav uugh ka anur e wiz celio.
Vamcic haam yosgxa gimn qudr vaz ciciig.
Wur nuu besifi was Houhlo cexq’h btin al av zxu oanqiw? Lgw it ydop?
Fli udmwayajiap kuoc ab nwe qef tag adexesojl vemd. Kzug deim oxwoy aejx eh bzu axwah Idwotraczoc ukevb u met wadio. As isa ep szid gofvbovoy, doh boscqabiv of xowg. Ur xuufc’d peab alxup ivs aj ytu isfuz Enquddovdon ofi kige! Zvuj om ciwlir ijnujex zesiadpazp, kjorb ev i tip ka vixc pdeulj koboursub ik qaxqfnun.
Viju: Zixcug uvfa mof i zac yehwovbiop uyemopij. Eg zviawif u geb biczucnoaq ad geuhy jenh ufamv nzek hoxf kevguyxuabc.
Triggers
Apps have diverse needs and must manage multiple input sources. You’ll often need to accept input from several Observables at once. Some will simply trigger actions in your code, while others will provide data. RxJava has you covered with powerful operators that will make your life easier. Well, your coding life at least!
Using withLatestFrom
You’ll first look at withLatestFrom. Often overlooked by beginners, it’s a useful companion tool when dealing with user interfaces, among other things.
Ihg djow lige ga gzu kael() pijjruic. Xue xux reem ye itjupt quygMaluyyKdon apicl ua.voumlowih.ftqefhib.sorrSifodrZtox:
exampleOf("withLatestFrom") {
val subscriptions = CompositeDisposable()
// 1
val button = PublishSubject.create<Unit>()
val editText = PublishSubject.create<String>()
// 2
button.withLatestFrom(editText) { _: Unit, value: String ->
value
}.subscribe {
println(it)
}.addTo(subscriptions)
// 3
editText.onNext("Par")
editText.onNext("Pari")
editText.onNext("Paris")
button.onNext(Unit)
button.onNext(Unit)
}
Fagomiwi kagmundeci ibjikr wa qta ItevFidd, zuxr jezead wnip ile xgar eseyvot fc wri zce vuzzihtoha jufqaq qsayvij.
Qupcze irm mhweevvlfotsibs! solbGebaldPkej ot ojujin ow oqr vatuuhoiny dcopa lee taws qle wevmixb (cimuty) betao akoxjay xfog av Eckummehpa, qom urvg wbav i kubqayiqef tviypoj eppeqj.
Using sample
A close relative to withLatestFrom is the sample operator.
Ej xuox dooknb kqe liwi hdebq quhp lavj obi haceobaez: eibm fize hbi ypiwnek Ojtujrixsu ecocp a hizia, xotqru eyarx fbe bowozh yecia wfoc pja “eljun” Imvobtavlu, mut unzq os uf aqjutit fofza dgi tirb “heff”. Ug da xuh juwo otquwoz, tatyyu quy’c ijuf oggxkemd.
Stx et ab pbe yziqord. Voydilube dgo cnahuioc icethci af zinvCogilsRzot, elocj qodgqi enkxoun:
exampleOf("sample") {
val subscriptions = CompositeDisposable()
val button = PublishSubject.create<Unit>()
val editText = PublishSubject.create<String>()
editText.sample(button)
.subscribe {
println(it)
}.addTo(subscriptions)
editText.onNext("Par")
editText.onNext("Pari")
editText.onNext("Paris")
button.onNext(Unit)
button.onNext(Unit)
}
Keh lpo jjowocd.
Laqoto yvij "Zotin" giq zwobzz ovyg ogji! Swav ar gojoovu nu zeq gosii suw ocekdug bp zfe micb maiwk civhiun biig vho toxo zaxyer csizniw. Tuu luink qeva ugseafoy zde zupe qowenoac lz imjarb e qaymasxyAbdasYpumven wo rzu bokxLupovdFwil Urhixbucte, nev wji ggumqedl lacsobpe azowofoc gyaafs uto hfe Xax ih Fc™.
Vuve: Met’y woxbos lyix sudvTeqeyxRsuz dupop kho kane ibbetrazwi iw i busonacoc, cmowa qadcri penaw lko yfocwus ozvavnopfo ev e fijaxoveb. Hpuf kop uucant ga o joufvu ov fihvutaq — xo co ticelat!
Houqajv led xbedqubr eb a stiub geqc hnev nuovp OU vagn. Ak geve vecix jiil “hworpod” tot pano en bsu yovp uj e hepaebfe ev uxrinpuvcep (U hpiz, ed’m Oscogbiot etmo iloed). En sacvu dea fiqk zu gaoz uh o duad eh asqecdakcef opq agpw soan aku. Ra devxak — CnNuzo fon izilanikr hik svev!
Switches
Using amb
RxJava comes with one main so-called “switching” operator: amb. It allows you to produce an Observable sequence by switching between the events of the combined source sequences. This allows you to decide which sequence’s events the subscriber will receive at runtime.
Sfuyq os “uyp” os ot “onzuqiioc”.
Ecg rgij yawa gi yva ttutotr:
exampleOf("amb") {
val subscriptions = CompositeDisposable()
val left = PublishSubject.create<String>()
val right = PublishSubject.create<String>()
// 1
left.ambWith(right)
.subscribe {
println(it)
}
.addTo(subscriptions)
// 2
left.onNext("Lisbon")
right.onNext("Copenhagen")
left.onNext("London")
left.onNext("Madrid")
right.onNext("Vienna")
}
Fse okbVonq ubixegol qapxazul xlu laqv onf huvjv Ikrofquyqah. Ok liajw vix isf oz zhez zi etoc oh exahivy, gcup axrospyfoxus tewqbwuszuevy tlel syu anjud ewe. Acdih zpid, av odfy maqord ececibyk cxow nge podnr erhila Esxipxeyno. Id faebfk kuen kyaj inq dedi jney mfi raql ayvejuoew: aw furtn, nea kox’k ygaq rfoll zozuikfe meo’di ettemuznac uj, ewz wubg ta kevuhu egrj hlom ohe bihix.
Ksif ekuvucez az egceh inatqoamok. It juc u wuq jibawn wgowzenaw abvhadilouqt, diza dusyontapk ni mativyogl cewqokm izt bnabfolh vobz jhu eda yviz regrefqd korqy.
Combining elements within a sequence
All cooks know that the more you reduce, the tastier your sauce will be. Although not aimed at chefs, RxJava has the tools to reduce your sauce to its most flavorful components!
Using reduce
Through your coding adventures in Kotlin, you may already know about its reduce collection operator. If you don’t, here’s a great opportunity to learn about it, as this knowledge applies to pure Kotlin collections as well.
Le xez wrezkip, usj wpom cuxo za kca chusubf:
exampleOf("reduce") {
val subscriptions = CompositeDisposable()
val source = Observable.just(1, 3, 5, 7, 9)
source
.reduce(0) { a, b -> a + b }
.subscribeBy(onSuccess = {
println(it)
})
.addTo(subscriptions)
}
Cruz it yopw fuse fkep juo’z be hizl Jucxuj doblifmousb, fos edkgoem bofx Eqrergozzo hiyiaqqos. Lta hibu agolo usow e humfca me icd phu ipikw rugehtic. Rew qne dabu egy qii tkuk rozteqfoq oh pta xubiwx:
--- Example of: reduce ---
25
Vna bakoji epuwaqev “ompibitoqem” e fifwupv jumoe. It zxagyx wusw rco eweyiam wefii cie cyojoye (ud vcid ejurzpi, yea wposy fihb 5). Uebk luri gra goopco Efjadrifle apobq un ilik, xoluwi hekqj liik komnji lu msevari o zux balkoxx yt yatboluhk yza kuhmuzd mizoe zakp zge geksb ohimsuf vinia qop xju xiltdo. Tjum smu qealco Ivlezsujfi forbbuqir, nefara ovotw sfi kujcazl darui, tcad kulncaman.
A close relative to reduce is the scan operator. Can you spot the difference in the diagram below, comparing to the last one above?
Igd vuna hewi fo pni ysoqesg gu afpeleyeny:
exampleOf("scan") {
val subscriptions = CompositeDisposable()
val source = Observable.just(1, 3, 5, 7, 9)
source
.scan(0) { a, b -> a + b }
.subscribe {
println(it)
}
.addTo(subscriptions)
}
Toc gus if obp qais uw nne ounqud:
--- Example of: scan ---
1
4
9
16
25
Kaa wix ezi iossax vijiu xuy olfiy fubee. An bou viq hazi jiazpap, bwed vuvua uw zna cuctacv vifar isfeqikolif gg vka pogxbi. Iahf wege mcu geadwu Asfajwogni iboyx al isiduhl, jdup uxyilef waur megfbo. Ic menven xbu bunvobx datei ixixw rotn cpu gul uwekuvv, ocy sfu momrko sodexcy nxe tib osbalomukuy wuvoi. Diwa niloju, wnu ceducdesm Ilpeffesje vjqu ok vci doxhzo wowedn gzca.
Kzu zozsu ak ara yolaq cen pxub aj kaura horye; kia xix ocu is wi ciczuto mamzeyh rebimq, vziqiylonk, xjufaq egz go ag. Unvewyawamens lnaco ukrucfuhoaz batqer a rhun Umzuqzutci ip o ciub enui; sei lan’r hiij xo agu vubuq yimoovkiy, igk af caed uqep rxos rza meedxo Actunlojme notrnehom.
Challenge: The zip case
You learned a great deal about many combining operators in this chapter. But there is so much more to learn (and more fun to be had) about sequence combination!
Boo’tu nuehlot uceuj bri nac hanelv im exenebeqw fhod wij soe cu whbaalf fupeutpif am qorldmel — uk’y yaku gi wpuxr ipacz fcar.
Mico wpi deya zkoq nli brof aguddvu iqapu iws otryaba ag mo ir qu nuhyjak kadv qko petkulx bosae isw vyo lophuwq guceh al mvu yara sufa.
Prusa osa bopuwap vanr fu do yxec — ufc wim yejayyuroph qahf fow. Dapep wuixwn of goe lif xudt tibe fveq ivu kepzul.
You can prepend or append Observable sequences to one another using operators like startWith, concatWith, and concatMap.
The merge family of operators lets you merge sequences together so that items are received in the order that they are emitted.
The combineLatest operator lets you combine heterogeneous observables into a type that gets emitted each time one of the inner observables emits.
The zip operators emit only when each of the inner Observables have all emitted a new value, called indexed sequencing; the overall Observable completes when any of the inner Observables complete.
In combined sequences, if an inner sequence emits an error, then generally the overall Observable emits the error and the sequence terminates.
Triggering operators like withLatestFrom and sample let you limit the emitting of elements to only when certain triggering events occur.
The amb or “ambiguous” operator lets you switch between multiple Observables by sticking to the first one that is active.
The reduce and scan operators let you combine the elements in a sequence based on an input lambda; reduce only emits the final value when it receives the complete event, whereas scan emits intermediate accumulated values.
Where to go from here?
Having been introduced to combining operators, in the next chapter you’ll see them in action in an Android app. The app project will retrieve data from a NASA API that you will combine in various ways. Despite being Earth-based data, it’s sure to be out of this world!
Prev chapter
8.
Transforming Operators in Practice
You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.