Chapters

Hide chapters

Reactive Programming with Kotlin

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 4.0

Before You Begin

Section 0: 3 chapters
Show chapters Hide chapters

Section II: Operators & Best Practices

Section 2: 7 chapters
Show chapters Hide chapters

16. Creating Custom Reactive Extensions
Written by Alex Sullivan

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

After being introduced to RxJava and learning how to create tests, you have yet to see how to create wrappers using RxJava on top of frameworks created by Google or by third parties. Wrapping a Google or third party library component is instrumental in writing reactive applications, so you’ll be introduced to the concept in this chapter.

In this chapter you’ll create a reactive wrapper around an Android Widget, a request for a specific permission, and the process of getting location updates. It’s worth noting here that in a real application you’d probably want to use libraries rather than write these specific wrappers yourself. Later chapters in this book will introduce you to a few of those libraries.

Getting started

You’re going to be creating an app that allows a user to search for gifs through the API for Giphy https://giphy.com, one of the most popular GIF services on the web.

To start, you’ll need a beta key. To get the beta key, navigate to the official docs https://developers.giphy.com/docs/api, and scroll down to “Create an App.”

Follow the instructions there to create an app. You can pick the API key type when prompted. Name your app BestGif:

When you create an app on that page (via the “Create an App” button) you will get a development key, which will suffice to work through this chapter. The API key is displayed under the name of your newly created app like so:

Open the starter project in Android Studio. Then, open GiphyApi.kt and copy the key into the correct place:

private const val API_KEY = "YOUR API KEY HERE"

Once you’ve replaced the API key, run the app. You should see an empty screen with a simple EditText up top. It doesn’t do much yet.

Extending a framework class

It’s often useful to adapt existing framework classes to have a more reactive approach and styling. Luckily, Kotlin’s extension methods allow for a fluid interface to achieve reactive framework classes.

return Observable.create { emitter ->

}
val textWatcher = object : TextWatcher {
  override fun afterTextChanged(text: Editable) {
    emitter.onNext(text.toString())
  }

  override fun beforeTextChanged(
    p0: CharSequence?, p1: Int, p2: Int, p3: Int) {}
  override fun onTextChanged(
    p0: CharSequence?, p1: Int, p2: Int, p3: Int) {}
}
addTextChangedListener(textWatcher)
emitter.setCancellable {
  removeTextChangedListener(textWatcher)
}

Wiring the extension up

It’s time to use the new extension. Open up GifActivity.kt, and add the following to the bottom of onCreate():

text_input
  // 1
  .textChanges()
  // 2
  .flatMapSingle { GiphyApi.searchForGifs(it) }
  // 3
  .onErrorReturnItem(listOf(GiphyGif(
      "https://media.giphy.com/media/SQ24FpNRW9yRG/giphy.gif")))
  // 4
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  // 5
  .subscribe { adapter.items = it }
  .addTo(disposables)

.debounce(500, TimeUnit.MILLISECONDS)

Wrapping the locations API

The app is looking pretty good, but it’s a bit empty before the user types something in. This seems like a great excuse to wrap some more framework classes!

private val locationRequestCode = 500
private val permissionsSubject =
    BehaviorSubject.create<Boolean>()
override fun onRequestPermissionsResult(
    requestCode: Int,
    permissions: Array<out String>,
    grantResults: IntArray
) {
  super.onRequestPermissionsResult(requestCode, permissions,
      grantResults)
  if (requestCode == locationRequestCode) {
    val locationIndex = permissions.indexOf(
        Manifest.permission.ACCESS_FINE_LOCATION)
    if (locationIndex != -1) {
      val granted = grantResults[locationIndex] ==
          PackageManager.PERMISSION_GRANTED
      permissionsSubject.onNext(granted)
    }
  }
}

permissionsSubject
  .doOnSubscribe {
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
      requestPermissions(arrayOf(
          Manifest.permission.ACCESS_FINE_LOCATION),
          locationRequestCode)
    } else {
      permissionsSubject.onNext(true)
    }
  }
  .filter { it }
val currentLocationRequest = LocationRequest()
  .setInterval(500)
  .setFastestInterval(0)
  .setMaxWaitTime(0)
  .setSmallestDisplacement(0f)
  .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)

val client = FusedLocationProviderClient(context)
 return Observable.create { emitter ->

 }
val callback = object : LocationCallback() {
  override fun onLocationResult(result: LocationResult?) {
    result?.lastLocation?.let { emitter.onNext(it) }
  }
}
client.requestLocationUpdates(currentLocationRequest, callback,
    null)
emitter.setCancellable {
  client.removeLocationUpdates(callback)
}
.flatMap { locationUpdates(this) }

.take(1)
.map { cityFromLocation(this, it) }
.doOnNext { text_input.hint = it }
.flatMapSingle {
  GiphyApi.searchForGifs(it).subscribeOn(Schedulers.io())
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe { adapter.items = it }
.addTo(disposables)

The lift and compose functions

You may come across a few other functions in your Rx travels with regard to custom extensions, especially when interoperating with Java. Since Kotlin supports extension functions, you probably won’t need to use these very often, but it’s still a good idea to understand how they work in case you see them out in the wild in any Java code that you’re interacting with.

val observable: Observable<MyModelClass> =
  networkMethodThatReturnsAnObservable()
observable
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe { displayMyResults(it) }
fun <T> Observable<T>.applySchedulers(): Observable<T> {
  return subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
}
val observable: Observable<MyModelClass> =
  networkMethodThatReturnsAnObservable()
observable
  .applySchedulers()
  .subscribe { displayMyResults(it) }
Observable<Integer> observable = Observable.just(1);
Observable<Integer> schedulersApplied =
    FileContaingFunctionKt.applySchedulers(observable);
class ApplySchedulers<T>: ObservableTransformer<T, T> {
  override fun apply(upstream: Observable<T>): ObservableSource<T> {
    return upstream
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
  }
}
Observable<Integer> schedulersApplied =
  Observable.just(1).compose(new ApplySchedulers<>());

Testing your custom reactive extension

Testing your custom reactive extensions is just like testing a normal Rx chain. You just need to make sure you’re testing the right thing!

val view = EditText(context)
val testObserver = view.textChanges().test()
view.setText("Test 1")
view.setText("Test 2")
view.setText("Test 3")
view.setText("Test 4")
testObserver.assertValueCount(4)
testObserver
  .assertValues("Test 1", "Test 2", "Test 3", "Test 4")

val context =
  InstrumentationRegistry.getInstrumentation().targetContext
// 1
val locationProvider =
  mockk<FusedLocationProviderClient>(relaxed = true)

val locationObservable =
  locationUpdates(context, locationProvider)

// 2
verify(exactly = 0) {
  locationProvider.removeLocationUpdates(any<LocationCallback>())
}

locationObservable
  // 3
  .take(0)
  .test()
  .assertComplete()

// 4
verify(exactly = 1) {
  locationProvider
    .removeLocationUpdates(any<LocationCallback>())
}
fun locationUpdates(
  context: Context,
  client: FusedLocationProviderClient =
    FusedLocationProviderClient(context)
): Observable<Location> {
  ...
}

Key points

  • You can wrap an existing Android component via Observable.create().
  • You should pay attention to any long-lived references inside extensions. Clean up after yourself and cancel any resources when an Observable is disposed.
  • You explored compose() and lift() and when to use them (TL;DR avoid lift() unless you know what you’re doing; use compose() if you’re writing Java code).
  • Test your reactive wrappers by writing unit tests and mocking any system component.

Where to go from here?

In this chapter, you saw how to implement and wrap the Android framework. Sometimes, it’s very useful to abstract an official Android framework or third party library to better connect with RxJava.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2025 Kodeco Inc.

You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now