2.
Observables
Written by Alex Sullivan & Scott Gardner
Now that you’re all set up with RxJava, it’s time to jump in and start building some observables!
In this chapter, you’re going to go over a few different examples of creating and subscribing to observables. Things are going to be pretty theoretical for now, but rest assured that the skills you pick up in this chapter will come in very handy as you start working through real-world projects.
Getting started
You’ll work through these theoretical examples of observables using a normal IntelliJ IDEA project. You’ll move on to Android Studio projects once you switch to working on real-world Android applications.
Use the File ▸ Open command in IntelliJ IDEA to open the root folder of the starter project. Accept the defaults in any pop-ups that appear, and the project will open. You’ll primarily be working in the main.kt file in the src/main/kotlin folder of the project. For now, there’s just an empty main() function. You’ll fill it out as you progress through the chapter.
Before you start diving into some RxJava code, take a look at the SupportCode.kt file. It contains the following helper function, exampleOf(description: String, action: () -> Unit):
fun exampleOf(description: String, action: () -> Unit) {
  println("\n--- Example of: $description ---")
  action()
}
You’ll use this function to encapsulate different examples as you work your way through this chapter. You’ll see how to use this function shortly.
But, before you get too deep into that, now would probably be a good time to answer the question: What is an observable?
Observables are the heart of Rx. You’re going to spend some time discussing what observables are, how to create them and how to use them.
What is an observable?
You’ll see “observable,” “observable sequence,” and “stream” used interchangeably in Rx. And, really, they’re all the same thing. In RxJava, everything is a sequence…
…or something that works with a sequence. And an Observable is just a sequence with special powers. One of them, in fact the most important one, is that it is asynchronous. Observables produce events over a period of time, a process the library refers to as emitting. Events can contain values, such as numbers or instances of a custom type, or they can be recognized user gestures, such as taps.
One of the best ways to conceptualize this is by using marble diagrams, which are values plotted on a timeline.
The left-to-right arrow represents time, and the numbered circles represent elements of a sequence. The observable will emit element 1, some time will pass, and then it will emit 2 and 3. How much time, you ask? It could be at any point throughout the life of the observable — which brings you to the lifecycle of an observable.
Lifecycle of an observable
In the previous marble diagram, the observable emitted three elements. When an observable emits an element, it does so in what’s known as a next event.
Here’s another marble diagram, this time including a vertical bar that represents the end of the road for this observable.
This observable emits three tap events, and then it ends. This is called a complete event, as the sequence has now terminated. For example, perhaps the taps were on a view that had been dismissed. The important thing is that the observable has terminated, and it can no longer emit anything. This is normal termination.
However, sometimes things can go wrong.
An error has occurred in this marble diagram; it’s represented by the red X. The observable emitted an error event containing the error. This is no different than when an observable terminates normally with a complete event. If an observable emits an error event, it is also terminated and can no longer emit anything else.
Here’s a quick recap:
- An observable emits next events that contain elements. It can continue to do this until it either:
- …emits a complete event, which terminates it.
- …emits an error event, which terminates it.
- Once an observable is terminated, it can no longer emit events.
Now that you understand what an observable is and what it does, you’ll create some observables to see them in action.
Creating observables
Switch back from the current file to main.kt and add the code below to the main() function. You’ll also need to import io.reactivex.rxjava3.core.Observable:
exampleOf("just") {
val observable: Observable<Int> = Observable.just(1)
}
In the code above, you used the just static method to create an observable with just one item: the integer 1.
In Rx, methods that operate on observables are referred to as operators, so you just utilized the just operator.
just is aptly named, since all it does is create an observable sequence containing just the provided elements. just can take more than one item as well. Try updating the previous line to take in a few more items:
val observable = Observable.just(1,2,3)
This time, you didn’t explicitly specify the type. You might think that because you gave it several integers, the type is Observable<List<Int>>. However, if you hover over the Observable.just(1,2,3) expression and click View ▸ Expression Type, you’ll see that the type is actually Observable<Int>.
just has ten overloads that take from one to ten arguments, each of which is eventually emitted by the observable. If you want to create an observable of type Observable<List<Int>>, then you can pass a List<Int> into the just operator. Replace the observable you previously defined with the following:
val observable = Observable.just(listOf(1))
Now, hover over the Observable.just(listOf(1)) expression and click View ▸ Expression Type again. You’ll see that the type is now Observable<List<Int>>. That means that this new observable will emit one item, and that single item will be a list of Int values. It can be a little tough to wrap your mind around an observable that emits lists, but with time it will become second nature.
Another operator you can use to create observables is fromIterable. Add this code to the bottom of the main() function:
exampleOf("fromIterable") {
  val observable: Observable<Int> =
    Observable.fromIterable(listOf(1, 2, 3))
}
The fromIterable operator creates an observable of individual objects from a regular list of elements. That is, it takes all of the items in the provided list and emits those elements as if you had instead written Observable.just(1, 2, 3).
Hover over the Observable.fromIterable(listOf(1, 2, 3)) expression and click View ▸ Expression Type again. You’ll see that the type of this observable is Observable<Int> rather than Observable<List<Int>>.
fromIterable can be handy if you have a list of objects you want to convert into an observable sequence.
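The difference between just and fromIterable is easiest to see by collecting what each observable emits. The following is only an illustrative sketch: emissionsOf is a hypothetical helper, and it relies on toList() and blockingGet(), two RxJava calls this chapter has not introduced, purely to gather the emissions into a list for printing.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: collects everything an observable emits into a list.
// blockingGet() waits until the observable completes.
fun <T : Any> emissionsOf(observable: Observable<T>): List<T> =
    observable.toList().blockingGet()

fun main() {
    val numbers = listOf(1, 2, 3)

    // just(list) emits the list itself as one element.
    println(emissionsOf(Observable.just(numbers)))          // [[1, 2, 3]]

    // fromIterable(list) emits each element of the list separately.
    println(emissionsOf(Observable.fromIterable(numbers)))  // [1, 2, 3]
}
```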
The IntelliJ IDEA console is probably looking pretty bare at the moment if you’ve run this code. That’s because you haven’t printed anything except the example header. Time to change that by subscribing to observables.
Subscribing to observables
As an Android developer, you may be familiar with LocalBroadcastManager; it broadcasts notifications to observers, which are different from RxJava Observables. Here’s an example of a broadcast receiver that listens for a custom-event Intent:
LocalBroadcastManager.getInstance(this)
  .registerReceiver(object : BroadcastReceiver() {
    override fun onReceive(context: Context?, intent: Intent?) {
      println("We got an intent!")
    }
  }, IntentFilter("custom-event"))
Subscribing to an RxJava observable is similar; observing an observable is called subscribing to it. So instead of registerReceiver(), you use subscribe(). Unlike LocalBroadcastManager, where developers typically use only the singleton returned by getInstance(), each observable in Rx is different.
More importantly, an observable won’t send events until it has a subscriber. Remember that an observable is really a sequence definition; subscribing to an observable is more like calling next() on an Iterator in the Kotlin Standard Library:
val sequence = 0 until 3
val iterator = sequence.iterator()
while (iterator.hasNext()) {
  println(iterator.next())
}
/* Prints:
0
1
2
*/
Subscribing to observables is more streamlined than this, though. You can also add handlers for each event type an observable can emit. Recall that an observable emits next, error, and complete events. A next event passes the emitted element to the handler, and an error event contains a throwable instance.
To see this in action, add this new example to the IntelliJ project (insert the code somewhere after the closing curly bracket of the previous example):
exampleOf("subscribe") {
  val observable = Observable.just(1, 2, 3)
}
This is similar to the previous example, except, this time, you’re simply using the just operator. Now add this code at the bottom of this example’s lambda to subscribe to the observable:
observable.subscribe { println(it) }
Cmd-click on the subscribe operator, and you’ll see that it takes a Consumer of type Int as a parameter. Consumer is a simple interface that has one method, accept(), which takes a value and returns nothing. You’ll also see that subscribe returns a Disposable. You’ll cover disposables shortly.
Run your main() function. The result of this subscription is that each event emitted by the observable prints out:
--- Example of: subscribe ---
1
2
3
Note: The console should automatically appear whenever you run the project, but you can manually show it by clicking the Run tab in the bottom left of the IntelliJ IDEA window after you run the main() function. You can also select View ▸ Tool Windows ▸ Run. This is where the println statements display their output.
You’ve seen how to create observables of one element and of many elements. But what about an observable of zero elements? The empty operator creates an empty observable sequence with zero elements; it will only emit a complete event.
Add this new example to the project:
exampleOf("empty") {
  val observable = Observable.empty<Unit>()
}
An observable must be defined as a specific type if it can’t be inferred. So, since empty has nothing from which to infer the type, the type must be defined explicitly. In this case, Unit is as good as anything else. Add this code to the example to subscribe to it, importing io.reactivex.rxjava3.kotlin.subscribeBy to resolve the compile errors:
observable.subscribeBy(
  // 1
  onNext = { println(it) },
  // 2
  onComplete = { println("Completed") }
)
You’re using a new subscribeBy method here instead of the subscribe method you used previously. subscribeBy is a handy extension method defined in the RxKotlin library, which we’ll touch on later in the book. Unlike the subscribe method you used previously, subscribeBy lets you explicitly state what event you want to handle: onNext, onComplete, or onError. If you were to only supply the onNext field of subscribeBy, you’d be recreating the subscribe functionality you used above.
Taking each numbered comment in turn:
- Explicitly handle the next event by printing the carried value, just like before.
- A complete event doesn’t carry any value, so just print “Completed” instead.
Run this new example. In the console, you’ll see that empty only emits the complete event, which makes the code print “Completed”:
--- Example of: empty ---
Completed
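For comparison, the same kind of per-event handling can be written without RxKotlin by using RxJava’s own three-argument subscribe overload, which takes the next, error and complete handlers in that order. The sketch below is illustrative only; describeEvents is a hypothetical helper that records which events a subscriber receives.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: subscribes and records every event it receives.
fun describeEvents(observable: Observable<Int>): List<String> {
    val events = mutableListOf<String>()
    observable.subscribe(
        { element -> events.add("next $element") },        // onNext
        { error -> events.add("error ${error.message}") }, // onError
        { events.add("completed") }                        // onComplete
    )
    return events
}

fun main() {
    println(describeEvents(Observable.just(1, 2, 3)))
    // [next 1, next 2, next 3, completed]
    println(describeEvents(Observable.empty()))
    // [completed]
    println(describeEvents(Observable.error(IllegalStateException("boom"))))
    // [error boom]
}
```

The named arguments of subscribeBy are usually easier to read, which is one reason to prefer it in Kotlin code.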
But what use is an empty observable? Well, it’s handy when you want to return an observable that immediately terminates or intentionally has zero values. As opposed to the empty operator, the never operator creates an observable that doesn’t emit anything and never terminates. It can be used to represent an infinite duration. Add this example to the project:
exampleOf("never") {
  val observable = Observable.never<Any>()
  observable.subscribeBy(
    onNext = { println(it) },
    onComplete = { println("Completed") }
  )
}
Nothing is printed, except for the example header. Not even “Completed”. How do you know if this is even working? Hang on to that inquisitive spirit until the Challenges section of this chapter.
So far, you’ve been working mostly with observables of explicit variables, but it’s also possible to generate an observable from a range of values.
Add this example to the project, importing kotlin.math.roundToInt:
exampleOf("range") {
  // 1
  val observable: Observable<Int> = Observable.range(1, 10)
  observable.subscribe {
    // 2
    val n = it.toDouble()
    // Binet's formula; note that the second root is negative (about -0.61803).
    val fibonacci = ((Math.pow(1.61803, n) -
      Math.pow(-0.61803, n)) / 2.23606).roundToInt()
    println(fibonacci)
  }
}
Taking it section by section:
- Create an observable using the range operator, which takes a start integer value and a count of sequential integers to generate.
- Calculate and print the nth Fibonacci number for each emitted element.
Note: The Fibonacci sequence is generated by adding each of the previous two numbers in the sequence, starting with 0 and 1: 0, 1, 1, 2, 3, 5, 8, …
There’s actually a better place than the subscribe method to put code that transforms the emitted element. You’ll learn about that in Chapter 7, “Transforming Operators.”
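As a small preview of that idea, here is a hedged sketch that moves the calculation out of the subscriber and into RxJava’s map operator. The iterative fibonacci function is a hypothetical helper introduced only for this illustration; it is not part of the starter project.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: iterative Fibonacci, with fibonacci(1) == 1 and fibonacci(2) == 1.
fun fibonacci(n: Int): Int {
    var previous = 0
    var current = 1
    repeat(n - 1) {
        val next = previous + current
        previous = current
        current = next
    }
    return current
}

fun main() {
    Observable.range(1, 10)
        .map { fibonacci(it) }      // transform each element before it reaches the subscriber
        .subscribe { println(it) }  // the subscriber only prints
}
```

Keeping the transformation in an operator leaves the subscriber with a single job, which tends to make longer chains easier to follow.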
Except for the never() example, up to this point, you’ve been working with observables that automatically emit a complete event and naturally terminate. This permitted you to focus on the mechanics of creating and subscribing to observables, but that swept an important aspect of subscribing to observables under the rug.
It’s time to do some housekeeping and deal with that aspect before moving on.
Disposing and terminating
Remember that an observable doesn’t do anything until it receives a subscription. It’s the subscription that triggers an observable to begin emitting events, up until it emits an error or complete event and is terminated. You can manually cause an observable to terminate by canceling a subscription to it.
Add this new example to the project:
exampleOf("dispose") {
  // 1
  val mostPopular: Observable<String> =
    Observable.just("A", "B", "C")
  // 2
  val subscription = mostPopular.subscribe {
    // 3
    println(it)
  }
}
Quite simply:
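- Create an observable of strings.
- Subscribe to the observable, this time saving the returned Disposable as a local constant called subscription.
- Print each emitted element in the handler.
To explicitly cancel a subscription, call dispose() on it. After you cancel the subscription, or dispose of it, the observable stops emitting events to that subscriber. In this particular example, just has already emitted all three elements synchronously by the time subscribe returns, so disposing has no visible effect; it matters for observables that keep emitting over time.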
Add this code to the bottom of the example:
subscription.dispose()
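To actually observe dispose() cutting a sequence short, the observable has to keep emitting over time. The sketch below assumes RxJava’s interval operator, which this chapter has not covered, as a stand-in for such a source; ticksBeforeAndAfterDispose is a hypothetical helper, and the exact number of ticks depends on timing.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: listens to a ticker for about `listenMillis`, disposes,
// then keeps waiting. Returns the tick count at disposal and after the extra wait.
fun ticksBeforeAndAfterDispose(listenMillis: Long): Pair<Int, Int> {
    val ticks = AtomicInteger(0)
    val subscription = Observable.interval(100, TimeUnit.MILLISECONDS)
        .subscribe { ticks.incrementAndGet() }

    Thread.sleep(listenMillis)
    subscription.dispose()
    val before = ticks.get()

    Thread.sleep(300) // the ticker would have fired a few more times by now
    return before to ticks.get()
}

fun main() {
    val (before, after) = ticksBeforeAndAfterDispose(350)
    println("Ticks at dispose: $before, ticks after waiting: $after")
}
```

Both numbers should match, since nothing is delivered to a subscriber once its subscription has been disposed.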
Managing each subscription individually would be tedious, so RxJava includes a CompositeDisposable type. A CompositeDisposable holds disposables, typically added using the add() method, and will call dispose() on all of them when you call dispose() on the CompositeDisposable itself. Add this new example to the project. You’ll need to import io.reactivex.rxjava3.disposables.CompositeDisposable:
exampleOf("CompositeDisposable") {
  // 1
  val subscriptions = CompositeDisposable()
  // 2
  val disposable = Observable.just("A", "B", "C")
    .subscribe {
      // 3
      println(it)
    }
  // 4
  subscriptions.add(disposable)
  // 5
  subscriptions.dispose()
}
Here’s how this disposable code works:
- Create a CompositeDisposable.
- Create an observable and disposable.
- Subscribe to the observable and print out the emitted item.
- Add the Disposable return value from subscribe to the subscriptions CompositeDisposable.
- Dispose of the disposables.
This is the pattern you’ll use most frequently: creating and subscribing to an observable and immediately adding the subscription to a CompositeDisposable.
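One detail of this pattern that is easy to miss: a CompositeDisposable also offers clear(), which disposes its contents but leaves the container reusable, whereas dispose() retires the container itself. The sketch below is a minimal illustration that uses never() observables only so the subscriptions stay open until they are disposed.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable

fun main() {
    val subscriptions = CompositeDisposable()

    val first = Observable.never<Int>().subscribe()
    subscriptions.add(first)

    // clear() disposes everything inside but keeps the container usable.
    subscriptions.clear()
    println(first.isDisposed)          // true
    println(subscriptions.isDisposed)  // false

    // dispose() also marks the container itself as disposed...
    subscriptions.dispose()

    // ...so anything added afterwards is disposed immediately.
    val late = Observable.never<Int>().subscribe()
    println(subscriptions.add(late))   // false
    println(late.isDisposed)           // true
}
```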
Why bother with disposables at all? If you neither call dispose() on a Disposable when you’re done with the subscription nor cause the observable to terminate in some other way, you will probably leak memory.
If you forget to utilize the Disposable returned by calling subscribe on an Observable, Android Studio will make it very clear that something is not right in an Android project!
Imagine leaking a huge view hierarchy just because you forgot to unsubscribe from a long-running observable that you don’t even need anymore!
The create operator
In the previous examples, you’ve created observables with specific next event elements. Another way to specify all events that an observable will emit to subscribers is by using the create operator.
Add this new example to the project:
exampleOf("create") {
  val disposables = CompositeDisposable()
  Observable.create<String> { emitter ->
  }
}
The create operator takes a single parameter named source. Its job is to provide the implementation of calling subscribe on the observable. In other words, it defines all the events that will be emitted to subscribers. Command-click on create to see its definition.
The source parameter is an ObservableOnSubscribe<T>. ObservableOnSubscribe is a SAM (Single Abstract Method) interface that exposes one method, subscribe. That subscribe method takes in an ObservableEmitter<T>, which extends Emitter<T> and has a few methods that you’ll use to build up the actual Observable. Specifically, it has onNext, onComplete, and onError methods that you can invoke.
Change the implementation of create to the following:
Observable.create<String> { emitter ->
  // 1
  emitter.onNext("1")
  // 2
  emitter.onComplete()
  // 3
  emitter.onNext("?")
}
Here’s the play by play:
- Emit the string 1 via the onNext method.
- Emit a complete event.
- Emit another string ? via the onNext method again.
Do you think the second onNext element (?) could ever be emitted to subscribers? Why or why not?
To see if you guessed correctly, subscribe to the observable by adding the following code on the next line after the create implementation:
.subscribeBy(
  onNext = { println(it) },
  onComplete = { println("Completed") },
  onError = { println("Error") }
)
You’ve subscribed to the observable; now run the code. The result is that the first next event element and “Completed” print out. The second next event doesn’t print because the observable emitted a complete event and terminated before it.
--- Example of: create ---
1
Completed
Add the following line of code between the emitter.onNext and emitter.onComplete calls:
emitter.onError(RuntimeException("Error"))
Run the code after you’ve made those changes. The observable emits the error and then is terminated.
--- Example of: create ---
1
Error
What would happen if you emitted neither a complete nor an error event? Comment out the onComplete and onError lines of code to find out. Here’s the complete implementation:
exampleOf("create") {
  Observable.create<String> { emitter ->
    // 1
    emitter.onNext("1")
    // emitter.onError(RuntimeException("Error"))
    // 2
    // emitter.onComplete()
    // 3
    emitter.onNext("?")
  }.subscribeBy(
    onNext = { println(it) },
    onComplete = { println("Completed") },
    onError = { println("Error") }
  )
}
Run those changes. Congratulations, you’ve just leaked memory! :] The observable will never finish, and since you never disposed of the Disposable returned by subscribeBy, the sequence will never be canceled.
--- Example of: create ---
1
?
Feel free to uncomment the line adding the complete event or dispose of the returned Disposable if you can’t stand leaving the code in a leaky state.
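When a create lambda produces values in a loop or holds on to a resource, the emitter can also cooperate with disposal. The sketch below assumes two members of ObservableEmitter that the text above does not discuss, isDisposed and setCancellable; countdown is a hypothetical helper written only for this illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: counts down from `from` to 1, then completes.
fun countdown(from: Int): Observable<Int> = Observable.create<Int> { emitter ->
    // Runs when the subscription is disposed or the sequence terminates.
    emitter.setCancellable { println("Cleaned up") }

    var n = from
    // Stop producing as soon as nobody is listening anymore.
    while (n > 0 && !emitter.isDisposed) {
        emitter.onNext(n)
        n--
    }
    if (!emitter.isDisposed) {
        emitter.onComplete()
    }
}

fun main() {
    // take(2) disposes the upstream after two elements, which ends the loop early.
    countdown(5).take(2).subscribe { println(it) }
}
```

Checking isDisposed keeps the lambda from doing work that nobody will receive, and setCancellable gives it one place to release whatever it acquired.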
Creating observable factories
Rather than creating an observable that waits around for subscribers, it’s possible to create observable factories that vend a new observable to each subscriber.
Add this new example to the project:
exampleOf("defer") {
  val disposables = CompositeDisposable()
  // 1
  var flip = false
  // 2
  val factory: Observable<Int> = Observable.defer {
    // 3
    flip = !flip
    // 4
    if (flip) {
      Observable.just(1, 2, 3)
    } else {
      Observable.just(4, 5, 6)
    }
  }
}
Here’s the explanation:
- Create a Boolean flag to flip which observable to return.
- Create an observable factory of Int using the defer operator.
- Invert flip, which happens each time factory is subscribed to.
- Return different observables based on whether flip is true or false.
Externally, an observable factory is indistinguishable from a regular observable. Add this code to the bottom of the example to subscribe to factory four times:
for (i in 0..3) {
  disposables.add(
    factory.subscribe {
      println(it)
    }
  )
}
disposables.dispose()
Run this code. Each time you subscribe to factory, you get the opposite observable. You get 123, then 456, and the pattern repeats each time a new subscription is created:
--- Example of: defer ---
1
2
3
4
5
6
1
2
3
4
5
6
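The contrast with a regular observable becomes clearer when the emitted value is computed from changing state. In the hedged sketch below, subscribeRepeatedly is a hypothetical helper: the argument to just is evaluated once, when the observable is created, while the lambda passed to defer runs again for every subscriber.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: subscribes `times` times and records everything received.
fun <T : Any> subscribeRepeatedly(source: Observable<T>, times: Int): List<T> {
    val received = mutableListOf<T>()
    repeat(times) { source.subscribe { element -> received.add(element) } }
    return received
}

fun main() {
    var eagerCalls = 0
    var deferredCalls = 0

    // The argument to just() is evaluated once, right here.
    val eager: Observable<Int> = Observable.just(++eagerCalls)
    // The lambda passed to defer() runs again for every subscriber.
    val deferred: Observable<Int> = Observable.defer { Observable.just(++deferredCalls) }

    println(subscribeRepeatedly(eager, 3))     // [1, 1, 1]
    println(subscribeRepeatedly(deferred, 3))  // [1, 2, 3]
}
```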
Using other observable types
In addition to the normal Observable type, there are a few other types of observables with a narrower set of behaviors than regular observables. Their use is optional; you can use a regular observable anywhere you might use one of these specialized observables. Their purpose is to provide a way to more clearly convey your intent to readers of your code or consumers of your API. The context implied by using them can help make your code more intuitive.
There are three special types of observables in RxJava: Single, Maybe and Completable. Without knowing anything more about them yet, can you guess how each one is specialized?
- Singles will emit either a success(value) or error event. success(value) is actually a combination of the next and complete events. This is useful for one-time processes that will either succeed and yield a value or fail, such as downloading data or loading it from disk.
- A Completable will only emit a complete or error event. It doesn’t emit any value. You could use a Completable when you only care that an operation completed successfully or failed, such as a file write.
- And Maybe is a mash-up of a Single and Completable. It can either emit a success(value), complete, or error. If you need to implement an operation that could either succeed or fail, and optionally return a value on success, then Maybe is your ticket.
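To make the three shapes concrete, here is a small sketch built around a hypothetical in-memory settings map. The store and the loadAll, find and save functions are invented for illustration; the handlers are passed through RxJava’s own subscribe overloads for each type.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Single

// Hypothetical in-memory settings store, used only for illustration.
val settings = mutableMapOf("theme" to "dark")

// Always yields exactly one value (or an error): a Single.
fun loadAll(): Single<Map<String, String>> = Single.just(settings.toMap())

// May yield a value or nothing at all: a Maybe.
fun find(key: String): Maybe<String> {
    val value = settings[key]
    return if (value != null) Maybe.just(value) else Maybe.empty()
}

// Yields no value, only success or failure: a Completable.
fun save(key: String, value: String): Completable =
    Completable.fromAction { settings[key] = value }

fun main() {
    // Completable: complete handler, then error handler.
    save("font", "mono").subscribe({ println("Saved") }, { println("Error: $it") })

    // Maybe: success, error and complete handlers.
    find("font").subscribe(
        { println("Found $it") }, { println("Error: $it") }, { println("Nothing found") })
    find("missing").subscribe(
        { println("Found $it") }, { println("Error: $it") }, { println("Nothing found") })

    // Single: success handler, then error handler.
    loadAll().subscribe({ println(it) }, { println("Error: $it") })
}
```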
You’ll have an opportunity to work more with these special observable types in Chapter 4, “Observables & Subjects in Practice,” and beyond. For now, you’ll run through a basic example of using a Single to load some text from a text file named Copyright.txt, because who doesn’t love some legalese once in a while? This file is in the src folder of the project.
Add this example to main(), importing io.reactivex.rxjava3.core.Single when you do:
exampleOf("Single") {
  // 1
  val subscriptions = CompositeDisposable()
  // 2
  fun loadText(filename: String): Single<String> {
    // 3
    return Single.create create@{ emitter ->
    }
  }
}
Here’s what you do in this code:
- Create a composite disposable to use later.
- Implement a function to load text from a file on disk that returns a Single.
- Create and return a Single.
Add this code inside the create lambda to complete the implementation, importing java.io.File and java.io.FileNotFoundException:
// 1
val file = File(filename)
// 2
if (!file.exists()) {
  emitter.onError(FileNotFoundException("Can’t find $filename"))
  return@create
}
// 3
val contents = file.readText(Charsets.UTF_8)
// 4
emitter.onSuccess(contents)
From the top:
- Create a new File from the filename.
- If the file doesn’t exist, emit a FileNotFoundException via the onError method and return from the create lambda.
- Get the data from the file.
- Emit the contents of the file.
Now you can put this function to work. Add this code to the example:
// 1
val observer = loadText("Copyright.txt")
  // 2
  .subscribeBy(
    // 3
    onSuccess = { println(it) },
    onError = { println("Error, $it") }
  )
subscriptions.add(observer)
Here, you:
- Call loadText(), passing the root name of the text file.
- Subscribe to the Single it returns.
- Pass onSuccess and onError lambdas to the subscribeBy method, either printing the contents of the file or printing the error.
Run the example, and you should see the text from the file printed to the console, the same as the copyright comment at the top of the project:
--- Example of: Single ---
Copyright (c) 2014-2020 Razeware LLC
...
Try changing the filename to something else, and you should get the file not found exception printed instead.
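As an aside, the same behavior can probably be expressed more compactly with Single.fromCallable, which runs its lambda on subscription, emits the result as the success value, and turns a thrown exception into an error event. This is a sketch under that assumption rather than the chapter’s approach; loadTextFromCallable is a hypothetical name.

```kotlin
import io.reactivex.rxjava3.core.Single
import java.io.File

// Hypothetical alternative to loadText(): any exception thrown while reading
// the file is delivered to the subscriber's error handler.
fun loadTextFromCallable(filename: String): Single<String> =
    Single.fromCallable { File(filename).readText(Charsets.UTF_8) }

fun main() {
    loadTextFromCallable("Copyright.txt")
        .subscribe(
            { contents -> println(contents) },
            { error -> println("Error, $error") }
        )
}
```

The explicit create version remains useful when you want to control exactly which error is reported, as the custom message in the example above does.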
Challenges
Practice makes permanent. By completing challenges in this book, you’ll practice what you’ve learned in each chapter and pick up a few more tidbits of knowledge about working with observables. A starter project as well as a finished version are provided for each challenge. Enjoy!
Challenge: Perform side effects
In the never operator example earlier, nothing printed out. That was before you were adding your subscriptions to composite disposables, but if you had added it to one, you could’ve used a handy operator to print a message when the disposable was disposed.
Operators that begin with doOn, such as the doOnDispose operator, allow you to insert side effects; that is, you add handlers that take some action but that won’t affect the observable. For doOnDispose, that is whenever the disposable is disposed of.
There are a few other handy doOn methods that you can use. There’s a doOnNext method, a doOnComplete method, a doOnError method and a doOnSubscribe method that you can also use to perform some side effect at the right moment.
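To get a feel for when these handlers run, here is a hedged sketch on a different observable than the one in the challenge. sideEffectLog is a hypothetical helper that records the order in which the side effects and the subscriber’s own handler fire.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: records the order of side effects and deliveries.
fun sideEffectLog(): List<String> {
    val log = mutableListOf<String>()
    Observable.just("A", "B")
        .doOnSubscribe { log.add("subscribed") }
        .doOnNext { log.add("about to deliver $it") }
        .doOnComplete { log.add("completed") }
        .subscribe { log.add("received $it") }
    return log
}

fun main() {
    sideEffectLog().forEach { println(it) }
    // subscribed
    // about to deliver A
    // received A
    // about to deliver B
    // received B
    // completed
}
```

None of the doOn handlers change what the subscriber receives; they only observe the events as they pass through.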
To complete this challenge, insert the doOnSubscribe operator in the never example. Feel free to include any of the other handlers if you’d like; they work just like doOnSubscribe’s handler does.
And while you’re at it, create a composite disposable and add the subscription to it.
Don’t forget you can always peek into the finished challenge project for “inspiration.”
Key points
- Everything is a sequence in RxJava, and the primary sequence type is Observable.
- Observables start emitting when they are subscribed to.
- You must dispose of subscriptions when done with them, and you’ll often use a CompositeDisposable to do so.
- Single, Completable and Maybe are specialized observable types that are handy in certain situations.