Chapters

Hide chapters

Reactive Programming with Kotlin

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 4.0

Before You Begin

Section 0: 3 chapters
Show chapters Hide chapters

Section II: Operators & Best Practices

Section 2: 7 chapters
Show chapters Hide chapters

14. Flowables & Backpressure
Written by Alex Sullivan

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

You’ve been using Observables to do some pretty powerful stuff — but there’s one problem that you still need to cover. What happens if a subscriber can’t keep up with the next events that the Observable is emitting?

Backpresssure

That thorny scenario where operators or subscribers can’t consume next events as fast as an Observable may produce them is called backpressure, and you’ll explore it thoroughly in this chapter!

To start, open up the starter project for this chapter using IntelliJ IDEA. Navigate to SupportCode.kt and take a look around. You’ll find:

  • The tried and true exampleOf method, a safeSleep method that simply calls Thread.sleep and catches any InterruptedExceptions.
  • A freeMemory method that calculates the total mount of free memory the system has.

Fancy, right?

Now, head over to Main.kt and add the following code in main():

exampleOf("Zipping observable") {
  val fastObservable = Observable.interval(1, TimeUnit.MILLISECONDS)
  val slowObservable = Observable.interval(1, TimeUnit.SECONDS)
}

With the above, you’re creating two new Observables using the Observable.interval static factory method. The interval method creates an Observable that counts up from the provided number at a frequency you provide, forever, so it never terminates.

The two Observables are almost exactly the same, except one will emit a next event with a new number every millisecond, and the other will only emit every second.

Now, add the following right below the slowObservable line:

// 1
val disposable =
  Observables.zip(slowObservable, fastObservable)
    .subscribeOn(Schedulers.io())
    .subscribe { (first, second) ->
      // 2
      println("Got $first and $second")
    }
// 3
safeSleep(5000)
// 4
disposable.dispose()

That’s a solid chunk of code, so breaking it down step by step:

  1. Create a new Observable by using the zip function, which, as you know, combines two Observables together. You’re using the RxKotlin factory function to keep everything neat. It also makes a Pair from the two emitted items for you.
  2. Subscribe to the zipped Observable and print out both items.
  3. Sleep the thread for five seconds. Since you’re subscribing to the zipped Observable on the io scheduler, the “Zipping Observable” example block would finish immediately if you didn’t sleep the thread. You’d never do this in a real application since the application would never terminate naturally like this one, but it’s necessary for the examples in this chapter.
  4. As the Rx guru that you are by now, you never forget to dispose the subscriptions.

Run the Main.kt file. You should see the following:

--- Example of: Zipping observable ---
Got 0 and 0
Got 1 and 1
Got 2 and 2
Got 3 and 3

The next events are being zipped together, but it leaves one question unanswered: What’s happening to all the items that the fast Observable is emitting?

It took about five seconds to print out those four numbers, but we know in that time the fast Observable should have emitted thousands of items, since it should be emitting every millisecond.

It turns out that RxJava buffers those items under the hood. That means that it keeps a list of items that keeps growing until the downstream operators and subscribers can consume them.

Buffering danger!

Most of the time buffering next events is exactly what you want, but sometimes that buffering approach eats too much memory and can lead to OutOfMemoryError crashes!

exampleOf("Overflowing observer") {
  // 1
  val disposable = Observable.range(1, 10_000_000)
      // 2
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.computation())
      // 3
      .subscribe {
        println("Free memory: ${freeMemory()}")
        safeSleep(100)
      }
  // 4
  safeSleep(20_000)
  disposable.dispose()
}
--- Example of: Overflowing observer ---
Free memory: 3793645960
Free memory: 3769377888
Free memory: 3701230888
Free memory: 3608875976
...
.map {
  LongArray(1024 * 8)
}
--- Example of: Overflowing observer ---
Free memory: 3793645432
Free memory: 3595255472
...
Free memory: 276971776
Free memory: 277053320
io.reactivex.exceptions.UndeliverableException: java.lang.OutOfMemoryError: Java heap space

Natural backpressure

Now, backpressure isn’t always a problem. In the previous example, try removing the observeOn line and run the example again.

Introduction to Flowables

But since you usually are mucking about with threading in your RxJava chains, the RxJava library has got your back.

exampleOf("Zipping flowable") {
  val slowFlowable = Flowable.interval(1, TimeUnit.SECONDS)
  val fastFlowable = Flowable.interval(1, TimeUnit.MILLISECONDS)
  val disposable =
    Flowables.zip(slowFlowable, fastFlowable)
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.newThread())
      .subscribe { (first, second) ->
        println("Got $first and $second")
      }

  safeSleep(5000)
  disposable.dispose()
}
io.reactivex.exceptions.MissingBackpressureException: can’t deliver value 128 due to lack of requests
val fastFlowable = Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop { println("Dropping $it") }
--- Example of: Zipping flowable ---
Dropping 128
Dropping 129
Dropping 130
...
Got 0 and 0
...

Backpressure strategies

You’ve seen that you can remove back-pressured items from the stream by using the onBackpressureDrop method, but there’s actually three different ways you can handle backpressure:

onBackPressureBuffer

Copy the following example into your project:

exampleOf("onBackPressureBuffer") {
  val disposable = Flowable.range(1, 100)
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.newThread(), false, 1)
      .doOnComplete { println("We're done!") }
      .subscribe {
        println("Integer: $it")
        safeSleep(50)
      }
  safeSleep(1000)
  disposable.dispose()
}
observeOn(Schedulers.newThread(), false, 1)
.onBackpressureBuffer(
  // 1
  50,
  // 2
  { println("Buffer overrun; dropping latest") },
  // 3
  BackpressureOverflowStrategy.DROP_LATEST
)
Integer: 1
Buffer overrun; dropping latest
Buffer overrun; dropping latest
...
Integer: 2
Integer: 3
...

onBackPressureLatest

Copy the following example into your project:

exampleOf("onBackPressureLatest") {
  val disposable = Flowable.range(1, 100)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread(), false, 1)
    .doOnComplete { println("We're done!") }
    .subscribe {
      println("Integer: $it")
      safeSleep(50)
    }
  safeSleep(1000)
  disposable.dispose()
}
.onBackpressureLatest()
--- Example of: onBackPressureLatest ---
Integer: 1
Integer: 100
We're done!

Built-in backpressure support

You’ve done a fantastic job handling backpressure in several ways. But there’s one more example to work through. It’s a quick one though!

exampleOf("No backpressure") {
  val disposable = Flowable.range(1, 100)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread(), false, 1)
    .doOnComplete { println("We're done!") }
    .subscribe {
      println("Integer: $it")
      safeSleep(50)
    }
  safeSleep(1000)
  disposable.dispose()
}
--- Example of: No backpressure ---
Integer: 1
Integer: 2
Integer: 3
...
JavaDocs for Flowable.range
QizuKath naf Tnebiyna.zikqo

JavaDocs for Flowable.zip
HuneDokk wuc Hbosuzri.fiq

Flowables, Observables, Processors and Subjects — Oh, My!

You may be feeling a little overwhelmed since you’ve just been given a whole new reactive type — Flowables! But don’t worry, Flowables are really just like Observables, but with more control over backpressure. You can even switch between the two types seamlessly.

Choosing a BackpressureStrategy value

There’s five different BackpressureStrategy values you can pick from:

exampleOf("toFlowable") {
  val disposable = Observable.range(1, 100)
    .toFlowable(BackpressureStrategy.MISSING)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread(), false, 1)
    .subscribe {
      println("Integer: $it")
      safeSleep(50)
    }
  safeSleep(1000)
  disposable.dispose()
}
.toFlowable(BackpressureStrategy.BUFFER)

Processors

Since a Subject is just a fancy Observable, you could always use toFlowable on it to turn it into a Flowable. Just like before, you’ll have to supply a BackpressureStrategy. Alternatively, if you want a backpressure-aware version of your favorite subject, you can use the Processor type. it’s just like a Subject, except backpressure aware!

exampleOf("Processor") {
  // 1
  val processor = PublishProcessor.create<Int>()
  // 2
  val disposable = processor
    .observeOn(Schedulers.newThread(), false, 1)
    .subscribe {
      println("Integer: $it")
      safeSleep(50)
    }
  // 3
  Thread().run {
    for (i in 0..100) {
      processor.onNext(i)
      safeSleep(5)
    }
  }
  safeSleep(1000)
  disposable.dispose()
}
val disposable = processor
  .onBackpressureDrop { println("Dropping $it") }
  .observeOn(Schedulers.newThread(), false, 1)
  .subscribe {
    println("Integer: $it")
    safeSleep(50)
  }
--- Example of: Processor ---
Integer: 0
Dropping 1
Dropping 2
Dropping 3
Dropping 4
Dropping 5
Dropping 6
Dropping 7
Dropping 8
Dropping 9
Integer: 10
Dropping 11
...

Key points

  • Flowables offer a powerful tool for handling backpressure, which is when a stream is producing values faster than they can be consumed by an Observer. Most of the time you can ignore backpressure and use Observables, but Flowable can be super-handy if you need it.
  • You’d typically use a Flowable if you have really large (like over 1000 items) streams that come at variable speeds. For example, image you have a web socket that sends down tons of data at random times. You might want to only handle the latest item, so you could use the onBackpressureLatest method to achieve that.
  • If you have an Observable that emits Bitmaps (or other types which can have a really huge memory footprint), you might want to be aware of the fact that all the emitted Bitmaps will buffer if you can’t consume them fast enough, which could lead to an OutOfMemoryError. It might make sense to make use of one of the backpressure operators there as well.
  • Similarly, if you are buffering high memory items into a Subject, consider using a Processor instead. Just make sure to add the proper onBackPressure... operator to ensure you aren’t hit with a MissingBackpressureException!

Where to go from here?

Backpressure is one of things that only show up when you least expect it. Before proceeding, invest some time in playing around with the examples in this chapter and test some operators to see what impact they have on the final result. Understanding backpressure will make your life easier with RxJava, and it will improve your confidence when working with Flowables.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now