RxJava Combining Operators
In this tutorial, you’ll use RxJava combining operators to merge, filter and transform your data into succinct and reusable streams. By Prashant Barahi.
Contents
RxJava Combining Operators
25 mins
- Getting Started
- What Are RxJava Combining Operators?
- Using the amb Operator to Load the Fastest Source
- Using the zip Operator to Load All Sources At Once
- Using the merge Operator to Load Data ASAP
- Using the startWith Operator to Emit an Item Immediately
- Using the concat Operator
- Using the combineLatest Operator to Correct the UI Behavior
- Using the mergeDelayError Operator
- Assessing the Switch Operator
- Assessing the join Operator
- Assessing the groupJoin Operator
- Where to Go From Here?
RxJava is a ReactiveX port for Java and a programming paradigm that provides a way to put static items into motion. It comes packed with many intuitive APIs to merge, filter, transform or otherwise alter the stream of data.
But what does “putting static items into motion” mean? Think of it this way — as you’re reading this tutorial, your brain works in an RxJava-like fashion. It converts static text into a stream of words, and then it transforms the words into your inner voice. Some information is even discarded, as though your mind is filtering it out. And if you’re listening to music while reading, your brain is combining the streams from both sources, much like RxJava’s combining operators.
You’ll see several of these nifty features explained in this tutorial. You’ll also:
- Understand RxJava combining operators and their behavior
- Use combining operators to consolidate multiple streams
- Convert UI events into streams and then combine them
You’ll implement an app called TouRx. With TouRx, you can find your next vacation, calculate the travel cost and even configure the UI if you’re super picky about how you want your vacations shown to you. :]
So, pack your bags and get ready!
To start learning RxJava, read this Reactive Programming with RxAndroid in Kotlin tutorial.
Getting Started
Download the starter project by clicking the Download Materials button at the top or bottom of the tutorial. Launch Android Studio 4.0 or later and select Open an existing Android Studio project. Then navigate to and open the starter project’s folder.
Build and run the app. You’ll see the following screen:
You’ll see a list of places from Earth and Mars. Tapping on any of them will show the trip details and cost. In the top-right corner, there’s a menu that lets you configure the app behavior. Most of these functions don’t do anything in the starter project yet, as their corresponding PlaceListViewModel functions are blank. You’ll fill each up as you progress through this tutorial.
The important files to note are:
- BaseViewModel.kt, which contains an abstract ViewModel class that provides RxJava-compatible handlers like onErrorHandler, onDataLoadHandler, onSuccessHandler and onCompleteHandler. These update the UI state and use LiveData for their reactive interface. These handlers must operate on the main thread.
- BaseActivity.kt, which contains the base activity class that notifies child classes whenever there’s a change in the state.
- ApiService.kt, which provides the blueprint to handle IO operations. MockApiService is an implementation that simulates “expensive” IO operations using a Room database while also adding some delay in every call. It’s important to note that fetchMarsPlaces() is slower than fetchEarthPlaces().
- State.kt, which represents the states of the IO operations.
The app needs to perform multiple IO operations to display the data. You’ll use RxJava’s combining operators in the upcoming sections to accomplish this task. But first, you’ll learn more about what they are.
What Are RxJava Combining Operators?
In the Reactive Programming with RxAndroid in Kotlin: An Introduction tutorial, you learned about RxJava’s Observable and Flowable and how to operate on them. With combining operators, you can combine and consolidate them.
Some of the combining operators provided by RxJava are:
- merge
- concat
- combineLatest
- zip
- amb
- startWith
- mergeDelayError
- switchOnNext
- join
- groupJoin
RxJava also provides an instance method version (named with a With suffix, like mergeWith(), concatWith(), zipWith() and so on) for most of the static method operators listed above. RxKotlin provides a Kotlin-friendly way of using RxJava by delegating the calls to the corresponding RxJava method.
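To make the static versus instance-method distinction concrete, here is a minimal sketch using concat and concatWith(). It assumes RxJava 2.x is on the classpath (the io.reactivex package the tutorial imports from). The function names concatStatic and concatInstance are hypothetical and are not part of the TouRx project.

```kotlin
import io.reactivex.Observable

// Static flavor: both sources are passed to the operator as arguments.
fun concatStatic(): List<Int> =
    Observable.concat(Observable.just(1, 2), Observable.just(3, 4))
        .toList()
        .blockingGet()

// Instance flavor: the operator is called on the first source
// and receives the second source as its argument.
fun concatInstance(): List<Int> =
    Observable.just(1, 2)
        .concatWith(Observable.just(3, 4))
        .toList()
        .blockingGet()

fun main() {
    println(concatStatic())   // [1, 2, 3, 4]
    println(concatInstance()) // [1, 2, 3, 4]
}
```

Both calls build the same stream. Which one you pick is mostly a matter of readability: the instance flavor chains well when you already have a source in hand, while the static flavor scales better to many sources.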
You’ll learn about the behavior of these operators in the upcoming sections, starting with the ambiguous operator.
Using the amb Operator to Load the Fastest Source
The Ambiguous operator, better known as amb, makes its sources “compete” against each other. The fastest source to emit a result gets relayed down the chain, while all other sources are disposed. RxJava also provides ambWith() to perform the ambiguous operation using only two sources.
On the listing screen, the fastest source loads using ambWith(). Open PlaceListViewModel.kt and place the following code inside loadTheQuickestOne():
startLoading()
val earthSource = service.fetchEarthPlaces()
val marsSource = service.fetchMarsPlaces()
    .doOnDispose { Log.d(LOG_TAG, "Disposing mars sources") }
earthSource.ambWith(marsSource)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe { recordStartTime() }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeBy(onSuccess = onSuccessHandler, onError = onErrorHandler)
    .addTo(disposable)
In the code above, earthSource responds (or emits) faster than marsSource. The streams are being combined with ambWith, so the result from earthSource gets displayed while marsSource gets disposed.
Build and run. Tap Load The Quickest One from the menu.
When you have many streams, the static method amb() can take in a collection of Observables, letting you specify a large number of sources while only the quickest one moves downstream.
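As a rough illustration of passing a collection to the static amb(), the sketch below races three sources. It assumes RxJava 2.x and uses Single (matching the onSuccess handlers in the tutorial's code); Observable.amb() accepts a collection in the same way. The helpers delayedSource and quickestOf, and the delay values, are made up for this example and only stand in for the project's real IO calls.

```kotlin
import io.reactivex.Single
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for an IO call that answers after delayMs milliseconds.
fun delayedSource(name: String, delayMs: Long): Single<String> =
    Single.just(name).delay(delayMs, TimeUnit.MILLISECONDS)

// Races every source in the list and blocks until the first one emits.
// The losing sources are disposed by amb.
fun quickestOf(sources: List<Single<String>>): String =
    Single.amb(sources).blockingGet()

fun main() {
    val sources = listOf(
        delayedSource("mars", 400),
        delayedSource("earth", 100),
        delayedSource("venus", 250)
    )
    // "earth" has the shortest delay, so it wins the race.
    println(quickestOf(sources))
}
```

blockingGet() is used here only to keep the sketch runnable from main(). In the app you would subscribe on a background scheduler and observe on the main thread, as the tutorial's code does.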
In the next section, you’ll learn about the zip operator and how it can simultaneously accommodate the content of multiple streams.
Using the zip Operator to Load All Sources At Once
The zip operator and its instance method flavor, zipWith(), allow you to take an emission from each Observable source and combine them into a single emission. You must pass in a lambda, which defines how they combine, as the last argument.
RxKotlin provides a simpler zipWith() extension function that combines the emissions from two sources into a Pair, so you don’t have to supply the lambda as the last argument when using it.
Open PlaceDetailViewModel.kt. You can see that loadPlaceDetails() uses zipWith from RxKotlin to combine two sources into a Pair:
fun loadPlaceDetails(id: Int) {
    startLoading()
    val costSource = service.fetchCostById(id)
    val placeSource = service.fetchPlaceById(id)
    costSource.zipWith(placeSource)
        .subscribeOn(Schedulers.io())
        .doOnSubscribe { recordStartTime() }
        .observeOn(AndroidSchedulers.mainThread())
        .map {
            return@map PlaceDetail(cost = it.first, place = it.second)
        }
        .subscribeBy(onSuccess = onSuccessHandler, onError = onErrorHandler)
        .addTo(disposable)
}
Launch the app and click on any of the places to view its details. costSource and placeSource are IO operations with varying delays. However, the data in PlaceDetailActivity appears as if both load at the same time. This is because zipWith waits for the emission from both sources to couple them into a Pair and relay them down the chain.
The zipWith used here is an extension from RxKotlin and differs from RxJava’s zipWith in its input arguments. RxKotlin’s version takes in one argument only and relays the Pair of the emissions down the chain. So, the import statement must look like import io.reactivex.rxkotlin.zipWith.
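The difference between the two zipWith flavors may be easier to see side by side. The following is a minimal sketch, assuming RxJava 2.x and RxKotlin 2.x are on the classpath. The Trip class and the two function names are hypothetical and only loosely mirror the PlaceDetail mapping shown earlier.

```kotlin
import io.reactivex.Single
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.zipWith

// Hypothetical result type for this sketch.
data class Trip(val place: String, val cost: Int)

// RxJava's member zipWith: two arguments, the second being the combiner.
fun zipWithRxJava(place: Single<String>, cost: Single<Int>): Single<Trip> =
    place.zipWith(cost, BiFunction<String, Int, Trip> { p, c -> Trip(p, c) })

// RxKotlin's extension zipWith: one argument, emits a Pair that you map yourself.
fun zipWithRxKotlin(place: Single<String>, cost: Single<Int>): Single<Trip> =
    place.zipWith(cost).map { Trip(it.first, it.second) }

fun main() {
    val place = Single.just("Olympus Mons")
    val cost = Single.just(1200)
    println(zipWithRxJava(place, cost).blockingGet())   // Trip(place=Olympus Mons, cost=1200)
    println(zipWithRxKotlin(place, cost).blockingGet()) // Trip(place=Olympus Mons, cost=1200)
}
```

The one-argument call only compiles when the RxKotlin import is present, which is why the import statement matters.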
Zipping multiple Observables is useful when you need results from multiple sources simultaneously. Open PlaceListViewModel.kt and put the following code inside loadAllAtOnce():
startLoading()
service.fetchEarthPlaces()
    .zipWith(service.fetchMarsPlaces())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe { recordStartTime() }
    .map {
        return@map listOf(*it.first.toTypedArray(), *it.second.toTypedArray())
    }
    .subscribeBy(onSuccess = onSuccessHandler, onError = onErrorHandler)
    .addTo(disposable)
Build and run, and tap All At Once in the menu.
You’ll see that the data from the Mars and Earth sources loads simultaneously, despite the varying delays.
With zip, an emission from one source waits to get paired with the emission of other sources. So if one of the sources calls onComplete(), the unpaired emissions from the others are dropped. This behavior also implies that faster sources have to wait on slower sources to provide emissions to couple with.
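The dropping behavior shows up clearly when the sources emit a different number of items. Here is a small sketch, assuming RxJava 2.x; the function name zipUneven and the sample values are made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// Zips a three-item source with a two-item source.
// Once the shorter source completes, "Venus" has no partner and is dropped.
fun zipUneven(): List<String> {
    val places = Observable.just("Earth", "Mars", "Venus")
    val costs = Observable.just(100, 200)
    return places
        .zipWith(costs, BiFunction<String, Int, String> { place, cost -> "$place:$cost" })
        .toList()
        .blockingGet()
}

fun main() {
    println(zipUneven()) // [Earth:100, Mars:200]
}
```

If you need every emission from every source regardless of pairing, zip is probably not the right fit.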
Now that you know how to use the zip operator, you’ll learn about the merge operator.