RxJava Filtering Operators
In this tutorial, you will learn about a group of powerful RxJava operators that will allow you to work only with the data you require at a certain step of the data transformation process: the RxJava Filtering Operators. By Andres Torres.
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Contents
RxJava Filtering Operators
15 mins
- Getting Started
- What is RxJava?
- What are RxJava Filtering Operators?
- Using the Take Operator
- The Take Operator Variations
- Using the SkipWhile Operator
- The SkipWhile Operator Variations
- Using the IgnoreElements Operator
- Using the Filter Operator
- Using the Debounce Operator
- Using the DistinctUntilChanged Operator
- Where to Go From Here?
The SkipWhile Operator Variations
skipWhile
also has a few varients, each with their own conditions:
-
skip
: Skips the first n values and then emits the rest. -
skipLast
: This operator won’t emit unless it has anObservable
that completes and lets it know which elements are last. It skips the final n items emmitted by anObservable
. -
skipUntil
: Skips items until a secondaryObservable
triggers by either emitting an item or terminating.
Sometimes you’ll only care about emitted values when the data stream completes. The ignoreElements
operator is the perfect candidate for these occasions.
Using the IgnoreElements Operator
ignoreElements
transforms the Observable
in a Completable
. A Completable
behaves similary to an Observable
except it only emits terminal events, either onError
or onComplete
, and no values.
You’ll use ignoreElements
to tell the user when data was successfully fetched and filtered. In RestaurantViewModel.kt
, add the following code to the bottom of showResults
:
this.ignoreElements() //1
.subscribeOn(Schedulers.io()) //2
.observeOn(AndroidSchedulers.mainThread()) //3
.subscribe {
_uiLiveData.value = Resource.Success(Unit) //4
}
.addTo(disposables) //5
Here’s a code breakdown:
- Any
Observable
useing this extension also triggers aCompletable
. - The completable subscribes to the corresponding scheduler.
- Then the completable observes the stream of data on the main thread provided by RxAndroid.
- Everytime the stream of data completes, the completable sends a
Resource
of typeSuccess
to the activity through the correspondingLiveData
. - Finally, the completable adds the resulting disposable to a
CompositeDisposable
that disposes when theViewModel
is removed.
Your whole extension will look like this:
private fun Observable<Restaurant>.showResults() {
this.toList()
.subscribeOn(Schedulers.io())
.map { Resource.Success(it) }
.subscribe(_restaurantsLiveData::postValue)
.addTo(disposables)
this.ignoreElements()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
_uiLiveData.value = Resource.Success(Unit)
}
.addTo(disposables)
}
Since both your getTopRestaurants()
and getLowestRatedRestaurants()
use this extension for displaying results, your Activity is notified on a separate LiveData
when those streams have successfully completed.
Build and run. Go to the toolbar menu and tap either Top 5 Rated or Rated 3 and Below. A toast will notify you as soon as the data processing completes.
Next, you’ll learn about the filter
operator.
Using the Filter Operator
The aptly-named filter
is one of the most important and versatile filtering operators. It only passes values that match a particular predicate you declare.
The filter
operator is a perfect fit for shortening your list to only the restaurants that match your specific search query. Open your RestaurantViewModel.kt and replace setQueryListener()
with:
//1
fun setQueryListener(queryObservable: Observable<String>) {
queryObservable
.observeOn(AndroidSchedulers.mainThread())
.map(::filterSource) //2
.subscribe()
}
private fun filterSource(query: String) {
Log.d(this::class.java.simpleName, "Search query: $query")
_restaurantsLiveData.value = Resource.Loading
restaurantSource //3
.filter { restaurant ->
if (query.isEmpty()) return@filter true //4
restaurant.name.contains(query, true)//5
}
.showResults()
}
Here’s a breakdown:
- Changes in the search query come through the
queryObservable
. - On every query change, you send those values emitted to
filterSource()
, a helper function that will filter the values. - This function receives the new query with which you filter your source of truth by returning either true or false.
- If the query is empty, it doesn’t filter. In other words, it emits all values.
- When the query is not empty, it compares each restaurant name with the query. If the query is part of the restaurant name, it emits the value. If the name is not, it skips that name.
Build and run. Tap the search icon, input Isl and notice your list is filtered with every letter you type.
With the filter operator, you can tap every element of your stream and sequentially and simply filter it with any conditional logic you see fit.
Next, you’ll learn about the debounce operator.
Using the Debounce Operator
Look at your Logcat and filter it to show logs from RestaurantViewModel. Notice how the filtering occurs with every letter you enter.
If you were using a remote server, this would have a significant impact on your app's performance. It could raise your server costs for unnecessary transactions. Ideally, you'll only call your data source when you're relatively sure the query you received is the one the user wants to search for.
This problem seems like a big headache, but RxJava has your back with debounce
.
The debounce
operator only emits an item from an Observable
if a particular time has passed without emitting another item.
This isn't as confusing as it may seem. debounce
lets through only the last item in a certain timespan. Open RestaurantViewModel.kt and replace setQueryListener()
with:
fun setQueryListener(queryObservable: Observable<String>) {
queryObservable
.debounce(300, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.map(::filterSource)
.subscribe()
}
All you did is add the debounce
operator. Here, debounce
only lets values through after a pause of 300 milliseconds. From the app perspective, this means the Observable
waits to emit the last item, which will be the most recent query, until the user stops typing characters for at least 300 milliseconds.
As a result, you get considerable performance improvement with very little code.
Build and run. Again, search for Isl and try to write without stopping much between characters. In your Logcat, you'll see the Observable
only let through and filtered one query as expected, Isl.
Without the help of RxJava, adding this performance-boosting functionality will be cumbersome.
-
throttleFirst
: Emits the first element within periodic time intervals. -
throttleLast
: Emits the last element within periodic time intervals.
throttle
. The throttle
operator emits only one item from a group of emitted values within periodic time intervals. Here are two for reference:
-
throttleFirst
: Emits the first element within periodic time intervals. -
throttleLast
: Emits the last element within periodic time intervals.
It's time for the last operator, distinctUntilChanged
.
Using the DistinctUntilChanged Operator
You'll use the last operator to resolve a small edge case. When you search for a specific query and quickly add and remove a letter, you might be fast enough to beat your debounce
operator. If you do, you'll search and filter your list based on the same search query.
Build and run, then search for Isl. As soon as the list filters go on, add an a and remove it within your debounce 300 milliseconds time period.
Check your Logcat and see the list filters based on the same search query. Since that query already filtered the list, this search is unnecessary.
This is an easy fix with the distintctUntilChanged
operator. This operator only lets through values that are different from their predecessors. Any sequential duplicated items are suppressed.
Open RestaurantViewModel.kt and replace setQueryListener
with:
fun setQueryListener(queryObservable: Observable<String>) {
queryObservable
.debounce(300, TimeUnit.MILLISECONDS)
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.map(::filterSource)
.subscribe()
}
Here you added the distintUntilChanged
operator. Build and run, search for Isl and then add and remove the letter a within the 300 milliseconds debounce period. Check you Logcat and you'll see your data was only fetched and filtered once with the query Isl!
distinct
operator. Instead of suppressing sequential duplicates, as distinctUntilChanged
does, it suppresses duplicates anywhere on your data stream.