An Introduction to Flow for Android
Flow is a new feature of Kotlin Coroutines. Whereas suspending functions in Kotlin can only return a single value; Flow can return multiple values.
You can think of Flow
as a stream. Values flow downstream like water in a river. Values can be created from asynchronous actions such as network requests or database calls. If you are familiar with RxJava
- Flow
behaves similarly to Observable
and Flowable
. Flow
is built on top of coroutines. We love coroutines because they bring structured concurrency to Kotlin.
In this article we will look into the basics of Flow
and in which scenarios we can use it.
Example 1:
val streamOfInts = flow {
println("Start emitting")
for (i in 0..1) {
delay(100)
println("Sending $i")
emit(i)
}
println("Finished emitting")
}
streamOfInts.collect {
println("Receiving $it")
}
println("Flow completed")
This prints:
Start emitting
Sending 0
Receiving 0
Sending 1
Receiving 1
Finished emitting
Flow completed
The first thing we see in the code above is the flow builder. Inside the flow builder you can do any asynchronous work and use emit
to send some kind of value. A 100 millisecond delay is used to simulate a network request before emitting a value. delay
is a handy suspend function which delays for a specified time before resuming where it left off. Two values are emitted before the flow completes. collect
is also a suspend function and it is used to start collecting values from the flow. Flows are cold streams so the flow will not start emitting before collect
is called. collect
is similar to subscribe
in RxJava
. The flow builder is the producer and the collector is the consumer.
Operators
Flow
can be used in a reactive programming style. It has a lot of different operators that can transform a flow with functional operators. Operators such as map
, filter
, reduce
and many more. Flow
supports suspending functions on most operators which means that you can execute sequential asynchronous tasks in operators like map
.
Let’s get some data from our database and then from the network:
Example 2:
val stream = flow {
println("Computed on $thread")
val dataFromDatabase = database.getData()
emit(dataFromDatabase)
val dataFromNetwork = network.getData()
emit(dataFromNetwork)
}
stream
.map { it.plus(" mapped on $thread") }
.flowOn(Dispatchers.IO)
.collect { println(it.plus(" collected on $thread")) }
This prints:
Computed on worker-1
Database data! mapped on worker-1 collected on main
Network data! mapped on worker-1 collected on main
The code above will first get data from the database and emit it to the collector. It then fetches updated data from the network and emits it. For simplicity we skip the caching of network data to the database.
The view can listen to this stream of data and update the UI when new data is collected. We have used the map
operator to transform which thread the code is running in. You can see that we introduced a new operator named flowOn
which will switch the upstream context in the flow. If you want to execute an operation off the main thread you can set which kind of dispatcher you want to use (Main
, Default
, IO
, Unconfined
). This is similar to subscribeOn
in RxJava
.
You can find all operators here
Custom Operators
Creating your own custom operators is easily achieved. Here is a custom operator that transforms all strings to uppercase.
Example 3:
fun Flow<String>.uppercase(): Flow<String> {
return flow {
collect { emit(it.toUpperCase()) }
}
}
flowOf("No", "soup", "for", "you!")
.uppercase()
.collect { println(it) }
This prints:
NO
SOUP
FOR
YOU!
Data flow in Android
In Android we have to think about how technology works in the context of Fragment
, Activity
and ViewModel
. There are multiple ways of doing this. Personally I use Flow
in repositories and LiveData
in ViewModel
which I will come back to later. Here’s an example of how this might look.
Example 4:
class WeatherRepository {
fun getWeather(): Flow<Weather> {
return flow {
emit(Weather("Rainy :("))
emit(Weather("Cloudy :|"))
emit(Weather("Sunny :)"))
}
}
}
class WeatherViewModel(
private val weatherRepository: WeatherRepository
) : ViewModel() {
val weather = MutableLiveData<Weather>()
init { getWeather() }
private fun getWeather() {
viewModelScope.launch {
weatherRepository.getWeather()
.flowOn(Dispatchers.IO)
.collect { weather.value = it }
}
}
}
class WeatherFragment : Fragment() {
private fun initObservers() {
viewModel.weather.observe(this, Observer { println(it) })
}
}
This will print:
Rainy :(
Cloudy :|
Sunny :)
WeatherViewModel
exposes MutableLiveData<Weather>
to WeatherFragment
. In the init block we start to collect the weather flow. In the fragment we start observing the weather LiveData
. If the orientation changes and the fragment is recreated the console will print description=Sunny :)
because LiveData
caches the last item.
If we try and rewrite the same code without LiveData
there are some consequences.
class WeatherViewModel(
private val weatherRepository: WeatherRepository
) : ViewModel() {
val weatherFlow = weatherRepository.getWeather().flowOn(Dispatchers.IO)
}
class WeatherFragment : Fragment() {
private fun initFlows() {
lifecycleScope.launchWhenStarted {
viewModel.weatherFlow.collect { println(it) }
}
}
}
Instead of using LiveData
to communicate with WeatherFragment
we are using Flow
. The code mostly works in the same way. If the orientation changes however, WeatherFragment
will be recreated and attempt to collect the weather flow. This will cause the flow to emit all its values again. For this reason I like to use LiveData
.
Jetbrains are currently working on something called DataFlow
which will have similar behaviour as LiveData
. You can find more info about DataFlow
here
Testing
For testing flows you can use the take
operator to get a fixed amount of items from the flow and toList
as the terminal operator to get the result as a list. toList
suspends until it receives the number of items set in the take
operator or it waits until the flow ends. You can still use collect
instead of toList
but personally I prefer using toList
because you end up with all elements so you have more control of the result.
Example 5:
val ints = flowOf(0, 1, 2)
@Test
fun `confirm 3 ints from flow`() = runBlockingTest {
val result = ints.toList()
assertEquals(result.size, 3)
assertEquals(result[0], 0)
assertEquals(result[1], 1)
assertEquals(result[2], 2)
}
The flow should emit three values in total. In the test above we wait for the flow to emit all values and then confirm the length and the values received. Be careful when using only the toList
operator. If the flow emits an infinite amount of values the function would suspend forever. To prevent this from happening we have the take
operator to set the number of values that we want to test.
@Test
fun `confirm 2 ints from flow`() = runBlockingTest {
val result = ints.take(2).toList()
assertEquals(result.size, 2)
assertEquals(result[0], 0)
assertEquals(result[1], 1)
}
Perfect!
Error handling
If you do not handle errors in your flow your app will crash like any other suspend function. You can use the traditional way of wrapping the flow with a try catch
or you can use the catch
operator to catch exceptions thrown upstream. One thing to note is that it doesn’t handle exceptions downstream. If the collect
block throws an exception, your app will crash. To get around this we can use collect
and onEach
together to collect all upstream exceptions.
Example 6:
scope.launch {
dataFlow
.onEach { /* Handle emitted value */ }
.catch { logError(it) /* Handle upstream exceptions */ }
.collect()
}
If an error is thrown the flow will still stop emitting new values. If it should continue after an exception you need to handle this case manually.
Conclusion
This was a short intro to Flow
. The API is lightweight and makes it hard to leak subscriptions, which is a problem in RxJava
. We get this for free with structured concurrency. No more silent resource leaks! Room
and SqlDelight
already have support for Flow
making it easy to get started. If you want to read more about Flow
you can find more resources below.
References:
https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4 https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4 https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9 https://medium.com/@elizarov/exceptions-in-kotlin-flows-b59643c940fb https://medium.com/@elizarov/execution-context-of-kotlin-flows-b8c151c9309b https://medium.com/@elizarov/simple-design-of-kotlin-flow-4725e7398c4c https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/index.html