Reactive Programming with RxAndroid in Kotlin: An Introduction
Learn about how Reactive programming is a whole new paradigm using RxJava and RxAndroid in Android with Kotlin. By Kyle Jablonski.
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Contents
Reactive Programming with RxAndroid in Kotlin: An Introduction
30 mins
- Getting Started
- What is Reactive Programming?
- Difference Between RxJava and RxKotlin
- RxJava Observable Contract
- How to Create an Observable
- Observe Button Clicks
- RxJava Threading Model
- The Map Operator
- Show Progress Bar with doOnNext
- Observe Text Changes
- Filter Queries by Length
- Debounce Operator
- Merge Operator
- Flowable
- Turning Observables into Flowables
- Maybe
- RxJava2 & Null
- RxJava and Activity/Fragment lifecycle
- Where to Go From Here?
Flowable
With the release of RxJava2, the framework has been totally redesigned from the ground up to solve some problems that were not addressed in the original library. One really important topic addressed in the update is the idea of backpressure.
Backpressure is the concept that an observable is emitting items faster than the consumer can handle them. Consider the example of the Twitter firehose, which is constantly emitting tweets as they are added to the twitter platform. If you were to use observables, which buffer items until there is no more memory available, your app would crash and consuming the firehose API would not be possible using them. Flowables take this into consideration and let you specify a BackPressureStrategy
to tell the flowable how you want the consumer to handle items emitted faster than can be consumed.
Backpressure strategies:
- BUFFER– Handles items the same way as RxJava 1 but you can also add a buffer size.
- DROP– Drops any items that the consumer can’t handle.
- ERROR– Throws an error when the downstream can’t keep up.
- LATEST– Keeps only the latest item emitted by onNext overwriting the previous value.
- MISSING– No buffering or dropping during onNext events.
Turning Observables into Flowables
Time to turn the observables above into flowables using this new knowledge of backpressure strategy. First consider the observables you added to your app. You have one observable that emits items when a button is clicked and another from keyboard input. With these two in mind, you can imagine in the first case you can use the LATEST strategy and in the second you can use the BUFFER.
Open CheeseActivity.kt and modify your observables to the following:
val buttonClickStream = createButtonClickObservable() .toFlowable(BackpressureStrategy.LATEST) // 1 val textChangeStream = createTextChangeObservable() .toFlowable(BackpressureStrategy.BUFFER) // 2
- Convert the button click stream into a flowable using LATEST BackpressureStrategy.
- Convert the text input change stream into a flowable using BUFFER BackpressureStrategy.
Finally, change the merge operator to use Flowable as well:
val searchTextFlowable = Flowable.merge<String>(buttonClickStream, textChangeStream)
Now, change the call to use the new searchTextFlowable
value, instead of the previous Observable
:
searchTextFlowable
// 1
.observeOn(AndroidSchedulers.mainThread())
// 2
.doOnNext { showProgress() }
.observeOn(Schedulers.io())
.map { cheeseSearchEngine.search(it) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// 3
hideProgress()
showResult(it)
}
Re-run the applicaton and you should see a working app with none of the pitfalls of observables.
Maybe
A Maybe is a computation that emits either a single value, no value or an error. They are good for things such as database updates and deletes. Here you will add a new feature using a Maybe to favorite a type of cheese from the app and use Maybe to emit no value.
Open the CheeseAdapter class and add the following code in onBindView:
// 1 Maybe.create<Boolean> { emitter -> emitter.setCancellable { holder.itemView.imageFavorite.setOnClickListener(null) } holder.itemView.imageFavorite.setOnClickListener { emitter.onSuccess((it as CheckableImageView).isChecked) // 2 } }.toFlowable().onBackpressureLatest() // 3 .observeOn(Schedulers.io()) .map { isChecked -> cheese.favorite = if (!isChecked) 1 else 0 val database = CheeseDatabase.getInstance(holder.itemView.context).cheeseDao() database.favoriteCheese(cheese) // 4 cheese.favorite // 5 } .subscribeOn(AndroidSchedulers.mainThread()) .subscribe { holder.itemView.imageFavorite.isChecked = it == 1 // 6 }
- Create the Maybe from an action.
- Emit the checked state on success.
- Turn the Maybe into a flowable.
- Perform the update on the Cheeses table.
- Return the result of the operation.
- Use the result from the emission to change the outline to a filled in heart.
Note: It would probably be better to use Maybe in context with a delete operation but for example purpose here you can favorite a cheese.
Note: It would probably be better to use Maybe in context with a delete operation but for example purpose here you can favorite a cheese.
RxJava2 & Null
Null is no longer supported in RxJava2. Supplying null will result in a NullPointerException immediately or in a downstream signal. You can read all about this change here.
RxJava and Activity/Fragment lifecycle
Remember those setCancellable
methods you set up? They won’t fire until the observable is unsubscribed.
The Observable.subscribe()
call returns a Disposable
. Disposable
is an interface that has two methods:
public interface Disposable {
void dispose(); // ends a subscription
boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
}
Add the following property to CheeseActivity
:
private lateinit var disposable: Disposable
In onStart()
, set the returned value of subscribe()
to disposable
with the following code (only the first line changes):
disposable = searchTextObservable // change this line
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { showProgress() }
.observeOn(Schedulers.io())
.map { cheeseSearchEngine.search(it) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
hideProgress()
showResult(it)
}
Since you subscribed to the observable in onStart()
, onStop()
would be a perfect place to unsubscribe.
Add the following code to CheeseActivity.kt:
@Override
override fun onStop() {
super.onStop()
if (!disposable.isDisposed) {
disposable.dispose()
}
}
And that’s it! Build and run the app. You won’t “observe” any changes yourself, but now the app is successfully avoiding RxJava memory leaks. :]
Where to Go From Here?
You can download the final project from this tutorial here. If you want to challenge yourself a bit more you can swap out this implementation of RxJava and replace it with Room’s RxJava support which you can find more about here.
You’ve learned a lot in this tutorial. But that’s only a glimpse of the RxJava world. For example, there is RxBinding, a library that includes most of the Android View APIs. Using this library, you can create a click observable by just calling RxView.clicks(viewVariable)
.
To learn more about RxJava refer to the ReactiveX documentation.
If you have any comments or questions, don’t hesitate to join the discussion below!