Reactive Programming with RxAndroid in Kotlin: An Introduction

Learn how reactive programming offers a whole new paradigm for Android development, using RxJava and RxAndroid with Kotlin. By Kyle Jablonski.

Flowable

With the release of RxJava2, the framework was redesigned from the ground up to solve problems that the original library did not address. One of the most important topics addressed in the update is backpressure.

Backpressure is what happens when an observable emits items faster than the consumer can handle them. Consider the Twitter firehose, which constantly emits tweets as they are added to the Twitter platform. An observable buffers items until there is no more memory available, so an app that consumed the firehose API through one would eventually crash. Flowables take this into consideration and let you specify a BackpressureStrategy that tells the flowable how to handle items emitted faster than they can be consumed.

Backpressure strategies:

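  • BUFFER – Buffers every item until the consumer is ready for it, the same way RxJava 1 handled items. You can also limit the buffer size with the onBackpressureBuffer operator.
  • DROP – Drops any items that the consumer can’t handle.
  • ERROR – Signals a MissingBackpressureException when the downstream can’t keep up.
  • LATEST – Keeps only the latest item emitted by onNext, overwriting the previous value.
  • MISSING – Applies no buffering or dropping to onNext events, so the downstream has to deal with any overflow itself.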
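
To make the difference between the strategies concrete, here is a minimal sketch that runs outside the app. It assumes RxJava 2 (the `io.reactivex` packages) and uses the library's `TestSubscriber` as a deliberately slow consumer. The names `numbers` and `slowConsumer` are made up for this illustration and are not part of the sample project.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable

// Hypothetical source: emits 1..5 right away, without waiting for requests.
fun numbers(strategy: BackpressureStrategy): Flowable<Int> =
    Flowable.create<Int>({ emitter ->
      for (i in 1..5) emitter.onNext(i)
      emitter.onComplete()
    }, strategy)

// Hypothetical slow consumer: asks for one item up front and only
// asks for the rest after the source has finished emitting.
fun slowConsumer(strategy: BackpressureStrategy): List<Int> {
  val subscriber = numbers(strategy).test(1) // TestSubscriber, initial request of 1
  subscriber.request(Long.MAX_VALUE)         // now accept whatever is still available
  return subscriber.values()
}

fun main() {
  println(slowConsumer(BackpressureStrategy.BUFFER)) // [1, 2, 3, 4, 5]
  println(slowConsumer(BackpressureStrategy.DROP))   // [1]
  println(slowConsumer(BackpressureStrategy.LATEST)) // [1, 5]
}
```

With BUFFER nothing is lost, with DROP everything the consumer had not asked for is discarded, and with LATEST only the most recent unrequested item survives.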

Turning Observables into Flowables

Time to turn the observables above into flowables using this new knowledge of backpressure strategies. First, consider the observables you added to your app: one emits items when a button is clicked and the other emits keyboard input. For button clicks, only the most recent click matters, so you can use the LATEST strategy. For text input, you don’t want to lose any changes, so you can use BUFFER.

Open CheeseActivity.kt and modify your observables to the following:

val buttonClickStream = createButtonClickObservable()
    .toFlowable(BackpressureStrategy.LATEST) // 1

val textChangeStream = createTextChangeObservable()
    .toFlowable(BackpressureStrategy.BUFFER) // 2

  1. Convert the button click stream into a flowable using the LATEST BackpressureStrategy.
  2. Convert the text input change stream into a flowable using the BUFFER BackpressureStrategy.

Finally, change the merge operator to use Flowable as well:

val searchTextFlowable = Flowable.merge<String>(buttonClickStream, textChangeStream)
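
If you want to try the conversion and merge without the UI, the following sketch may help. It assumes RxJava 2 and replaces the two view-backed observables with `PublishSubject`s. The name `mergeDemo` and the emitted strings are invented for the example.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.subjects.PublishSubject

fun mergeDemo(): List<String> {
  // Stand-ins for createButtonClickObservable() and createTextChangeObservable().
  val clicks = PublishSubject.create<String>()
  val textChanges = PublishSubject.create<String>()

  val merged = Flowable.merge<String>(
      clicks.toFlowable(BackpressureStrategy.LATEST),
      textChanges.toFlowable(BackpressureStrategy.BUFFER))

  val received = mutableListOf<String>()
  val disposable = merged.subscribe { received.add(it) }

  textChanges.onNext("ched")    // user is typing
  textChanges.onNext("cheddar")
  clicks.onNext("cheddar")      // user taps the search button

  disposable.dispose()
  return received               // [ched, cheddar, cheddar]
}
```

Because this consumer keeps up with the source, both strategies simply pass every item through. The strategies only come into play when the downstream falls behind.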

Now, change the call to use the new searchTextFlowable value, instead of the previous Observable:

searchTextFlowable
    // Show the progress indicator on the main thread.
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext { showProgress() }
    // Run the search on an I/O thread.
    .observeOn(Schedulers.io())
    .map { cheeseSearchEngine.search(it) }
    // Deliver the results back on the main thread.
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
      hideProgress()
      showResult(it)
    }
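
The thread hopping in this chain can be checked on a plain JVM. The sketch below assumes RxJava 2 and is not part of the sample project: `searchThreadName` is a hypothetical helper, and `Schedulers.single()` only stands in for `AndroidSchedulers.mainThread()`, which is not available outside Android.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Returns the name of the thread that ran the "search" step.
fun searchThreadName(): String =
    Flowable.just("cheddar")
        .observeOn(Schedulers.io())          // operators below run on an I/O thread
        .map { Thread.currentThread().name } // stands in for cheeseSearchEngine.search(it)
        .observeOn(Schedulers.single())      // stand-in for the main thread
        .blockingFirst()                     // block only for the sake of the demo

fun main() {
  // Prints a name that starts with "RxCachedThreadScheduler",
  // the thread name prefix used by Schedulers.io().
  println(searchThreadName())
}
```

Each observeOn call affects only the operators that come after it, which is why the chain above can switch threads more than once.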

Re-run the application and you should see a working app without the backpressure pitfalls of plain observables.

Maybe

A Maybe is a computation that emits either a single value, no value or an error. Maybes are a good fit for operations such as database updates and deletes. Here, you’ll add a new feature that uses a Maybe to favorite a type of cheese from the app.
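
Before wiring a Maybe into the adapter, it can help to see its three possible outcomes in isolation. This is a small sketch assuming RxJava 2. The `describe` function is hypothetical and exists only for this illustration.

```kotlin
import io.reactivex.Maybe

// Subscribes to a Maybe and reports which of the three callbacks fired.
fun describe(source: Maybe<String>): String {
  var outcome = "nothing yet"
  source.subscribe(
      { value -> outcome = "success: $value" },        // onSuccess: exactly one value
      { error -> outcome = "error: ${error.message}" }, // onError
      { outcome = "complete without a value" })         // onComplete: no value
  return outcome
}

fun main() {
  println(describe(Maybe.just("Brie")))  // success: Brie
  println(describe(Maybe.empty()))       // complete without a value
  println(describe(Maybe.error(IllegalStateException("boom")))) // error: boom
}
```

Only one of the three callbacks is ever invoked for a given subscription.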

Open the CheeseAdapter class and add the following code in onBindView:

// 1
Maybe.create<Boolean> { emitter ->
  emitter.setCancellable {
    holder.itemView.imageFavorite.setOnClickListener(null)
  }

  holder.itemView.imageFavorite.setOnClickListener {
    emitter.onSuccess((it as CheckableImageView).isChecked) // 2
  }
}.toFlowable().onBackpressureLatest() // 3
    .observeOn(Schedulers.io())
    .map { isChecked ->
      cheese.favorite = if (!isChecked) 1 else 0
      val database = CheeseDatabase.getInstance(holder.itemView.context).cheeseDao()
      database.favoriteCheese(cheese) // 4
      cheese.favorite // 5
    }
    // Switch back to the main thread before touching the view.
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
      holder.itemView.imageFavorite.isChecked = it == 1 // 6
    }

  1. Create the Maybe from an action.
  2. Emit the checked state on success.
  3. Turn the Maybe into a flowable.
  4. Perform the update on the Cheeses table.
  5. Return the result of the operation.
  6. Use the result from the emission to change the outline to a filled-in heart.

Note: A Maybe would probably be a better fit for a delete operation, but for the purposes of this example you use it to favorite a cheese.

RxJava2 & Null

Null is no longer supported in RxJava2. Supplying null will result in a NullPointerException immediately or in a downstream signal. You can read all about this change here.
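
As a rough illustration of the "downstream signal" case, the sketch below assumes RxJava 2 and wraps a nullable lookup in `Observable.fromCallable`. The `findCheese` and `search` functions are hypothetical and not part of the sample project.

```kotlin
import io.reactivex.Observable

// Hypothetical lookup that may not find anything.
fun findCheese(name: String): String? = if (name == "Brie") "Brie" else null

fun search(name: String): String {
  var result = ""
  Observable.fromCallable<String?> { findCheese(name) }
      .subscribe(
          { value -> result = "found $value" },
          // A null from the callable arrives here as a NullPointerException.
          { error -> result = "failed with ${error.javaClass.simpleName}" })
  return result
}

fun main() {
  println(search("Brie"))  // found Brie
  println(search("Gouda")) // failed with NullPointerException
}
```

The null never reaches onNext. RxJava2 converts it into an onError signal instead.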

RxJava and Activity/Fragment lifecycle

Remember those setCancellable methods you set up? They won’t fire until the subscription is disposed.

The Observable.subscribe() call returns a Disposable. Disposable is an interface that has two methods:

public interface Disposable {
  void dispose();  // ends a subscription
  boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
}
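
The link between dispose() and setCancellable can be sketched in a few lines. This example assumes RxJava 2 and records what happens in a list instead of touching real views. `disposeDemo` is a made-up name for this illustration.

```kotlin
import io.reactivex.Observable

fun disposeDemo(): List<String> {
  val log = mutableListOf<String>()

  val disposable = Observable.create<String> { emitter ->
    // Runs when the subscription is disposed, like removing a click listener.
    emitter.setCancellable { log.add("listener removed") }
    emitter.onNext("click")
  }.subscribe { log.add("received $it") }

  log.add("disposed? ${disposable.isDisposed}")
  disposable.dispose() // triggers the cancellable above
  log.add("disposed? ${disposable.isDisposed}")
  return log
}

fun main() {
  // [received click, disposed? false, listener removed, disposed? true]
  println(disposeDemo())
}
```

Until dispose() is called, the cancellable never runs, which is how listeners end up leaking when a subscription outlives its Activity.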

Add the following property to CheeseActivity:

private lateinit var disposable: Disposable

In onStart(), set the returned value of subscribe() to disposable with the following code (only the first line changes):

disposable = searchTextFlowable // change this line
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext { showProgress() }
      .observeOn(Schedulers.io())
      .map { cheeseSearchEngine.search(it) }
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe {
        hideProgress()
        showResult(it)
      }

Since you subscribed to the observable in onStart(), onStop() would be a perfect place to unsubscribe.

Add the following code to CheeseActivity.kt:

override fun onStop() {
  super.onStop()
  if (!disposable.isDisposed) {
    disposable.dispose()
  }
}

And that’s it! Build and run the app. You won’t “observe” any changes yourself, but now the app is successfully avoiding RxJava memory leaks. :]

Where to Go From Here?

You can download the final project from this tutorial here. If you want to challenge yourself a bit more, you can swap out this implementation of RxJava and replace it with Room’s RxJava support, which you can find out more about here.

You’ve learned a lot in this tutorial, but that’s only a glimpse of the RxJava world. For example, there is RxBinding, a library that provides RxJava bindings for most of the Android View APIs. Using this library, you can create a click observable by just calling RxView.clicks(viewVariable).

To learn more about RxJava refer to the ReactiveX documentation.

If you have any comments or questions, don’t hesitate to join the discussion below!

Contributors

  • Kyle Jablonski – Author
  • Filip Babić – Tech Editor and Final Pass Editor
  • Tyler Bos – Editor
