Chapters

Hide chapters

Reactive Programming with Kotlin

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 4.0

Before You Begin

Section 0: 3 chapters
Show chapters Hide chapters

Section II: Operators & Best Practices

Section 2: 7 chapters
Show chapters Hide chapters

10. Combining Operators in Practice
Written by Florent Pillet & Alex Sullivan

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

In the previous chapter, you learned about combining operators and worked through increasingly more detailed exercises on some rather mind-bending concepts. Some operators may have left you wondering about the real-world application of these reactive concepts.

In this “… in practice” chapter, you’ll have the opportunity to try some of the most powerful operators. You’ll learn how to solve problems similar to those you’ll face in your own applications.

Note: This chapter assumes you’ve already worked your way through Chapter 9, “Combining Operators,” and are familiar with both filtering (Chapter 5) and transforming operators (Chapter 7). At this point in the book, it is important that you are familiar with these concepts, so make sure to review these chapters if necessary!

You’ll start with a new project for this chapter and build a small application with an ambitious name: Our Planet.

Getting started

The project will tap into the wealth of public data exposed by NASA. You’ll target EONET, NASA’s Earth Observatory Natural Event Tracker. It is a near real-time, curated repository of natural events of all types occurring on the planet. Check out https://eonet.sci.gsfc.nasa.gov/ to learn more!

To get started with Our Planet, open the starter project for this chapter in Android Studio 4.0 or newer.

Build and run the starter application; the default screen is empty.

Your tasks with this application are as follows:

  • Gather the event categories from the EONET public API https://eonet.sci.gsfc.nasa.gov/docs/v2.1 and display them on the first screen.
  • Download events and show a count for each category.
  • When user taps a category, display a list of events for this category.

You’ll learn how useful combineLatest can be in several situations, but you’ll also exercise concat, merge, and scan. Of course, you’ll also rely on operators you are already familiar with, like map and flatMap.

Preparing the EONET API class

Good applications have a clear architecture with well-defined roles. The code that talks with the EONET API shouldn’t live in any of the activities. Instead, it will live in an object that you’ll reference from a ViewModel.

Fetching categories

Now open the EONETApi file and add a fetchCategories method to the bottom of the class, after the companion object block.

@GET(EONET.CATEGORIES_ENDPOINT)
fun fetchCategories(): Observable<EOCategoriesResponse>
fun fetchCategories(): Observable<EOCategoriesResponse> {
  return eonet.fetchCategories()
}

Updating the CategoriesViewModel

Open CategoriesViewModel and add the following to the empty startDownload method:

// 1
EONET.fetchCategories()
  // 2
  .map { response ->
    val categories = response.categories
      categories.mapNotNull { EOCategory.fromJson(it) }
  }
  // 3
  .share()
  // 4
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  // 5
  .subscribe {
    categoriesLiveData.value = it
  }
  .addTo(disposables)

Add events into the mix

Now that you’ve got the categories loading, it’s time to update the app to actually display the number of events in the category.

class EOEventsResponse(val events: List<AnyMap>)
@GET(EONET.EVENTS_ENDPOINT)
fun fetchEvents(
  @Query("days") forLastDays: Int,
  @Query("status") status: String
): Observable<EOEventsResponse>
// 1
private fun events(forLastDays: Int, closed: Boolean):
  Observable<List<EOEvent>> {
  // 2
  val status = if (closed) "closed" else "open"
  // 3
  return EONET.eonet.fetchEvents(forLastDays, status)
    //4
    .map { response ->
      val events = response.events
      events.mapNotNull { EOEvent.fromJson(it) }
    }
}
fun fetchEvents(forLastDays: Int = 360): Observable<List<EOEvent>> {

}
val openEvents = events(forLastDays, false)
val closedEvents = events(forLastDays, true)
return openEvents.concatWith(closedEvents)

Combining events and categories

you’ve got a fancy fetchEvents method that fetches all of your events, so now it’s time to utilize it in the CategoriesViewModel class.

fun startDownload() {
  val eoCategories = EONET.fetchCategories()
    .map { response ->
      val categories = response.categories
      categories.mapNotNull { EOCategory.fromJson(it) }
    }
}
val downloadedEvents = EONET.fetchEvents()
// 1
val updatedCategories = Observables
  .combineLatest(eoCategories, downloadedEvents)
    { categoriesResponse, eventsResponse ->
      // 2
      categoriesResponse.map { category ->
        // 3
        val cat = category.copy()
        // 4
        cat.events.addAll(eventsResponse.filter {
          it.categories.contains(category.id)
        })
        // 5
        cat
      }
    }

eoCategories.concatWith(updatedCategories)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe({
    categoriesLiveData.value = it
  }, {
    Log.e("CategoriesViewModel", it.localizedMessage)
  })
  .addTo(disposables)
this.categories.clear()

Downloading in parallel

Recall that the app is currently calling the EONET events endpoint twice. Once for closed events and once for open events. Since you’re using the concat operator, it first downloads the open events and then the closed events.

@GET("{endpoint}")
fun fetchEvents(
  @Path("endpoint", encoded = true) endpoint: String,
  @Query("days") forLastDays: Int,
  @Query("status") status: String
): Observable<EOEventsResponse>
private fun events(
  forLastDays: Int,
  closed: Boolean,
  endpoint: String
): Observable<List<EOEvent>> {
  val status = if (closed) "closed" else "open"
  return EONET.eonet.fetchEvents(endpoint, forLastDays, status)
    .map { response ->
    val events = response.events
    events.mapNotNull { EOEvent.fromJson(it) }
    }
}
fun fetchEvents(category: EOCategory, forLastDays: Int = 360):
    Observable<List<EOEvent>> {
  val openEvents =
    EONET.events(forLastDays, false, category.endpoint)
  val closedEvents =
    EONET.events(forLastDays, true, category.endpoint)

  return Observable.concat(openEvents, closedEvents)
}
return Observable.merge(openEvents, closedEvents)

Incrementally updating events

You’ve done a lot of great work to parallelize downloading closed and open events, but there’s still a bit farther to go.

// 1
val eventsObservables = eoCategories.flatMap { categories ->
  // 2
  val categoryEventObservables = categories.map { category ->
    EONET.fetchEvents(category)
  }
  // 3
  Observable.fromIterable(categoryEventObservables)
}
// 4
val downloadedEvents = Observable.merge(eventsObservables, 2)
fun filterEventsForCategory(
    events: List<EOEvent>,
    category: EOCategory
): List<EOEvent> {
  // 1
  return events.filter { event ->
    // 2
    event.categories.contains(category.id) &&
        // 3
        !category.events.map { it.id }.contains(event.id)
  // 4
  }.sortedWith(EOEvent.compareByDates)
}
// 1
val updatedCategories = eoCategories.flatMap { categories ->
  // 2
  downloadedEvents.scan(categories) { updated, events ->
    // 3
    updated.map { category ->
      val eventsForCategory =
        EONET.filterEventsForCategory(events, category)

      if (!eventsForCategory.isEmpty()) {
        val cat = category.copy()
        cat.events.addAll(eventsForCategory.filter {
          it.closeDate != null
        })
        cat
      } else {
        category
      }
    }
  }
}

Wiring up the days seek bar

Open the EventsActivity class and add the following at the top of the class:

private val days = BehaviorSubject.createDefault(360)

private val subscriptions = CompositeDisposable()
seekBar.setOnSeekBarChangeListener(
  object : SeekBar.OnSeekBarChangeListener {
    override fun onProgressChanged(
      seekBar: SeekBar?, progress: Int, fromUser: Boolean) {
        days.onNext(progress)
      }

      override fun onStartTrackingTouch(seekBar: SeekBar?) {}
      override fun onStopTrackingTouch(seekBar: SeekBar?) {}
    })
val allEvents = intent
  .getParcelableExtra<EOCategory>(CATEGORY_KEY).events
val eventsObservable = Observable.just(allEvents)
// 1
Observables
  .combineLatest(days, eventsObservable) { days, events ->
    // 2
    val maxInterval = (days.toLong() * 24L * 3600000L)
    // 3
    events.filter { event ->
      val date = event.closeDate
      if (date != null) {
        abs(date.time - Date().time) < maxInterval
      } else {
        true
      }
    }
  }
  // 4
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe {
    adapter.updateEvents(it)
  }
  .addTo(subscriptions)
days
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
      daysTextView.text =
        String.format(getString(R.string.last_days_format), it)
    }
    .addTo(subscriptions)
override fun onDestroy() {
  subscriptions.dispose()
  super.onDestroy()
}

Challenge: Adding a progress bar

Start from the final project in this chapter. Place an indeterminate horizontal progress bar below the toolbar and above the list of categories on the main screen. The progress bar should show while the categories and events are being downloaded and be hidden as soon as the downloads finish.

Key points

  • The concatWith method can be used to combine two Observables to emit one after the other. Watch out for your error handling though, since one Observable encountering an error will end the whole chain!
  • If you need to parallelize multiple Observables, you can use the merge method to interweave the Observables. You can also limit the number of concurrent subscriptions happening!
  • combineLatest can be effectively used to combine the last values of multiple Observables. It’s particularly useful if you have one Observable that may not update often and another that updates frequently. Combining the two Observables with combineLatest can save you from writing a lot of stateful code!
  • The merge method has a ton of overloads. If you have a collection of Observables, there’s almost certainly a merge overload out there to merge your collection together. It even works if you have an Observable of Observables!
  • The scan operator can be used to continuously emit items as you build up progress in some process. For this chapter, the progress was fetching events for a certain type of category. If you need to build up to a final product, scan or reduce are both great options.

Where to go from here?

That wraps up our chapters focusing on filtering, transforming and combining operators. You’ve seen them all in action in Android apps.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2025 Kodeco Inc.

You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now