An Introduction to Flow for Android | Martin Tøften
Flow is a new feature of Kotlin Coroutines. Whereas a suspending function in Kotlin can only return a single value, a Flow can return multiple values.
You can think of Flow as a stream. Values flow downstream like water in a river. Values can be created from asynchronous actions such as network requests or database calls. If you are familiar with RxJava, Flow behaves similarly to Observable and Flowable. Flow is built on top of coroutines. We love coroutines because they bring structured concurrency to Kotlin.
In this article we will look into the basics of Flow and in which scenarios we can use it.
Example 1:
val streamOfInts = flow {
    println("Start emitting")
    for (i in 0..1) {
        delay(100)
        println("Sending $i")
        emit(i)
    }
    println("Finished emitting")
}
streamOfInts.collect {
    println("Receiving $it")
}
println("Flow completed")
This prints:
Start emitting
Sending 0
Receiving 0
Sending 1
Receiving 1
Finished emitting
Flow completed
The first thing we see in the code above is the flow builder. Inside the flow builder you can do any asynchronous work and use emit to send some kind of value. A 100 millisecond delay is used to simulate a network request before emitting a value. delay is a handy suspend function which delays for a specified time before resuming where it left off. Two values are emitted before the flow completes. collect is also a suspend function and it is used to start collecting values from the flow. Flows are cold streams, so the flow will not start emitting before collect is called. collect is similar to subscribe in RxJava. The flow builder is the producer and the collector is the consumer.
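The cold behaviour described above can be made visible by recording the order of events. This is a minimal sketch, assuming a recent kotlinx.coroutines (1.6 or later) on the classpath; `events`, `numbers` and `runColdFlowDemo` are hypothetical names introduced only for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical log used only to record the order of events.
val events = mutableListOf<String>()

fun numbers(): Flow<Int> = flow {
    events += "producer started"
    emit(1)
    emit(2)
}

fun runColdFlowDemo(): List<String> = runBlocking {
    events.clear()
    val stream = numbers()      // creating the flow does not run the builder block
    events += "flow created"
    stream.collect { events += "received $it" }   // the producer starts here
    events.toList()
}

fun main() {
    // Prints: flow created, producer started, received 1, received 2 (one per line)
    runColdFlowDemo().forEach(::println)
}
```

"flow created" is logged before "producer started", which shows that the builder block only runs once collect is called.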
Operators
Flow can be used in a reactive programming style. It offers many functional operators for transforming a flow, such as map, filter, reduce and many more. Flow supports suspending functions in most operators, which means that you can execute sequential asynchronous tasks in operators like map.
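To illustrate a suspending call inside an operator, here is a small sketch. `fetchPrice` is a hypothetical suspending function that stands in for a network or database lookup; it is not part of any library.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Hypothetical suspending lookup; the delay simulates asynchronous work.
suspend fun fetchPrice(id: Int): Int {
    delay(10)
    return id * 10
}

fun totalPriceOfEvenIds(): Int = runBlocking {
    flowOf(1, 2, 3, 4)
        .filter { it % 2 == 0 }                 // keeps 2 and 4
        .map { fetchPrice(it) }                 // suspending call inside map: 20 and 40
        .reduce { sum, price -> sum + price }   // terminal operator, returns the total
}

fun main() {
    println(totalPriceOfEvenIds())   // prints 60
}
```

Note that reduce is a terminal operator: it collects the flow and returns a single value instead of another flow.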
Let's get some data from our database and then from the network:
Example 2:
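// `thread` is assumed to be a helper that returns the current thread's name;
// `database` and `network` are placeholder data sources.
val stream = flow {
    println("Computed on $thread")
    val dataFromDatabase = database.getData()
    emit(dataFromDatabase)
    val dataFromNetwork = network.getData()
    emit(dataFromNetwork)
}
stream
    .map { it.plus(" mapped on $thread") }
    .flowOn(Dispatchers.IO) // everything above this line runs on Dispatchers.IO
    .collect { println(it.plus(" collected on $thread")) }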
This prints:
Computed on worker-1
Database data! mapped on worker-1 collected on main
Network data! mapped on worker-1 collected on main
The code above will first get data from the database and emit it to the collector. It then fetches updated data from the network and emits it. For simplicity we skip the caching of network data to the database.
The view can listen to this stream of data and update the UI when new data is collected. We have used the map operator to append the name of the thread the code is running on. You can see that we introduced a new operator named flowOn, which switches the context of everything upstream of it in the flow. If you want to execute an operation off the main thread you can set which kind of dispatcher you want to use (Main, Default, IO, Unconfined). This is similar to subscribeOn in RxJava.
You can find all operators in the kotlinx.coroutines.flow API documentation listed in the references.
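The effect of flowOn on upstream and downstream operators can be checked with a small sketch. It assumes a JVM target; `threadName`, `Threads` and `observeThreads` are hypothetical helpers introduced for this illustration. The actual worker thread names vary between runs, so the example only compares them.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun threadName(): String = Thread.currentThread().name

// Hypothetical holder for the thread each stage ran on.
data class Threads(val producer: String, val upstreamMap: String, val downstreamMap: String)

fun observeThreads(): Threads = runBlocking {
    flow { emit(threadName()) }
        // Above flowOn: runs in the Dispatchers.IO context.
        .map { producer -> producer to threadName() }
        .flowOn(Dispatchers.IO)
        // Below flowOn: runs in the collector's context (the runBlocking thread here).
        .map { (producer, upstream) -> Threads(producer, upstream, threadName()) }
        .first()
}

fun main() {
    val threads = observeThreads()
    println(threads)
    println(threads.producer != threads.downstreamMap)   // prints true
}
```

Only the operators placed before flowOn are moved to the IO dispatcher; anything after it keeps running in the context of the coroutine that collects.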
Custom Operators
Creating your own custom operators is easily achieved. Here is a custom operator that transforms all strings to uppercase.
Example 3:
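fun Flow<String>.uppercase(): Flow<String> {
    return flow {
        // collect the original flow and re-emit each string in uppercase;
        // String.uppercase() replaces the deprecated toUpperCase()
        collect { emit(it.uppercase()) }
    }
}
flowOf("No", "soup", "for", "you!")
    .uppercase()
    .collect { println(it) }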
This prints:
NO
SOUP
FOR
YOU!
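The same pattern also works for operators that keep state between values. The following is a minimal sketch; `runningTotal` is a hypothetical operator written for this example and is not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits the sum of all values seen so far.
fun Flow<Int>.runningTotal(): Flow<Int> = flow {
    var total = 0            // created inside the builder, so each collector gets fresh state
    collect { value ->
        total += value
        emit(total)
    }
}

fun runningTotals(): List<Int> = runBlocking {
    flowOf(1, 2, 3, 4).runningTotal().toList()
}

fun main() {
    println(runningTotals())   // prints [1, 3, 6, 10]
}
```

Keeping the state inside the flow builder matters: because flows are cold, the block runs again for every collector, and each run starts from zero.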
Data flow in Android
In Android we have to think about how a technology works in the context of Fragment, Activity and ViewModel. There are multiple ways of doing this. Personally I use Flow in repositories and LiveData in ViewModel, which I will come back to later. Here's an example of how this might look.
Example 4:
class WeatherRepository {
    fun getWeather(): Flow<Weather> {
        return flow {
            emit(Weather("Rainy :("))
            emit(Weather("Cloudy :|"))
            emit(Weather("Sunny :)"))
        }
    }
}
class WeatherViewModel(
    private val weatherRepository: WeatherRepository
) : ViewModel() {
    val weather = MutableLiveData<Weather>()
    init { getWeather() }
    private fun getWeather() {
        viewModelScope.launch {
            weatherRepository.getWeather()
                .flowOn(Dispatchers.IO)
                .collect { weather.value = it }
        }
    }
}
class WeatherFragment : Fragment() {
    private fun initObservers() {
        viewModel.weather.observe(this, Observer { println(it) })
    }
}
This will print:
Rainy :(
Cloudy :|
Sunny :)
WeatherViewModel exposes MutableLiveData<Weather> to WeatherFragment. In the init block we start to collect the weather flow. In the fragment we start observing the weather LiveData. If the orientation changes and the fragment is recreated, the console will print Sunny :) again because LiveData caches the last item.
If we try to rewrite the same code without LiveData, there are some consequences.
class WeatherViewModel(
    private val weatherRepository: WeatherRepository
) : ViewModel() {
    val weatherFlow = weatherRepository.getWeather().flowOn(Dispatchers.IO)
}
class WeatherFragment : Fragment() {
    private fun initFlows() {
        lifecycleScope.launchWhenStarted {
            viewModel.weatherFlow.collect { println(it) }
        }
    }
}
Instead of using LiveData to communicate with WeatherFragment we are using Flow. The code mostly works in the same way. If the orientation changes, however, WeatherFragment will be recreated and will collect the weather flow again. Because the flow is cold, this causes it to emit all its values again. For this reason I like to use LiveData.
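The re-emission can be reproduced without any Android classes. The sketch below simulates a recreated fragment simply by collecting the same flow twice; `producerRuns` and `collectTwice` are hypothetical names, and plain strings stand in for the Weather class.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the producer block has been started.
var producerRuns = 0

val weatherFlow: Flow<String> = flow {
    producerRuns++
    emit("Rainy :(")
    emit("Cloudy :|")
    emit("Sunny :)")
}

// Simulates a fragment that collects, is recreated, and collects again.
fun collectTwice(): Pair<List<String>, List<String>> = runBlocking {
    producerRuns = 0
    val beforeRotation = weatherFlow.toList()
    val afterRotation = weatherFlow.toList()
    beforeRotation to afterRotation
}

fun main() {
    val (before, after) = collectTwice()
    println(before == after)   // prints true: all three values were emitted again
    println(producerRuns)      // prints 2: the producer ran once per collection
}
```

In a real repository the producer block would typically be a network or database call, so every recreation of the fragment would repeat that work.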
JetBrains is currently working on something called DataFlow, which will behave similarly to LiveData.
You can find more info about DataFlow here
Testing
For testing flows you can use the take operator to get a fixed number of items from the flow and toList as the terminal operator to get the result as a list. toList suspends until it has received the number of items set by take, or until the flow completes. You can still use collect instead of toList, but personally I prefer toList because you end up with all the elements in one list, which gives you more control over the result.
Example 5:
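val ints = flowOf(0, 1, 2)
// runBlockingTest is deprecated since kotlinx-coroutines-test 1.6; runTest is its replacement
@Test
fun `confirm 3 ints from flow`() = runBlockingTest {
    val result = ints.toList()
    // assertEquals takes the expected value first, then the actual one
    assertEquals(3, result.size)
    assertEquals(0, result[0])
    assertEquals(1, result[1])
    assertEquals(2, result[2])
}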
The flow should emit three values in total. In the test above we wait for the flow to emit all values and then confirm the number of values and the values themselves. Be careful when using toList on its own. If the flow emits an infinite number of values, the function will suspend forever. To prevent this we can use the take operator to set the number of values that we want to test.
@Test
fun `confirm 2 ints from flow`() = runBlockingTest {
    val result = ints.take(2).toList()
    // expected value first, then the actual one
    assertEquals(2, result.size)
    assertEquals(0, result[0])
    assertEquals(1, result[1])
}
Perfect!
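The finite flow in the tests above would complete even without take. To show why take matters for endless flows, here is a minimal sketch with a hypothetical `ticker` flow that never completes by itself.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical endless source: emits 0, 1, 2, ... and never completes by itself.
fun ticker(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(10)
    }
}

fun firstThreeTicks(): List<Int> = runBlocking {
    // Without take(3), toList() would suspend forever on this flow.
    ticker().take(3).toList()
}

fun main() {
    println(firstThreeTicks())   // prints [0, 1, 2]
}
```

take cancels the upstream producer once it has received the requested number of values, so the infinite loop in the builder does not keep running.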
Error handling
If you do not handle errors in your flow, the exception propagates to the collecting coroutine and your app will crash, just as with any other suspend function. You can use the traditional way of wrapping the collection in a try/catch, or you can use the catch operator to catch exceptions thrown upstream. One thing to note is that catch does not handle exceptions thrown downstream of it. If the collect block throws an exception, your app will crash. To get around this we can move the handling logic into onEach, place catch after it, and call collect() without a block, so that every exception is thrown upstream of catch.
Example 6:
scope.launch {
    dataFlow
        .onEach { /* Handle emitted value */ }
        .catch { logError(it) /* Handle upstream exceptions */ }
        .collect()
}
If an error is thrown the flow will still stop emitting new values. If it should continue after an exception you need to handle this case manually.
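The difference between stopping and continuing can be sketched as follows. `parse`, `rawValues`, `stopOnError` and `continueOnError` are hypothetical names for this illustration, and using -1 as a fallback value is just an assumption made to keep the example short.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical parser: throws NumberFormatException for input such as "oops".
fun parse(raw: String): Int = raw.toInt()

val rawValues: Flow<String> = flowOf("1", "oops", "3")

// The catch operator ends the flow at the first failure, so "3" is never processed.
fun stopOnError(): List<Int> = runBlocking {
    rawValues
        .map { parse(it) }
        .catch { emit(-1) }    // emit a fallback value, then the flow completes
        .toList()
}

// Handling the failure per element inside the operator lets the flow continue.
fun continueOnError(): List<Int> = runBlocking {
    rawValues
        .map { raw ->
            try {
                parse(raw)
            } catch (e: NumberFormatException) {
                -1
            }
        }
        .toList()
}

fun main() {
    println(stopOnError())       // prints [1, -1]
    println(continueOnError())   // prints [1, -1, 3]
}
```

Wrapping only the failing call in try/catch is one way to handle the case manually; which fallback makes sense depends on the data the flow carries.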
Conclusion
This was a short intro to Flow. The API is lightweight and makes it hard to leak subscriptions, which is a problem in RxJava. We get this for free with structured concurrency. No more silent resource leaks! Room and SQLDelight already have support for Flow, making it easy to get started. If you want to read more about Flow you can find more resources below.
References:
https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4
https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow
https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9
https://medium.com/@elizarov/exceptions-in-kotlin-flows-b59643c940fb
https://medium.com/@elizarov/execution-context-of-kotlin-flows-b8c151c9309b
https://medium.com/@elizarov/simple-design-of-kotlin-flow-4725e7398c4c
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/index.html