Reactive Programming with RxAndroid in Kotlin: An Introduction
Learn about how Reactive programming is a whole new paradigm using RxJava and RxAndroid in Android with Kotlin. By Kyle Jablonski.
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Contents
Reactive Programming with RxAndroid in Kotlin: An Introduction
30 mins
- Getting Started
- What is Reactive Programming?
- Difference Between RxJava and RxKotlin
- RxJava Observable Contract
- How to Create an Observable
- Observe Button Clicks
- RxJava Threading Model
- The Map Operator
- Show Progress Bar with doOnNext
- Observe Text Changes
- Filter Queries by Length
- Debounce Operator
- Merge Operator
- Flowable
- Turning Observables into Flowables
- Maybe
- RxJava2 & Null
- RxJava and Activity/Fragment lifecycle
- Where to Go From Here?
How to Create an Observable
There are many libraries to help you create observables from almost any type of event. However, sometimes you just need to roll your own. Besides, it’s a great way to learn about the Observable pattern and reactive programming!
You’ll create an Observable using Observable.create()
. Here is its signature:
Observable<T> create(ObservableOnSubscribe<T> source)
That’s nice and concise, but what does it mean? What is the “source?” To understand that signature, you need to know what an ObservableOnSubscribe
is. It’s an interface, with this contract:
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
Like an episode of a J.J. Abrams show like “Lost” or “Westworld,” that answers some questions while inevitably asking more. So the “source” you need to create your Observable
will need to expose subscribe()
, which in turn requires whatever’s calling it to provide an “emitter” as a parameter. What, then, is an emitter?
RxJava’s Emitter
interface is similar to the Observer
one:
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}
An ObservableEmitter
, specifically, also provides a means to cancel the subscription.
To visualize this whole situation, think of a water faucet regulating the flow of water. The water pipes are like an Observable
, willing to deliver a flow of water if you have a means of tapping into it. You construct a faucet that can turn on and off, which is like an ObservableEmitter
, and connect it to the water pipes in Observable.create()
. The outcome is a nice fancy faucet. And of course, the faucet is reactive, since once you close it, the stream of water – data – is no longer active. :]
An example will make the situation less abstract and more clear. It’s time to create your first observable!
Observe Button Clicks
Add the following code inside the CheeseActivity
class:
// 1
private fun createButtonClickObservable(): Observable<String> {
// 2
return Observable.create { emitter ->
// 3
searchButton.setOnClickListener {
// 4
emitter.onNext(queryEditText.text.toString())
}
// 5
emitter.setCancellable {
// 6
searchButton.setOnClickListener(null)
}
}
}
Your imports should look as follows after entering the above code:
import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_cheeses.*
You’ve imported the correct Observable
class and you’re using the Kotlin Android Extensions to get references to view objects.
Here’s what’s going on in the code above:
- You declare a function that returns an observable that will emit strings.
- You create an observable with
Observable.create()
, and supply it with a newObservableOnSubscribe
. - Set up an
OnClickListener
onsearchButton
. - When the click event happens, call
onNext
on the emitter and pass it the current text value ofqueryEditText
. - Keeping references can cause memory leaks in Java or Kotlin. It’s a useful habit to remove listeners as soon as they are no longer needed. But what do you call when you are creating your own
Observable
? For that very reason,ObservableEmitter
hassetCancellable()
. Overridecancel()
, and your implementation will be called when the Observable is disposed, such as when the Observable is completed or all Observers have unsubscribed from it. - For
OnClickListener
, the code that removes the listener issetOnClickListener(null)
.
Now that you’ve defined your Observable, you need to set up the subscription to it. Before you do, you need to learn about one more interface, Consumer
. It’s a simple way to accept values coming in from an emitter.
public interface Consumer<T> {
void accept(T t) throws Exception;
}
This interface is handy when you want to set up a simple subscription to an Observable.
The Observable
interface requires several versions of subscribe()
, all with different parameters. For example, you could pass a full Observer
if you like, but then you’d need to implement all the necessary methods.
If all you need out of your subscription is for the observer to respond to values sent to onNext()
, you can use the version of subscribe()
that takes in a single Consumer
(the parameter is even named onNext
, to make the connection clear).
You’ll do exactly that when you subscribe in your activity’s onStart()
. Add the following code to CheeseActivity.kt:
override fun onStart() {
super.onStart()
// 1
val searchTextObservable = createButtonClickObservable()
searchTextObservable
// 2
.subscribe { query ->
// 3
showResult(cheeseSearchEngine.search(query))
}
}
Here’s an explanation of each step:
- First, create an observable by calling the method you just wrote.
- Subscribe to the observable with
subscribe()
, and supply a simpleConsumer
. - Finally, perform the search and show the results.
Build and run the app. Enter some letters and tap the Search button. After a simulated delay (see CheeseSearchEngine), you should see a list of cheeses that match your request:
Sounds yummy! :]
RxJava Threading Model
You’ve had your first taste of reactive programming. There is one problem though: the UI freezes up for a few seconds when the search button is tapped.
You might also notice the following line in Android Monitor:
> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.
This happens because search
is executed on the main thread. If search
were to perform a network request, Android will crash the app with a NetworkOnMainThreadException
exception. It’s time to fix that.
One popular myth about RxJava is that it is multi-threaded by default, similar to AsyncTask
. However, if not otherwise specified, RxJava does all the work in the same thread it was called from.
You can change this behavior with the subscribeOn
and observeOn
operators.
subscribeOn
is supposed to be called only once in the chain of operators. If it’s not, the first call wins. subscribeOn
specifies the thread on which the observable will be subscribed (i.e. created). If you use observables that emit events from an Android View, you need to make sure subscription is done on the Android UI thread.
On the other hand, it’s okay to call observeOn
as many times as you want in the chain. observeOn
specifies the thread on which the next operators in the chain will be executed. For example:
myObservable // observable will be subscribed on i/o thread
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map { /* this will be called on main thread... */ }
.doOnNext{ /* ...and everything below until next observeOn */ }
.observeOn(Schedulers.io())
.subscribe { /* this will be called on i/o thread */ }
The most useful schedulers are:
-
Schedulers.io()
: Suitable for I/O-bound work such as network requests or disk operations. -
Schedulers.computation()
: Works best with computational tasks like event-loops and processing callbacks. -
AndroidSchedulers.mainThread()
executes the next operators on the UI thread.