Bakken & BaeckTech

An Introduction to Flow for Android | Martin Tøften

Flow is a new feature of Kotlin Coroutines. Whereas suspending functions in Kotlin can only return a single value; Flow can return multiple values.

You can think of

Flow
as a stream. Values flow downstream like water in a river. Values can be created from asynchronous actions such as network requests or database calls. If you are familiar with
RxJava
-
Flow
behaves similarly to
Observable
and
Flowable
.
Flow
is built on top of coroutines. We love coroutines because they bring structured concurrency to Kotlin.

In this article we will look into the basics of

Flow
and in which scenarios we can use it.

Example 1:

val streamOfInts = flow { println("Start emitting") for (i in 0..1) { delay(100) println("Sending $i") emit(i) } println("Finished emitting") } streamOfInts.collect { println("Receiving $it") } println("Flow completed")

This prints:

Start emitting Sending 0 Receiving 0 Sending 1 Receiving 1 Finished emitting Flow completed

The first thing we see in the code above is the flow builder. Inside the flow builder you can do any asynchronous work and use

emit
to send some kind of value. A 100 millisecond delay is used to simulate a network request before emitting a value.
delay
is a handy suspend function which delays for a specified time before resuming where it left off. Two values are emitted before the flow completes.
collect
is also a suspend function and it is used to start collecting values from the flow. Flows are cold streams so the flow will not start emitting before
collect
is called.
collect
is similar to
subscribe
in
RxJava
. The flow builder is the producer and the collector is the consumer.

Operators

Flow
can be used in a reactive programming style. It has a lot of different operators that can transform a flow with functional operators. Operators such as
map
,
filter
,
reduce
and many more.
Flow
supports suspending functions on most operators which means that you can execute sequential asynchronous tasks in operators like
map
.

Let's get some data from our database and then from the network:

Example 2:

val stream = flow { println("Computed on $thread") val dataFromDatabase = database.getData() emit(dataFromDatabase) val dataFromNetwork = network.getData() emit(dataFromNetwork) } stream .map { it.plus(" mapped on $thread") } .flowOn(Dispatchers.IO) .collect { println(it.plus(" collected on $thread")) }

This prints:

Computed on worker-1 Database data! mapped on worker-1 collected on main Network data! mapped on worker-1 collected on main

The code above will first get data from the database and emit it to the collector. It then fetches updated data from the network and emits it. For simplicity we skip the caching of network data to the database.

The view can listen to this stream of data and update the UI when new data is collected. We have used the

map
operator to transform which thread the code is running in. You can see that we introduced a new operator named
flowOn
which will switch the upstream context in the flow. If you want to execute an operation off the main thread you can set which kind of dispatcher you want to use (
Main
,
Default
,
IO
,
Unconfined
). This is similar to
subscribeOn
in
RxJava
.

You can find all operators here

Custom Operators

Creating your own custom operators is easily achieved. Here is a custom operator that transforms all strings to uppercase.

Example 3:

fun Flow<String>.uppercase(): Flow<String> { return flow { collect { emit(it.toUpperCase()) } } } flowOf("No", "soup", "for", "you!") .uppercase() .collect { println(it) }

This prints:

NO SOUP FOR YOU!

Data flow in Android

In Android we have to think about how technology works in the context of

Fragment
,
Activity
and
ViewModel
. There are multiple ways of doing this. Personally I use
Flow
in repositories and
LiveData
in
ViewModel
which I will come back to later. Here's an example of how this might look.

Example 4:

class WeatherRepository { fun getWeather(): Flow<Weather> { return flow { emit(Weather("Rainy :(")) emit(Weather("Cloudy :|")) emit(Weather("Sunny :)")) } } } class WeatherViewModel( private val weatherRepository: WeatherRepository ) : ViewModel() { val weather = MutableLiveData<Weather>() init { getWeather() } private fun getWeather() { viewModelScope.launch { weatherRepository.getWeather() .flowOn(Dispatchers.IO) .collect { weather.value = it } } } } class WeatherFragment : Fragment() { private fun initObservers() { viewModel.weather.observe(this, Observer { println(it) }) } }

This will print:

Rainy :( Cloudy :| Sunny :)

WeatherViewModel
exposes
MutableLiveData<Weather>
to
WeatherFragment
. In the init block we start to collect the weather flow. In the fragment we start observing the weather
LiveData
. If the orientation changes and the fragment is recreated the console will print
description=Sunny :)
because
LiveData
caches the last item.

If we try and rewrite the same code without

LiveData
there are some consequences.

class WeatherViewModel( private val weatherRepository: WeatherRepository ) : ViewModel() { val weatherFlow = weatherRepository.getWeather().flowOn(Dispatchers.IO) } class WeatherFragment : Fragment() { private fun initFlows() { lifecycleScope.launchWhenStarted { viewModel.weatherFlow.collect { println(it) } } } }

Instead of using

LiveData
to communicate with
WeatherFragment
we are using
Flow
. The code mostly works in the same way. If the orientation changes however,
WeatherFragment
will be recreated and attempt to collect the weather flow. This will cause the flow to emit all its values again. For this reason I like to use
LiveData
.

Jetbrains are currently working on something called

DataFlow
which will have similar behaviour as
LiveData
. You can find more info about
DataFlow
here

Testing

For testing flows you can use the

take
operator to get a fixed amount of items from the flow and
toList
as the terminal operator to get the result as a list.
toList
suspends until it receives the number of items set in the
take
operator or it waits until the flow ends. You can still use
collect
instead of
toList
but personally I prefer using
toList
because you end up with all elements so you have more control of the result.

Example 5:

val ints = flowOf(0, 1, 2) @Test fun `confirm 3 ints from flow`() = runBlockingTest { val result = ints.toList() assertEquals(result.size, 3) assertEquals(result[0], 0) assertEquals(result[1], 1) assertEquals(result[2], 2) }

The flow should emit three values in total. In the test above we wait for the flow to emit all values and then confirm the length and the values received. Be careful when using only the

toList
operator. If the flow emits an infinite amount of values the function would suspend forever. To prevent this from happening we have the
take
operator to set the number of values that we want to test.

@Test fun `confirm 2 ints from flow`() = runBlockingTest { val result = ints.take(2).toList() assertEquals(result.size, 2) assertEquals(result[0], 0) assertEquals(result[1], 1) }

Perfect!

Error handling

If you do not handle errors in your flow your app will crash like any other suspend function. You can use the traditional way of wrapping the flow with a

try catch
or you can use the
catch
operator to catch exceptions thrown upstream. One thing to note is that it doesn't handle exceptions downstream. If the
collect
block throws an exception, your app will crash. To get around this we can use
collect
and
onEach
together to collect all upstream exceptions.

Example 6:

scope.launch { dataFlow .onEach { /* Handle emitted value */ } .catch { logError(it) /* Handle upstream exceptions */ } .collect() }

If an error is thrown the flow will still stop emitting new values. If it should continue after an exception you need to handle this case manually.

Conclusion

This was a short intro to

Flow
. The API is lightweight and makes it hard to leak subscriptions, which is a problem in
RxJava
. We get this for free with structured concurrency. No more silent resource leaks!
Room
and
SqlDelight
already have support for
Flow
making it easy to get started. If you want to read more about
Flow
you can find more resources below.

References:

https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4 https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4 https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9 https://medium.com/@elizarov/exceptions-in-kotlin-flows-b59643c940fb https://medium.com/@elizarov/execution-context-of-kotlin-flows-b8c151c9309b https://medium.com/@elizarov/simple-design-of-kotlin-flow-4725e7398c4c https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/index.html