Working with RxJava Disposables in Kotlin
In this tutorial, you’ll learn the basics of RxJava Disposables. You will be building an app from scratch that retrieves real-time cryptocurrency data. By Lawrence Tan 🇸🇬.
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Contents
Working with RxJava Disposables in Kotlin
20 mins
Calling CryptoCompare API
Start by adding the API key you received from CryptoCompare in CryptoDataAPI.kt
:
const val APIKEY = "APIKEY" // TODO 1: Add Your Register API Key Here
...
According to the documentation here, you’ll use Multiple Symbols Price to populate live data. Update the getCryptoData
function as follows:
interface CryptoDataAPI {
...
//TODO 2: Declare the function to return an Observable
fun getCryptoData(@Query("tsyms") currencies: String): Observable<LinkedTreeMap<Object, Object>>
}
Here, the data you receive is an Observable
of type LinkedTreeMap. If you run the app now, nothing will happen.
Be patient! You’re wiring much of the internal logic. Exciting things are coming ahead!
Now, head over to CryptoDataRepository.kt
and create a function here to make the network call.
//TODO 3: Create a function to call API and return an Observable
fun getCryptoData(currencies: String): Observable<LinkedTreeMap<Object, Object>> {
return cryptoDataAPI.getCryptoData(currencies)
.doOnNext {
Log.d("getCryptoData", "Dispatching ${it.size} crypto data from API...")
}
}
This helper class uses doOnNext()
to log the data size and handle any related logic. Then it passes it to the next function.
Remember the ViewModel
? Currently, you have routed the crypto data from Server -> API Call -> Data Repository. Now, you’re going to punch this data into the ViewModels
, making them ready for display.
Now, create a function again here:
//TODO 4: Implement Function to Pass Data to ViewModel
fun getCryptoData(currencies: String): Observable<List<CryptoData>> {
return cryptoDataRepository.getCryptoData(currencies)
// 1
.map {
handleResult(it)
}
// 2
.onErrorReturn {
Log.d("getCryptoData", "An error occurred")
arrayListOf<CryptoData>().toList()
}
}
In this code, you:
1. This function calls another function that is already implemented for you in handleResult()
. It takes in server data it
, then unwraps it into CryptoData models and returns them as a list. In other words, here you’ve converted the original LinkedTreeMap
into an ArrayList
.
2. Do your error handling in this handler. For simplicity, you log a message and return an empty list of data.
Phew, that’s a lot of code for such a simple app! You’re doing well and getting near the Magic Hat!
Now, build and run. You’re not calling the functions you created here yet. That’s what you’ll do next.
Polling With Observables
Now, head over to the final file you’ll work on, the BaseFragment.kt
. First, declare two constant values at the top of the file:
//TODO 5: Constant Values for Initial Delay and Interval
const val INITIAL_DELAY_IN_MILLISECONDS: Long = 1000
const val INTERVAL_IN_MILLISECONDS: Long = 10000
The first constant sets the initial delay before making the first network call. The second constant sets the interval between each network call.
Next, locate the function call loadData()
. Update it to get the network calls to work:
private fun loadData() {
//TODO 6: Call API using Observable
Log.d("loadData", "Downloading Data ...")
// 1
val disposable = Observable.interval(INITIAL_DELAY_IN_MILLISECONDS, INTERVAL_IN_MILLISECONDS,
TimeUnit.MILLISECONDS)
// 2
.observeOn(AndroidSchedulers.mainThread())
// 3
.subscribe(this::updateCryptoData, this::onError)
//TODO 12: Add Disposables
}
What this code does is:
1. You use an Observable.interval
here along with the constants to create the polling logic. INITIAL_DELAY_IN_MILLISECONDS
is the amount of time taken to start the first emission. INTERVAL_IN_MILLISECONDS
is the breathing time before next emission. TimeUnit.MILLISECONDS
sets the unit of the time values.
2. Always observe on AndroidSchedulers.mainThread()
because that is where UI and user interactions reside.
3. Finally, here you used the double colon (::) operator to trigger a method reference to updateCryptoData
and onError
. It will initiate the polling subscription via subscribe
method. This function empowers the app now to be able to call updateCryptoData
at an interval infinitely.
You’ll realise that some functions are not yet available. Don’t worry, just keep following :].
Now, let’s look at updateCryptoData
and implement it:
//TODO 7: Add Update Crypto Data
private fun updateCryptoData(aLong: Long) {
// 1
mSwipeRefreshLayout.isRefreshing = true
// 2
val observable: Observable<List<CryptoData>> = viewModel.getCryptoData(currencies)
// 3
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe({
Log.d("updateCryptoData", "Received UIModel $it users.")
handleResponse(it)
}, {
handleError(it)
})
}
In the snippet above:
1. Firstly, mSwipeRefreshLayout.isRefreshing = true
sets the state of the layout to start refreshing.
2. You declared a new observable
which is a list of observables with the crypto data retrieved from the API.
3. You subscribed to our observable on a pre-created idle thread using Schedulers.io()
. Next, observe on the main thread so that UI can be updated promptly. Then, for each emission, you passed it to handleRespones
to perform the final parsing. Finally, use handleError
to handle any errors.
//TODO 8: Add onError
private fun onError(throwable: Throwable) {
Log.d("onError", "OnError in Observable Time: $throwable")
}
//TODO 9: Handle API Response & Error
private fun handleResponse(cryptoDataList: List<CryptoData>) {
// 1
cryptoDataAdapter = CryptoDataAdapter(ArrayList(cryptoDataList), this)
cryptocurrencyList.adapter = cryptoDataAdapter
// 2
mSwipeRefreshLayout.isRefreshing = false
//TODO 12: Add Disposables
}
// 3
private fun handleError(t: Throwable) {
Log.d("handlleError", "Error: $t")
}
In this code, you:
1. Remember the function you created in the ViewModel
? It returns an ArrayList
of data models.
2. Then, set mSwipeRefreshLayout.isRefreshing
to false
to end refreshing state.
3. Finally, you log our errors if any.
Next, add these codes to navigate to each currency’s web contents in onItemClick
:
override fun onItemClick(retroCrypto: CryptoData) {
//TODO 10: Handle Item Click
val intent = Intent(activity, DetailActivity::class.java)
intent.putExtra("CryptoName", retroCrypto.name)
startActivity(intent)
}
Here, you created a new basic DetailActivity
Intent with the name of the Cryptocurrency, so that the app can open the corresponding webpage.
Now, build and run. Hurray! Everything is working fine now. The app is actually polling every ten seconds to get live data.
Try now hitting the home button. Oops! If you look at the console log, it seems like the app is still firing although the app is in the background.
Go back into the app and try to tap on a currency to view its web content. The polling doesn’t stop. You need to stop this or the app will start consuming data, depleting the battery and, worse of all, a user might uninstall this app!
Disposables to the Rescue!
Declare a CompositeDisposable
in BaseFragment and start collecting resources you want to manage:
//TODO 11: Declare Disposables
private val disposables = CompositeDisposable()
Here you created a disposables-collector to keep track of the Observable
used in this app. The app has two fragments. Each fragment creates an
Observable
and two Disposable
s. Time to manage them!
First, in loadData()
, update the implementation as such:
private fun loadData() {
//TODO 6: Call API using Observable
Log.d("loadData", "Downloading Data ...")
// 1
val disposable = Observable.interval(INITIAL_DELAY_IN_MILLISECONDS, INTERVAL_IN_MILLISECONDS,
TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateCryptoData, this::onError)
//TODO 12: Add Disposables
Log.d("loadData", "Disposable added!")
// 2
disposables.add(disposable)
}
In those lines:
1. Everything there remains the same.
2. When you call subscribe()
, a Disposable
is returned. You'll use logging to keep track of the number of Disposable
s that are added. You will also add the first disposable to our CompositeDisposable variable.
Now you need to track the number of Disposable
s in handleResponse()
:
private fun handleResponse(cryptoDataList: List<CryptoData>) {
// 1
cryptoDataAdapter = CryptoDataAdapter(ArrayList(cryptoDataList), this)
cryptocurrencyList.adapter = cryptoDataAdapter
mSwipeRefreshLayout.isRefreshing = false
// 2
Log.d("handleResponse", "We have ${disposables.size()} disposables")
...
}
The above code performs the following:
1. Everything here remains the same.
2. By calling size()
, you get the number of Disposable
s and log it here.
Remember that Disposable
s are meant to be disposed of at correct places. You don't want the app to poll when it's no longer in use or when the user navigates out of the page. So, add them to these places:
override fun onPause() {
super.onPause()
//TODO 13: Clear Disposables
disposables.clear()
Log.d("onPause", "Clear Disposables")
}
override fun onStop() {
super.onStop()
//TODO 14: Clear Disposables
disposables.clear()
Log.d("onStop", "Clear Disposables")
}
Finally, build and run. Now, when you navigate to the web contents or leave the app, you should see Clear Disposables and the polling should stop. When you return to the app, the Disposable
s are added again and the polling resumes.
You've mastered working with Disposable
s. :)