Reactive Programming with Kotlin

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 4.0


Section II: Operators & Best Practices

13. Intro to Schedulers
Written by Alex Sullivan & Junior Bontognali

Until now, you’ve worked with schedulers without any explanation of how they handle threading or concurrency. In earlier chapters, you used operators such as buffer and interval, which implicitly rely on some form of concurrency or threading.

You probably have a feeling that schedulers have some sort of magic under the hood, but before you understand schedulers, you’ll also need to understand what those observeOn and subscribeOn functions are all about.

This chapter covers the beauty behind schedulers. You’ll learn why the Rx abstraction is so powerful and why asynchronous programming with it is far less painful than using AsyncTasks, IntentServices and the myriad other asynchronous tools Android development offers.

Note: Creating custom schedulers is beyond the scope of this book. Keep in mind that the schedulers and initializers provided by RxJava generally cover 99% of cases. Always try to use the built-in schedulers.

What is a scheduler?

Before getting your hands dirty with schedulers, it’s important to understand what they are — and what they are not. To summarize, a scheduler is an abstraction introduced by the RxJava library to schedule work at some point in time. The work happens in some asynchronous context. That context could be custom Threads, an event loop, Executors and so on.

While the Scheduler abstract class is a powerful abstraction over different ways of executing asynchronous code, for Android apps you can usually think of schedulers in relation to threads and thread pools. You’ll learn more about the different types of schedulers and how they allow you to switch between threading contexts later on.
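To make the "schedule work in some asynchronous context" idea concrete, here is a minimal sketch that hands a single unit of work straight to a scheduler and reports which thread ran it. The helper name threadNameFor is hypothetical and not part of the book's project; scheduleDirect is a method of RxJava's Scheduler class.

```kotlin
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs one unit of work on the given scheduler
// and returns the name of the thread that executed it.
fun threadNameFor(scheduler: Scheduler): String {
  val done = CountDownLatch(1)
  var name = ""
  // scheduleDirect hands a Runnable to the scheduler without building an Observable.
  scheduler.scheduleDirect {
    name = Thread.currentThread().name
    done.countDown()
  }
  // Wait (with a timeout) until the scheduled work has run.
  done.await(5, TimeUnit.SECONDS)
  return name
}
```

With the default RxJava configuration, threadNameFor(Schedulers.io()) typically returns a name that starts with RxCachedThreadScheduler, which is the same prefix you see in this chapter's log output.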

Here’s a good example of how schedulers can be used:

In this diagram, you have the concept of a cache operator. An Observable makes a request to a server and retrieves some data. This data is processed by a custom operator named cache, which stores the data somewhere. After this, the data is passed to all subscribers in a different scheduler, most likely the main scheduler, which sits on top of the Android main thread. Remember that anytime you update a UI element in an Android app it must be done on the main thread.
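The flow described above could be sketched roughly as follows. This is only an illustration under several assumptions: fetchFromServer and the cache map are hypothetical stand-ins, doOnNext plays the role of the custom cache operator, and Schedulers.single() stands in for the Android main scheduler so the snippet can run on a plain JVM.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap

// Hypothetical in-memory store standing in for the diagram's cache.
val cache = ConcurrentHashMap<String, String>()

// Hypothetical stand-in for a real network request.
fun fetchFromServer(key: String): String = "payload for $key"

fun cachedRequest(key: String): Observable<String> =
  Observable.fromCallable { fetchFromServer(key) }
    // The request runs on an io thread.
    .subscribeOn(Schedulers.io())
    // The "cache" step stores the data; it still runs on the io thread.
    .doOnNext { data -> cache[key] = data }
    // Subscribers receive the data on a different scheduler.
    // In an Android app this would likely be AndroidSchedulers.mainThread().
    .observeOn(Schedulers.single())
```

Subscribing to cachedRequest("user/1") delivers the payload downstream after it has been written to the map, so the cache is already populated by the time a subscriber sees the value.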

Setting up the project

Time to write some code! In this project, you are going to work with an Android app called Schedulers that has a profoundly beautiful user interface. That user interface is one TextView in the center of a white screen.

0s | [D] [dog] received on Thread: main
0s | [S] [dog] received on Thread: main

Switching schedulers

One of the most important things in Rx is the ability to switch schedulers at any time, without any restrictions except for ones imposed by the inner process generating events.

val fruit = Observable.create<String> { observer ->
  observer.onNext("[apple]")
  Thread.sleep(2000)
  observer.onNext("[pineapple]")
  Thread.sleep(2000)
  observer.onNext("[strawberry]")
}
fruit
  .dump()
  .dumpingSubscription()
  .addTo(disposables)

0s | [D] [dog] received on Thread: main
0s | [S] [dog] received on Thread: main
0s | [D] [apple] received on Thread: main
0s | [S] [apple] received on Thread: main
2s | [D] [pineapple] received on Thread: main
2s | [S] [pineapple] received on Thread: main
4s | [D] [strawberry] received on Thread: main
4s | [S] [strawberry] received on Thread: main

Using subscribeOn

In some cases, you might want to change the scheduler on which the Observable’s computation code runs: not the code in any of the subscription operators, but the code that actually emits the Observable’s events.

fruit
  .subscribeOn(Schedulers.io())
  .dump()
  .dumpingSubscription()
  .addTo(disposables)

0s | [D] [dog] received on Thread: main
0s | [S] [dog] received on Thread: main
0s | [D] [apple] received on Thread: RxCachedThreadScheduler-1
0s | [S] [apple] received on Thread: RxCachedThreadScheduler-1
2s | [D] [pineapple] received on Thread: RxCachedThreadScheduler-1
2s | [S] [pineapple] received on Thread: RxCachedThreadScheduler-1
4s | [D] [strawberry] received on Thread: RxCachedThreadScheduler-1
4s | [S] [strawberry] received on Thread: RxCachedThreadScheduler-1
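If you want to try the same idea outside the sample app, here is a self-contained sketch that doesn't rely on the project's dump helpers. The function name is hypothetical; it records the thread inside the emitting lambda and the thread seen further down the chain.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns (thread that emitted, thread that received downstream).
fun emittingAndReceivingThreads(): Pair<String, String> {
  val source = Observable.create<String> { emitter ->
    // This lambda is the "computation code" that subscribeOn moves.
    emitter.onNext(Thread.currentThread().name)
    emitter.onComplete()
  }
  return source
    .subscribeOn(Schedulers.io())
    // No observeOn here, so downstream operators stay on the emitting thread.
    .map { emittedOn -> emittedOn to Thread.currentThread().name }
    .blockingFirst()
}
```

Both names in the returned pair should match, because subscribeOn alone moves the emission and nothing afterwards switches threads again.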

Using observeOn

Observing is one of the three fundamental concepts of Rx. It involves an entity producing events and an observer for those events. In contrast to subscribeOn, the observeOn operator changes the scheduler on which the observation happens.

implementation "io.reactivex.rxjava3:rxandroid:3.0.0"

fruit
  .subscribeOn(Schedulers.io())
  .dump()
  .observeOn(AndroidSchedulers.mainThread())
  .dumpingSubscription()
  .addTo(disposables)

0s | [D] [dog] received on Thread: main
0s | [S] [dog] received on Thread: main
0s | [D] [apple] received on Thread: RxCachedThreadScheduler-1
0s | [S] [apple] received on Thread: main
2s | [D] [pineapple] received on Thread: RxCachedThreadScheduler-1
2s | [S] [pineapple] received on Thread: main
4s | [D] [strawberry] received on Thread: RxCachedThreadScheduler-1
4s | [S] [strawberry] received on Thread: main
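The same split between emission and observation can be reproduced on a plain JVM. In this sketch, Schedulers.single() is an assumed stand-in for AndroidSchedulers.mainThread(), which needs an Android runtime, and the function name is hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns (thread above observeOn, thread below observeOn).
fun upstreamAndDownstreamThreads(): Pair<String, String> =
  Observable.fromCallable { Thread.currentThread().name }
    // Emission happens on an io thread.
    .subscribeOn(Schedulers.io())
    // Everything below this line runs on the single scheduler's thread.
    .observeOn(Schedulers.single())
    .map { upstream -> upstream to Thread.currentThread().name }
    .blockingFirst()
```

The first name should belong to the io scheduler and the second to the single scheduler, mirroring the [D] and [S] lines in the output above.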

Pitfalls

The ability to switch schedulers and threads looks amazing, but it comes with some pitfalls. To see why, you’ll push some events to the subject using a new thread. Since you need to track on which thread the computation takes place, a good solution is to create a dedicated Thread and give it a recognizable name.

val animalsThread = Thread {
  Thread.sleep(3000)
  animal.onNext("[cat]")
  Thread.sleep(3000)
  animal.onNext("[tiger]")
  Thread.sleep(3000)
  animal.onNext("[fox]")
  Thread.sleep(3000)
  animal.onNext("[leopard]")
}
animalsThread.name = "Animals Thread"
animalsThread.start()

...
3s | [D] [cat] received on Thread: Animals Thread
3s | [S] [cat] received on Thread: Animals Thread
4s | [D] [strawberry] received on Thread: RxCachedThreadScheduler-1
4s | [S] [strawberry] received on Thread: main
6s | [D] [tiger] received on Thread: Animals Thread
6s | [S] [tiger] received on Thread: Animals Thread
9s | [D] [fox] received on Thread: Animals Thread
9s | [S] [fox] received on Thread: Animals Thread
12s | [D] [leopard] received on Thread: Animals Thread
12s | [S] [leopard] received on Thread: Animals Thread

animal
  .dump()
  .observeOn(Schedulers.io())
  .dumpingSubscription()
  .addTo(disposables)

...
3s | [D] [cat] received on Thread: Animals Thread
3s | [S] [cat] received on Thread: RxCachedThreadScheduler-1
4s | [D] [strawberry] received on Thread: RxCachedThreadScheduler-2
4s | [S] [strawberry] received on Thread: main
6s | [D] [tiger] received on Thread: Animals Thread
6s | [S] [tiger] received on Thread: RxCachedThreadScheduler-1
9s | [D] [fox] received on Thread: Animals Thread
9s | [S] [fox] received on Thread: RxCachedThreadScheduler-1
12s | [D] [leopard] received on Thread: Animals Thread
12s | [S] [leopard] received on Thread: RxCachedThreadScheduler-1

animal
  .subscribeOn(AndroidSchedulers.mainThread())
  .dump()
  .observeOn(Schedulers.io())
  .dumpingSubscription()
  .addTo(disposables)

3s | [D] [cat] received on Thread: Animals Thread
3s | [S] [cat] received on Thread: RxCachedThreadScheduler-2
4s | [D] [strawberry] received on Thread: RxCachedThreadScheduler-1
4s | [S] [strawberry] received on Thread: main
6s | [D] [tiger] received on Thread: Animals Thread
6s | [S] [tiger] received on Thread: RxCachedThreadScheduler-2
9s | [D] [fox] received on Thread: Animals Thread
9s | [S] [fox] received on Thread: RxCachedThreadScheduler-2
12s | [D] [leopard] received on Thread: Animals Thread
12s | [S] [leopard] received on Thread: RxCachedThreadScheduler-2
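Here is a minimal, self-contained sketch of the same pitfall, assuming a PublishSubject for the animal subject and using Schedulers.io() in subscribeOn so it runs without Android. The function name is hypothetical. The subscriber still sees the item on the thread that called onNext.

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

// Returns the name of the thread on which the subscriber received the item.
fun threadSeenBySubjectSubscriber(): String {
  val animal = PublishSubject.create<String>()
  val received = CountDownLatch(1)
  var receivedOn = ""

  val disposable = animal
    // Only affects where the subscription itself happens.
    .subscribeOn(Schedulers.io())
    .subscribe { _ ->
      receivedOn = Thread.currentThread().name
      received.countDown()
    }

  // The subscription is set up asynchronously, so wait until it is in place.
  while (!animal.hasObservers()) Thread.sleep(10)

  // Push an item from a named thread, as in the example above.
  thread(name = "Animals Thread") { animal.onNext("[cat]") }

  received.await(5, TimeUnit.SECONDS)
  disposable.dispose()
  return receivedOn
}
```

The function returns Animals Thread rather than an io thread name, because subscribeOn has no control over the thread that calls onNext on a subject.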

Hot vs. cold

The section above touched on the topic of hot and cold Observables. The topic is quite opinionated and generates a lot of debate, so let’s briefly look into it here. The concept can be reduced to a very simple question:

Best practices and built-in schedulers

Schedulers are a non-trivial topic, so they come with some best practices for the most common use cases. In this section, you’ll get a quick introduction to serial and concurrent schedulers, learn how they process the data and see which type works better for a particular context.

Android main scheduler

AndroidSchedulers.mainThread() sits on top of the main thread. This scheduler is used to process changes on the user interface and perform other high-priority tasks.

io scheduler

The scheduler returned by Schedulers.io() should be used whenever you’re doing work that’s IO bound. Specifically, if you’re making any network calls, accessing items from a database, or reading lines from a file, this is the scheduler for you.
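As a rough illustration of IO-bound work, this sketch reads the lines of a file on the io scheduler. The function name readLinesOnIo is hypothetical, and blockingFirst is used only to keep the example short; in an app you would subscribe instead of blocking.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.io.File

// Reads all lines of the file on an io thread and returns them to the caller.
fun readLinesOnIo(file: File): List<String> =
  Observable.fromCallable { file.readLines() }
    // The blocking file read happens off the calling thread.
    .subscribeOn(Schedulers.io())
    .blockingFirst()
```

You could try it with a temporary file created through File.createTempFile and filled with writeText.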

Computation scheduler

If you need to do heavy computational work, like crunching large data sets or handling event loops, you can use the scheduler returned by Schedulers.computation().
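For a CPU-bound task, a sketch along these lines moves the number crunching onto the computation scheduler. The function name and the workload (a sum of squares) are made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns (sum of squares from 1 to limit, thread that computed it).
fun sumOfSquares(limit: Long): Pair<Long, String> =
  Observable.fromCallable {
    // The CPU-bound part: no IO, just arithmetic.
    val total = (1..limit).fold(0L) { acc, n -> acc + n * n }
    total to Thread.currentThread().name
  }
    // Run the work on one of the computation scheduler's threads.
    .subscribeOn(Schedulers.computation())
    .blockingFirst()
```

For example, sumOfSquares(3) yields a total of 14, computed on a thread whose name starts with RxComputationThreadPool under the default configuration.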

Single threaded scheduler

Sometimes, you need to work off the main thread but you also need guarantees that the work you’re doing is happening sequentially. This isn’t a problem if you’re only working in the confines of one RxJava chain, since, for the most part, those chains will always happen sequentially.
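The scheduler returned by Schedulers.single() is one way to get that guarantee across separate chains. In this sketch, two independent Completable chains both subscribe on it. The function and label names are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.Collections

// Runs two separate chains on the single scheduler and returns what they logged.
fun runTwoChainsOnSingle(): List<String> {
  val log = Collections.synchronizedList(mutableListOf<String>())

  // Each call builds its own chain that does its work on the single scheduler.
  fun step(label: String): Completable =
    Completable.fromAction { log.add("$label on ${Thread.currentThread().name}") }
      .subscribeOn(Schedulers.single())

  // mergeArray subscribes to both chains in order; the single scheduler
  // then runs their work one after the other on its only thread.
  Completable.mergeArray(step("first"), step("second")).blockingAwait()
  return log.toList()
}
```

Both entries end up on the same background thread, and the first chain's work finishes before the second chain's work starts.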

Trampoline scheduler

Similar to the single scheduler, the scheduler returned by Schedulers.trampoline() runs work sequentially. Unlike the single scheduler, it doesn’t move that work to a background thread. Instead, it queues the work and runs it in order on the thread that scheduled it. You’ll see in Chapter 15, “Testing RxJava Code,” that the trampoline scheduler can be very useful while writing unit tests.
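A quick way to see that the trampoline scheduler doesn't hop threads is to compare thread names. This is a small sketch with a hypothetical function name.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns the name of the thread on which the source ran.
fun trampolineThread(): String =
  Observable.fromCallable { Thread.currentThread().name }
    // Work is queued and executed on the thread that subscribes.
    .subscribeOn(Schedulers.trampoline())
    .blockingFirst()
```

Called from a test or from main, trampolineThread() returns the name of the calling thread, which is part of what makes this scheduler convenient for synchronous unit tests.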

Test scheduler

TestScheduler is a special kind of beast. It’s meant only to be used in testing, so make sure not to use this scheduler in production code. This special scheduler simplifies operator testing. You’ll look at it in detail in the dedicated chapter about testing, but let’s have a quick look now since you’re doing the grand tour of schedulers.

val scheduler = TestScheduler()
val observable = Observable.timer(2, TimeUnit.SECONDS, scheduler)
// From the RxJava Javadoc: timer operates by default on the computation Scheduler.
val testTimer = observable.test()

testTimer.assertNotComplete()

scheduler.advanceTimeBy(2, TimeUnit.SECONDS)

testTimer.assertComplete()
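The same virtual-time trick works for operators that emit repeatedly. As a small sketch with a hypothetical function name, this drives interval with a TestScheduler and collects whatever has been emitted after a given number of virtual seconds.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Returns the values interval has emitted after the given number of virtual seconds.
fun ticksAfter(seconds: Long): List<Long> {
  val scheduler = TestScheduler()
  // Passing the scheduler explicitly replaces interval's default scheduler.
  val observer = Observable.interval(1, TimeUnit.SECONDS, scheduler).test()
  // No real waiting happens: the scheduler's clock jumps forward instantly.
  scheduler.advanceTimeBy(seconds, TimeUnit.SECONDS)
  return observer.values()
}
```

Advancing by three virtual seconds gives the values 0, 1 and 2, and advancing by zero gives an empty list, all without the test actually sleeping.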

Key points

  • A Scheduler is an abstract context upon which RxJava executes work. In other words, Schedulers let you choose to do work on different threads.
  • You can use the subscribeOn operator to control which thread your Observable’s subscription code runs on. That allows you to, for example, execute the actual networking portion of an API call off the main thread.
  • After using subscribeOn, you can use the observeOn operator to then choose a different thread to actually receive the emitted objects on. You’ll often use this operator to switch back to the main thread to update UI objects.
  • While subscribeOn and observeOn are extremely powerful operators, they’re not magic. If you call the onNext method of a subject on a different thread, RxJava can’t honor your subscribeOn call and you’ll see the item emitted on the original thread.
  • There are both hot Observables and cold Observables. Cold Observables create some special side effect when they’re subscribed to. A network call that returns an Observable is an example of a cold stream. A hot Observable is always running and emitting items, even if no one is listening. Subscribing to a hot Observable will not cause any special side effects.
  • There are several built-in schedulers for you to use. The io scheduler is great for network and database calls, while the computation scheduler is good for event loops and computationally expensive code.
  • The RxAndroid library exposes another special scheduler you can use to emit items on the Android main thread.
  • Finally, the TestScheduler class assists in testing RxJava code and should not be used in production code.
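The hot and cold distinction from the key points can be sketched in a few lines. The function names are hypothetical, the counter stands in for a real side effect such as a network call, and a PublishSubject is used as one example of a hot source.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicInteger

// Cold: every subscription triggers the side effect again.
fun coldSideEffectCount(): Int {
  val calls = AtomicInteger(0)
  val cold = Observable.fromCallable { calls.incrementAndGet() }
  cold.blockingFirst() // first subscription runs the callable
  cold.blockingFirst() // second subscription runs it again
  return calls.get()
}

// Hot: items flow whether or not anyone listens, and subscribing triggers nothing.
fun hotLateSubscriber(): List<String> {
  val hot = PublishSubject.create<String>()
  hot.onNext("[dog]")       // nobody is subscribed yet, so this item is missed
  val observer = hot.test() // subscribing causes no side effect
  hot.onNext("[cat]")
  return observer.values()
}
```

The cold source runs its side effect twice for two subscriptions, while the late subscriber to the hot source only sees [cat].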

Where to go from here?

Schedulers are a non-trivial topic in the Rx space; they’re responsible for computing and performing all tasks in RxJava.

© 2024 Kodeco Inc.
