RxJava Combining Operators
In this tutorial, you’ll use RxJava combining operators to merge, filter and transform your data into succinct and reusable streams. By Prashant Barahi.
Assessing the Switch Operator
switchOnNext subscribes to an Observable that emits Observables. When the outer source emits a new Observable, switchOnNext unsubscribes from the previously emitted one and starts emitting items from the new one instead. Any later emissions from the previous Observable are dropped.
Open PlaceListViewModel.kt and put the snippet below inside switchOnNext():
disposeCurrentlyRunningStreams()
// 1
val outerSource = Observable.interval(3, TimeUnit.SECONDS)
  .doOnNext {
    Log.i(LOG_TAG, "Emitted by OuterSource: $it")
  }
// 2
val innerSource = Observable.interval(1, TimeUnit.SECONDS)
  .doOnSubscribe {
    Log.i(LOG_TAG, "Starting InnerSource")
  }
// 3
Observable.switchOnNext(
  outerSource.map { return@map innerSource }
)
  .doOnNext {
    Log.i(LOG_TAG, "Relayed items $it")
  }
  .subscribeOn(Schedulers.single())
  .observeOn(Schedulers.single())
  .subscribe().addTo(disposable)
Here’s what the code above does. It:
- Creates a source, outerSource, that emits an item every three seconds.
- Creates another source, innerSource, that emits an item every second.
- Uses switchOnNext(), which subscribes to a fresh innerSource for every item the outer source emits (i.e. on every onNext). When the outer Observable emits another item, the current inner subscription is disposed and a new one takes over.
For every item emitted by the outer Observable (i.e. every three seconds), the inner Observable has time to produce three items, one per second. The third isn’t relayed downstream because it’s due at the same moment the outer source emits again, so only 0 and 1 get through (similar to the marble diagram above).
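The timing argument above can be checked with a small, plain-Kotlin timeline model. This is only a sketch, not RxJava code: relayedPerOuterEmission is a hypothetical helper, and it assumes that an inner item due at the same instant as the next outer emission is dropped.

```kotlin
// Simplified timeline model of switchOnNext; NOT RxJava's implementation.
// Assumption: an inner item due at the same instant as the next outer
// emission is dropped.

/** Values one inner subscription relays before the outer source emits again. */
fun relayedPerOuterEmission(outerPeriodSec: Long, innerPeriodSec: Long): List<Long> {
    val relayed = mutableListOf<Long>()
    var value = 0L
    var time = innerPeriodSec        // an interval's first item arrives after one period
    while (time < outerPeriodSec) {  // strictly before the next outer emission
        relayed += value
        value++
        time += innerPeriodSec
    }
    return relayed
}

fun main() {
    // Same periods as the snippet above: outer every 3 s, inner every 1 s.
    println(relayedPerOuterEmission(outerPeriodSec = 3, innerPeriodSec = 1)) // [0, 1]
    // A slower outer source lets more inner items through.
    println(relayedPerOuterEmission(outerPeriodSec = 5, innerPeriodSec = 1)) // [0, 1, 2, 3]
}
```

With the tutorial's periods, the model yields 0 and 1 for each outer emission, which is the pattern the Relayed items lines in Logcat should follow.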
Build and run, then tap Switch On Next inside Demo. Every feature inside Demo prints to Logcat and isn’t visible in TouRx’s UI, so open Logcat and enter --> in the filter to cut out the noise. You’ll see something like this:
2020-08-21 22:02:18.964 I/-->: Emitted by OuterSource: 0
2020-08-21 22:02:18.964 I/-->: Starting InnerSource
2020-08-21 22:02:19.965 I/-->: Relayed items 0
2020-08-21 22:02:20.964 I/-->: Relayed items 1
2020-08-21 22:02:21.964 I/-->: Emitted by OuterSource: 1
2020-08-21 22:02:21.964 I/-->: Starting InnerSource
2020-08-21 22:02:22.964 I/-->: Relayed items 0
2020-08-21 22:02:23.964 I/-->: Relayed items 1
2020-08-21 22:02:24.964 I/-->: Emitted by OuterSource: 2
2020-08-21 22:02:24.964 I/-->: Starting InnerSource
2020-08-21 22:02:25.964 I/-->: Relayed items 0
2020-08-21 22:02:26.964 I/-->: Relayed items 1
2020-08-21 22:02:27.964 I/-->: Emitted by OuterSource: 3
Next, you’ll learn about the join operator.
Assessing the join Operator
The join operator combines the items emitted by two sources whenever an item from one is emitted while the window of an item from the other is still open. In other words, it selects which items to combine based on overlaps between the streams. The windows are implemented as Observables whose lifespans begin with each item emitted by either Observable and end when the window-defining Observable emits an item or completes. As long as an item’s window is open, it can combine with any item emitted by the other source.
To see the join operator in action, open PlaceListViewModel.kt and paste the following inside demonstrateJoinBehavior():
disposeCurrentlyRunningStreams()
// 1
val firstObservable = Observable.interval(1000, TimeUnit.MILLISECONDS)
  .map {
    return@map "SOURCE-1 $it"
  }
// 2
val secondObservable = Observable.interval(3000, TimeUnit.MILLISECONDS)
  .map { return@map "SOURCE-2 $it" }
// 3
val firstWindow = Function<String, Observable<Long>> {
  Observable.timer(0, TimeUnit.SECONDS)
}
val secondWindow = Function<String, Observable<Long>> {
  Observable.timer(0, TimeUnit.SECONDS)
}
// 4
val resultSelector = BiFunction<String, String, String> { t1, t2 ->
  return@BiFunction "$t1, $t2"
}
// 5
firstObservable.join(secondObservable, firstWindow, secondWindow, resultSelector)
  .doOnNext {
    Log.i(LOG_TAG, it)
  }
  .subscribeOn(Schedulers.single())
  .observeOn(Schedulers.single())
  .subscribe().addTo(disposable)
Now, build and run. Tap Demo in the top-right menu, then tap Join. This fires demonstrateJoinBehavior() in PlaceListViewModel. Open Logcat to assess the logs, which will look similar to this:
2020-08-21 22:18:20.562 I/-->: SOURCE-1 4, SOURCE-2 1
2020-08-21 22:18:23.562 I/-->: SOURCE-1 7, SOURCE-2 2
2020-08-21 22:18:26.562 I/-->: SOURCE-1 10, SOURCE-2 3
2020-08-21 22:18:29.562 I/-->: SOURCE-1 13, SOURCE-2 4
2020-08-21 22:18:32.562 I/-->: SOURCE-1 16, SOURCE-2 5
2020-08-21 22:18:35.562 I/-->: SOURCE-1 19, SOURCE-2 6
2020-08-21 22:18:38.562 I/-->: SOURCE-1 22, SOURCE-2 7
2020-08-21 22:18:41.562 I/-->: SOURCE-1 25, SOURCE-2 8
2020-08-21 22:18:44.562 I/-->: SOURCE-1 28, SOURCE-2 9
It’s time to break down the reason for this log! The code above:
- Creates a source, firstObservable, that emits an item every second.
- Creates a second source, secondObservable, that emits an item every three seconds.
- Initializes two window functions, firstWindow and secondWindow, which define the lifespan of the window for each item from firstObservable and secondObservable, respectively.
- Declares a resultSelector that couples the emitted items into a String.
- Uses join() to perform a join operation on the Observables created in the first two steps. Since both windows are zero seconds wide, firstObservable and secondObservable must emit at effectively the same time for their items to be coupled and relayed downstream. With the specified intervals and window lengths, the overlaps occur every three seconds.
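To reason about window lengths before changing the code, here is an idealized, plain-Kotlin model of the overlap rule. It is a sketch rather than RxJava's implementation: Emission, emissions and overlaps are hypothetical names, window bounds are assumed to be inclusive, and real schedulers add jitter, so the exact items paired in Logcat may differ from what the model predicts.

```kotlin
// Idealized model of join's overlap rule; NOT RxJava's implementation.
// Assumption: a window covers [timeMs, timeMs + windowMs], bounds inclusive.

data class Emission(val timeMs: Long, val windowMs: Long) {
    val closesAtMs: Long get() = timeMs + windowMs
}

/** Emissions of an interval-like source: one item per period, starting after the first period. */
fun emissions(periodMs: Long, windowMs: Long, untilMs: Long): List<Emission> =
    (periodMs..untilMs step periodMs).map { Emission(it, windowMs) }

/** Pairs of (left emission time, right emission time) whose windows overlap. */
fun overlaps(left: List<Emission>, right: List<Emission>): List<Pair<Long, Long>> =
    left.flatMap { l ->
        right
            .filter { r -> l.timeMs <= r.closesAtMs && r.timeMs <= l.closesAtMs }
            .map { r -> l.timeMs to r.timeMs }
    }

fun main() {
    // Zero-width windows, as in the snippet above: only simultaneous items pair up.
    val zeroWidth = overlaps(emissions(1000, 0, 9000), emissions(3000, 0, 9000))
    println(zeroWidth) // [(3000, 3000), (6000, 6000), (9000, 9000)]

    // Widening the left window to one second doubles the number of pairs.
    val widerLeft = overlaps(emissions(1000, 1000, 9000), emissions(3000, 0, 9000))
    println(widerLeft.size) // 6
}
```

In the model, zero-width windows produce one pair every three seconds, which matches the spacing of the timestamps in the log above.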
Try experimenting with different window lengths to learn more about the join operator. Next, you’ll learn about a similar operator: groupJoin.
Assessing the groupJoin Operator
The groupJoin operator is similar to the join operator, except that the argument defining how items are combined (i.e. resultSelector) pairs each item emitted by the left source with an Observable that emits the right-source items whose windows overlap with it.
Time to see the groupJoin operator in action! Open PlaceListViewModel.kt, and in demonstrateGroupJoin(), place the following code:
disposeCurrentlyRunningStreams()
// 1
val leftSource = Observable.interval(1, TimeUnit.SECONDS)
  .map { return@map "SOURCE-1 $it" }
val rightSource = Observable.interval(5, TimeUnit.SECONDS)
  .map { return@map "SOURCE-2 $it" }
// 2
val leftWindow = Function<String, Observable<Long>> {
  Observable.timer(0, TimeUnit.SECONDS)
}
val rightWindow = Function<String, Observable<Long>> {
  Observable.timer(3, TimeUnit.SECONDS)
}
// 3
val resultSelector = BiFunction<String, Observable<String>, Observable<Pair<String, String>>> { t1, t2 ->
  return@BiFunction t2.map {
    return@map Pair(t1, it)
  }
}
leftSource.groupJoin(rightSource, leftWindow, rightWindow, resultSelector)
  .concatMap {
    return@concatMap it
  }
  .subscribeOn(Schedulers.single())
  .observeOn(Schedulers.single())
  .doOnNext {
    Log.i(LOG_TAG, it.toString())
  }
  .subscribe().addTo(disposable)
Finally, build and run. Click on the top-right menu and select Group Join inside Demo. Then, open the Logcat to see the printed logs:
2020-08-21 23:17:17.756 I/-->: (SOURCE-1 4, SOURCE-2 0)
2020-08-21 23:17:18.756 I/-->: (SOURCE-1 5, SOURCE-2 0)
2020-08-21 23:17:19.756 I/-->: (SOURCE-1 6, SOURCE-2 0)
2020-08-21 23:17:20.756 I/-->: (SOURCE-1 7, SOURCE-2 0)
2020-08-21 23:17:22.756 I/-->: (SOURCE-1 9, SOURCE-2 1)
2020-08-21 23:17:23.756 I/-->: (SOURCE-1 10, SOURCE-2 1)
2020-08-21 23:17:24.756 I/-->: (SOURCE-1 11, SOURCE-2 1)
2020-08-21 23:17:25.756 I/-->: (SOURCE-1 12, SOURCE-2 1)
2020-08-21 23:17:27.756 I/-->: (SOURCE-1 14, SOURCE-2 2)
2020-08-21 23:17:28.756 I/-->: (SOURCE-1 15, SOURCE-2 2)
2020-08-21 23:17:29.756 I/-->: (SOURCE-1 16, SOURCE-2 2)
2020-08-21 23:17:30.756 I/-->: (SOURCE-1 17, SOURCE-2 2)
2020-08-21 23:17:32.756 I/-->: (SOURCE-1 19, SOURCE-2 3)
2020-08-21 23:17:33.756 I/-->: (SOURCE-1 20, SOURCE-2 3)
2020-08-21 23:17:34.756 I/-->: (SOURCE-1 21, SOURCE-2 3)
2020-08-21 23:17:35.756 I/-->: (SOURCE-1 22, SOURCE-2 3)
Here’s a breakdown of what the code above is doing:
- leftSource emits an item every second, whereas rightSource emits an item every five seconds.
- leftWindow and rightWindow are the window functions for leftSource and rightSource, respectively. Note the difference in the lifespans of these windows: zero seconds versus three seconds.
- The signature of resultSelector is an important distinction between the groupJoin and join operators. Its second argument is an Observable rather than a single item. Since each rightSource item keeps its window open for three seconds and leftSource emits every second, that Observable delivers the right-source item whose window is open when the left item arrives, and the lambda maps each such item into a Pair before sending it downstream.
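The pairings in the log can be reproduced with a small, plain-Kotlin model of the windows. This is only a sketch under stated assumptions, not RxJava's implementation: groupFor is a hypothetical helper, the left window is taken to be zero seconds wide as in the snippet above, and window bounds are treated as inclusive.

```kotlin
// Idealized model of the groupJoin demo; NOT RxJava's implementation.
// Assumptions: the n-th item of an interval source is emitted at
// (n + 1) * period, the left window is zero seconds wide, and a right
// window covers [rightTime, rightTime + rightWindowSec], bounds inclusive.

/** Indices of the right-source items whose windows are open when the given left item is emitted. */
fun groupFor(
    leftIndex: Long,
    leftPeriodSec: Long,
    rightPeriodSec: Long,
    rightWindowSec: Long
): List<Long> {
    val leftTime = (leftIndex + 1) * leftPeriodSec
    val lastRightIndex = leftTime / rightPeriodSec - 1  // right items emitted so far
    return (0L..lastRightIndex).filter { rightIndex ->
        val rightTime = (rightIndex + 1) * rightPeriodSec
        leftTime <= rightTime + rightWindowSec          // right window still open
    }
}

fun main() {
    // Same periods and window as the snippet above.
    for (left in 3L..9L) {
        val group = groupFor(left, leftPeriodSec = 1, rightPeriodSec = 5, rightWindowSec = 3)
        println("SOURCE-1 $left -> " + group.map { "SOURCE-2 $it" })
    }
}
```

Under these assumptions, left items 4 through 7 are grouped with right item 0, left item 8 gets an empty group, and left item 9 starts pairing with right item 1, which mirrors the gaps in the log above.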
Experiment with varying window sizes to get more familiar with the groupJoin operator.