RxJava Combining Operators
In this tutorial, you’ll use RxJava combining operators to merge, filter and transform your data into succinct and reusable streams. By Prashant Barahi.
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Contents
RxJava Combining Operators
25 mins
- Getting Started
- What Are RxJava Combining Operators?
- Using the amb Operator to Load the Fastest Source
- Using the zip Operator to Load All Sources At Once
- Using the merge Operator to Load Data ASAP
- Using the startWith Operator to Emit an Item Immediately
- Using the concat Operator
- Using the combineLatest Operator to Correct the UI Behavior
- Using the mergeDelayError Operator
- Assessing the Switch Operator
- Assessing the join Operator
- Assessing the groupJoin Operator
- Where to Go From Here?
Using the merge Operator to Load Data ASAP
The merge
operator subscribes to all the passed sources simultaneously and relays their emissions as soon as they’re available. Hence, it doesn’t guarantee the sequential ordering of the emissions, which makes it suitable for handling infinite sources.
RxJava also provides an instance method, mergeWith()
, to merge two sources.
Open SplashViewModel.kt. The class contains populateData()
, and the splash screen loads using this method when the app starts:
fun populateData(places: Places, costs: List<Cost>) {
val insertPlaceSource = database.getPlaceDao().bulkInsert(places)
.delay(2, TimeUnit.SECONDS)
.doOnComplete { Log.i(LOG_TAG, "Completed inserting places into database") }
val insertCostSource = database.getCostDao().bulkInsert(costs)
.delay(1, TimeUnit.SECONDS)
.doOnComplete { Log.i(LOG_TAG, "Completed inserting costs into database") }
insertPlaceSource.mergeWith(insertCostSource)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onComplete = onCompleteHandler, onError = onErrorHandler)
.addTo(disposable)
}
Both insertPlaceSource
and insertCostSource
bulk insert corresponding items into the database. Below their definitions, the two streams merge, and the resulting Completable
invokes onCompleteHandler
, signaling that both of them have completed. You can assess the Logcat to see how the merge operator behaves. Enter –> into the Logcat search box to filter out noise from other apps:
2020-08-21 21:35:24.368 I/-->: Completed inserting costs into database 2020-08-21 21:35:25.366 I/-->: Completed inserting places into database 2020-08-21 21:35:25.366 I/-->: completeHandler after: 2s
In PlaceListActivity, mergeWith()
can be used to load the result into the RecyclerView
adapter as soon as it’s available. Open PlaceListViewModel.kt and confirm that loadOnReceive()
is using mergeWith()
to do exactly that.
Build and run. In the menu, tap Load When Received.
You’ll see that Earth’s places load before Mars’ places. Why? Recall the merge operator’s behavior — unlike the zip operator that waits for all sources to emit, the merge operator relays the emissions as soon as they’re available.
It’s important to remember that ordering isn’t guaranteed when using the merge operator. To maintain the order, RxJava provides the concatenation operator. But before getting into that, you’ll learn about converting UI events into streams and another combining operator — startWith
.
Using the startWith Operator to Emit an Item Immediately
The startWith
operator returns an Observable
that emits a specific item before it begins streaming items that are sent by the source.
The Android SDK provides a callback mechanism to perform actions on UI events like button clicks, scroll changes, etc. Using this allows you to, for example, create an observable source that emits on every UI event callback using Observable.create()
, as explained in the Reactive Programming with RxAndroid in Kotlin tutorial.
Open PlaceDetailActivity.kt. Observable.create()
is used in conjuntion with extention methods to convert UI events to observable sources:
/**
* Converts the checked change event of [CheckBox] to streams
*/
private fun CheckBox.toObservable(): Observable<Boolean> {
return Observable.create<Boolean> {
setOnCheckedChangeListener { _, isChecked ->
it.onNext(isChecked)
}
}.startWith(isChecked)
}
The extension method above returns an Observable
which, when subscribed, starts receiving check change events in the form of emissions. Before that, it immediately emits the argument that’s passed to startWith()
, which is the isChecked
value of the checkbox.
Remember you need to subscribe to the Observable
returned by the extension functions to start receiving the events; calling the extension function isn’t enough:
val checkboxSource = checkbox.toObservable()
checkboxSource.subscribe {
Log.d(LOG_TAG, "New Checkbox value: $it")
}
You can subscribe to the stream using the code above.
Now that you’ve learned how to turn UI events into Observable
s, you’ll learn about the concatenation operator.
Using the concat Operator
The concat
operator is similar to the merge operator, with one very important distinction: It fires emission of sources sequentially. It won’t move on to the next source until the current one calls onComplete()
.
This behavior makes it unsuitable for handling infinite sources, as it’ll forever emit from the current source and keep the others waiting.
In loadDataInUI
inside PlaceDetailActivity.kt, you can see that combineUsingConcat()
uses concat()
to combine the events of Checkbox
and NumberPicker
:
private fun combineUsingConcat(
booleanObservable: Observable<Boolean>,
integerObservable: Observable<Int>
): Disposable {
return Observable.concat(booleanObservable, integerObservable)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = { input ->
Log.i(LOG_TAG, input.toString())
}).addTo(disposable)
}
These individual Observable
s are infinite in the sense that they never call onComplete()
; they’re disposed only when the activity is destroyed. So, using concat()
or concatWith()
will only relay items from the fastest source while the other one is “starved”. If you assess the Logcat, you can see that only one of the two (whichever gets to call startWith
the earliest) gets printed.
Despite these gotchas, concat
is the go-to operator when ordering is crucial.
You can correct the current UI behavior using combineLatest
, which you’ll learn about next.
Using the combineLatest Operator to Correct the UI Behavior
The combineLatest()
operator immediately couples the latest emissions from every other source for every emission. This behavior makes it perfect for combining UI inputs.
Open PlaceDetailActivity.kt, and in loadDataInUI()
, remove combineUsingConcat()
and replace it with a call to combineUsingCombineLatest()
, as shown below:
val isTwoWayTravelObservable = twoWayTravelCheckbox.toObservable() val totalPassengerObservable = numberOfPassengerPicker.toObservable() // combineUsingConcat(isTwoWayTravelObservable, totalPassengerObservable) combineUsingCombineLatest(this, isTwoWayTravelObservable, totalPassengerObservable)
combineUsingCombineLatest()
uses combineLatest()
to pair the latest value from NumberPicker
with the latest isChecked
value of Checkbox
. Then it uses them to calculate the total price of the trip, as shown in the code below:
private fun combineUsingCombineLatest(
data: PlaceDetail, booleanObservable: Observable<Boolean>,
integerObservable: Observable<Int>
) {
Observable.combineLatest<Boolean, Int, Pair<Boolean, Int>>(
booleanObservable,
integerObservable,
BiFunction { t1, t2 ->
return@BiFunction Pair(t1, t2)
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = { input ->
val passengerCount = input.second
val isTwoWayTravel = input.first
resultTextView.text =
viewModel.calculateTravelCost(data.cost, passengerCount, isTwoWayTravel)
.toString()
}).addTo(disposable)
}
Build and run. Click on any of the places. Now the UI should work as expected.
Next, you’ll learn about mergeDelayError
.
Using the mergeDelayError Operator
MockApiService.kt has a fetchFromExperimentalApi()
, which returns an error as soon as it’s subscribed to. Using the merge operator to join fetchMarsPlaces()
, fetchEarthPlaces()
and fetchFromExperimentalApi()
will immediately call onErrorHandler
— with no time for the first two to emit. With mergeDelayError
, you can allow an observer to receive all successfully emitted items without being interrupted by an error.
To see the behavior of mergeDelayError
, open PlaceListViewModel.kt and place the following code inside loadExperimental()
:
startLoading()
Single.mergeDelayError(
service.fetchFromExperimentalApi(),
service.fetchMarsPlaces(),
service.fetchEarthPlaces()
).subscribeOn(Schedulers.io())
.doOnSubscribe { recordStartTime() }
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribeBy(
onNext = onDataLoadHandler,
onComplete = onCompleteHandler,
onError = onErrorHandler
)
.addTo(disposable)
Build and run. Tap the “Experimental Features (UNSTABLE)” item from the menu.
You’ll see a toast with an error message only after the two sources, fetchMarsPlaces()
and fetchEarthPlaces()
, have emitted. Despite occurring before both streams complete, the error is sent only after they do.
Next, you’ll learn about the switch
operator.