Reactive Streams on Kotlin: SharedFlow and StateFlow
In this tutorial, you’ll learn about reactive streams in Kotlin and build an app using two types of streams: SharedFlow and StateFlow. By Ricardo Costeira.
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Contents
Reactive Streams on Kotlin: SharedFlow and StateFlow
30 mins
- Getting Started
- SharedFlow
- Handling Shared Events
- Event Emission With SharedFlow
- Replay and Buffering
- Subscribing to Event Emissions
- Collecting the SharedFlow
- Applying the Stream Data to the View
- SharedFlow and Channels
- StateFlow
- Handling App State
- Event Emission With StateFlow
- Subscribing to State Updates
- StateFlow and Channels
- Hot Flows, RxJava and LiveData
- Challenge: Using SharedFlow To Handle Screen Events
- Where to Go From Here?
Applying the Stream Data to the View
Back in the code, you can see notifyOfPriceVariation()
doesn’t exist yet. Add it as well:
private fun notifyOfPriceVariation(variation: Int) {
val message = getString(R.string.price_variation_message, variation)
showSnackbar(message)
}
Easy-peasy. Build and run the app. Now, when you go to the coin history screen, you’ll see some periodical Snackbar
messages at the bottom. The shared flow will only start emitting when you go to that screen, though. Even if the CoinsSharedViewModel
instance is bound to the Activity
, it’s only created when you first visit the coin history screen.
You want all screens to be aware of price changes, so this isn’t ideal. To fix it, do the exact same changes in CoinListFragment
:
- Create the
CoinsSharedViewModel
instance in the same way. - Add the code to
subscribeToSharedViewEffects()
. - Create
notifyOfPriceVariation()
.
Build and run the app. You’ll now see the periodical Snackbar
messages in CoinListFragment
as well. As you switch screens, you’ll see that the messages always show the next event and not the previous ones. MutableSharedFlow()
in CoinsSharedViewModel
is using the default parameters. But feel free to play around with it to see how it affects the shared flow!
SharedFlow and Channels
Like shared flows, channels represent hot streams. But this doesn’t mean shared flow will replace the channels API — not entirely, at least. :]
SharedFlow
is designed to completely replace BroadcastChannel
. Not only is SharedFlow
simpler and faster to use, but it’s a lot more versatile than BroadcastChannel
. Keep in mind, though, that other elements from the channels API can and should still be used when it makes sense to do so.
StateFlow
A state flow is structured like a shared flow. This is because StateFlow
is nothing more than a specialization of SharedFlow
. In fact, you can create a shared flow that behaves exactly like a state flow:
val shared = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(InitialState()) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
The code above creates a shared flow that emits the latest value only to any new subscribers. Due to that distinctUntilChanged
at the bottom, it’ll only emit any value if it’s different from the previous one. This is exactly what a state flow does, which makes it great for holding and handling state.
Handling App State
There are simpler ways of creating state flows though, which you’ll use now. Expand the coinlist package and, inside, open CoinListFragmentViewModel.kt. This simple ViewModel
uses LiveData
to expose a view state class to CoinListFragment
. The state class itself is also fairly simple, and it has default values to match the initial view state:
data class CoinListFragmentViewState(
val loading: Boolean = true,
val coins: List<UiCoin> = emptyList()
)
The Fragment
then uses the current state to update the view by observing the LiveData
:
// Code in CoinListFragment.kt
private fun observeViewStateUpdates(adapter: CoinAdapter) {
viewModel.viewState.observe(viewLifecycleOwner) { updateUi(it, adapter) }
}
Start the refactoring by changing MutableLiveData
to a MutableStateFlow
. So in CoinListFragmentViewModel
, go from:
private val _viewState = MutableLiveData(CoinListFragmentViewState())
To:
private val _viewState = MutableStateFlow(CoinListFragmentViewState())
Make sure to include the necessary import for MutableStateFlow
. This is how you create a mutable state flow. Unlike shared flows, state flows require an initial value or, in other words, an initial state. But because state flow is a specific implementation of shared flow, there’s no way for you to customize things like replay
or extraBufferCapacity
. Regardless, the generic rules and constraints for shared flows still apply.
Next, update the immutable LiveData
accordingly, from:
val viewState: LiveData<CoinListFragmentViewState> get() = _viewState
To:
val viewState: StateFlow<CoinListFragmentViewState> get() = _viewState
Of course, you could also do:
val viewState = _viewState.asStateFlow()
Add the import for StateFlow
. Be it a shared flow or a state flow, you can create an immutable one with both options. The advantage of using asStateFlow()
or asSharedFlow()
is that you get the extra safety of explicitly creating an immutable version of the flow. This avoids things like creating another mutable version by mistake.
Event Emission With StateFlow
A difference worth noting between shared and state flows is event emission. You can still use emit
and tryEmit
with state flow, but … don’t. :]
Instead, you should do:
mutableState.value = newState
The reason is that updates to value
are always conflated, which means that even if you update it faster than subscribers can consume it, they’ll get the most recent value only. One thing to keep in mind is that whatever you assign to value
has to be a completely different object from whatever was there before. For instance, take this code:
data class State(
var name: String = "",
var age: Int = -1
)
val mutableState = MutableStateFlow<State>(State())
// ...
// newState and mutableState.value will reference the same object
val newState = mutableState.value
// Reference is the same, so this is also changing mutableState.value!
newState.name = "Marc"
mutableState.value = newState
In this case, the state flow won’t emit the new value. Because the referenced object is the same, the equality comparison will return true, so the flow will assume it’s the same state.
To make this work, you need to use immutable
objects. For example:
data class State(
val name: String = "",
val age: Int = -1
)
val mutableState = MutableStateFlow<State>(State())
// ...
mutableState.value = State(name = "Marc")
This way, the state flow will properly emit a state update. Immutability saves the day once again. :]
Back at the code, the cool thing about replacing LiveData
with StateFlow
is that both of them use a property called value
, so nothing changes there.
There’s one last change to make in CoinListFragmentViewModel
, inside the requestCoinList()
method. You can now update that if
condition at the beginning to:
if (viewState.value.coins.isNotEmpty()) return
You don’t need the ?
anymore, because value
won’t be null
. Also, you invert the condition by using isNotEmpty()
instead of isNullOrEmpty()
and by dropping !
at the beginning. This makes the code a little more readable.
If you try to build the app, you get an error on CoinListFragment
stating that there’s an unresolved reference to observe
. StateFlow
doesn’t have an observe
method, so you need to refactor that as well.