An Introduction to Flow for Android

Flow is a new feature of Kotlin Coroutines. Whereas a suspending function in Kotlin can only return a single value, a Flow can return multiple values over time.

You can think of Flow as a stream. Values flow downstream like water in a river. Values can be created from asynchronous actions such as network requests or database calls. If you are familiar with RxJava, Flow behaves similarly to Observable and Flowable. Flow is built on top of coroutines. We love coroutines because they bring structured concurrency to Kotlin.

In this article we will look at the basics of Flow and the scenarios in which we can use it.

Example 1:

val streamOfInts = flow {
  println("Start emitting")
  for (i in 0..1) {
    delay(100)
    println("Sending $i")
    emit(i)
  }
  println("Finished emitting")
}

streamOfInts.collect {
  println("Receiving $it")
}

println("Flow completed")

This prints:

Start emitting
Sending 0
Receiving 0
Sending 1
Receiving 1
Finished emitting
Flow completed

The first thing we see in the code above is the flow builder. Inside the flow builder you can do any asynchronous work and use emit to send values downstream. A 100 millisecond delay simulates a network request before each value is emitted. delay is a handy suspend function that suspends the coroutine for the specified time, without blocking the thread, and then resumes where it left off. Two values are emitted before the flow completes.

collect is also a suspend function, and calling it starts the collection of values from the flow. Flows are cold streams, so the builder block does not run until collect is called, and it runs again for every new collector. collect is similar to subscribe in RxJava. The flow builder is the producer and the collector is the consumer.
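To make the "cold stream" behaviour concrete, here is a minimal sketch that collects the same flow twice. The names coldInts and builderRuns are made up for this illustration, and runBlocking is only used so the snippet can run outside Android.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustrative counter: how many times the builder block has run.
var builderRuns = 0

// Hypothetical flow used only for this demonstration.
val coldInts: Flow<Int> = flow {
    builderRuns++ // executed once per collector, not once per flow instance
    emit(0)
    emit(1)
}

fun main() = runBlocking {
    println(builderRuns)          // 0: nothing has been collected yet
    val first = coldInts.toList()
    val second = coldInts.toList()
    println(first)                // [0, 1]
    println(second)               // [0, 1]
    println(builderRuns)          // 2: the block ran for each collection
}
```

Creating the flow does no work at all. Each terminal operator (collect, toList and so on) starts the builder block from the top.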

Operators

Flow can be used in a reactive programming style. It comes with a large set of functional operators, such as map and filter, that transform a flow into a new flow, and terminal operators, such as reduce, that collect it into a single result. Most operators accept suspending functions, which means that you can execute sequential asynchronous tasks inside operators like map.
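As a small sketch of how these operators chain together, the snippet below filters, maps with a suspending call, and reduces. fetchPrice and totalPriceOfEvenIds are hypothetical helpers that stand in for real asynchronous work.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Hypothetical suspending lookup that stands in for a network or database call.
suspend fun fetchPrice(id: Int): Int {
    delay(10)
    return id * 100
}

suspend fun totalPriceOfEvenIds(): Int =
    flowOf(1, 2, 3, 4)
        .filter { it % 2 == 0 }               // keeps 2 and 4
        .map { fetchPrice(it) }               // suspending call inside map: 200, 400
        .reduce { acc, price -> acc + price } // terminal operator: 600

fun main() = runBlocking {
    println(totalPriceOfEvenIds()) // 600
}
```

The values pass through the chain one at a time, so fetchPrice is called sequentially for each id that survives the filter.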

Let’s get some data from our database and then from the network:

Example 2:

val stream = flow {
  println("Computed on $thread")
  val dataFromDatabase = database.getData()
  emit(dataFromDatabase)
  val dataFromNetwork = network.getData()
  emit(dataFromNetwork)
}

stream
  .map { it.plus(" mapped on $thread") }
  .flowOn(Dispatchers.IO)
  .collect { println(it.plus(" collected on $thread")) }

This prints:

Computed on worker-1
Database data! mapped on worker-1 collected on main
Network data! mapped on worker-1 collected on main

The code above will first get data from the database and emit it to the collector. It then fetches updated data from the network and emits it. For simplicity we skip the caching of network data to the database.

The view can listen to this stream of data and update the UI when new data is collected. We have used the map operator to append the name of the thread each step runs on (thread here stands for the name of the current thread), which shows where the work happens. You can also see a new operator named flowOn, which changes the context of everything upstream of it, here the flow builder and map, while collect keeps running in the caller's context. If you want to execute an operation off the main thread, you can choose which dispatcher to use (Main, Default, IO, Unconfined). This is similar to subscribeOn in RxJava.
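The effect of flowOn can be observed in a self-contained sketch like the one below. threadsUsed is a hypothetical helper, and the exact thread names depend on the platform and dispatcher, so none are shown as expected output.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Returns (thread that ran the builder, thread that ran the downstream code).
suspend fun threadsUsed(): Pair<String, String> {
    var collectedOn = ""
    val emittedOn = flow { emit(Thread.currentThread().name) } // runs on Dispatchers.IO
        .flowOn(Dispatchers.IO)                                // affects only what is above it
        .onEach { collectedOn = Thread.currentThread().name }  // runs in the caller's context
        .first()
    return emittedOn to collectedOn
}

fun main() = runBlocking {
    val (emittedOn, collectedOn) = threadsUsed()
    println("emitted on $emittedOn, collected on $collectedOn")
}
```

Operators placed after flowOn are not affected by it, which is why the two names differ.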

You can find all operators in the kotlinx.coroutines.flow API documentation listed in the references.

Custom Operators

Creating your own custom operators is easily achieved. Here is a custom operator that transforms all strings to uppercase.

Example 3:

fun Flow<String>.uppercase(): Flow<String> {
  return flow {
    collect { emit(it.uppercase()) } // String.uppercase() replaces the deprecated toUpperCase()
  }
}

flowOf("No", "soup", "for", "you!")
  .uppercase()
  .collect { println(it) }

This prints:

NO
SOUP
FOR
YOU!
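Custom operators can also be built on top of existing ones instead of the flow builder. The sketch below uses the transform operator, which lets you emit zero or more values for each input. shout is a hypothetical name chosen for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits each non-blank string in uppercase and skips blanks.
fun Flow<String>.shout(): Flow<String> = transform { value ->
    if (value.isNotBlank()) emit(value.uppercase())
}

fun main() = runBlocking {
    val result = flowOf("No", " ", "soup").shout().toList()
    println(result) // [NO, SOUP]
}
```

Because the operator is an extension function on Flow<String>, it chains like any built-in operator.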

Data flow in Android

In Android we have to think about how a technology fits in with Fragment, Activity and ViewModel. There are multiple ways of doing this. Personally I use Flow in repositories and LiveData in the ViewModel, a choice I will come back to later. Here’s an example of how this might look.

Example 4:

class WeatherRepository {
  fun getWeather(): Flow<Weather> {
    return flow {
      emit(Weather("Rainy :("))
      emit(Weather("Cloudy :|"))
      emit(Weather("Sunny :)"))
    }
  }
}

class WeatherViewModel(
  private val weatherRepository: WeatherRepository
) : ViewModel() {
  val weather = MutableLiveData<Weather>()

  init { getWeather() }

  private fun getWeather() {
    viewModelScope.launch {
      weatherRepository.getWeather()
        .flowOn(Dispatchers.IO)
        .collect { weather.value = it }
    }
  }
}

class WeatherFragment : Fragment() {
  private fun initObservers() {
    viewModel.weather.observe(this, Observer { println(it) })
  }
}

This will print:

Rainy :(
Cloudy :|
Sunny :)

WeatherViewModel exposes MutableLiveData<Weather> to WeatherFragment. In the init block we start to collect the weather flow. In the fragment we start observing the weather LiveData. If the orientation changes and the fragment is recreated, the console will print Sunny :) again because LiveData caches the last value.

If we try to rewrite the same code without LiveData, there are some consequences.

class WeatherViewModel(
  private val weatherRepository: WeatherRepository
) : ViewModel() {
  val weatherFlow = weatherRepository.getWeather().flowOn(Dispatchers.IO) 
}

class WeatherFragment : Fragment() {
  private fun initFlows() {
    lifecycleScope.launchWhenStarted {
      viewModel.weatherFlow.collect { println(it) }
    }
  }
}

Instead of using LiveData to communicate with WeatherFragment, we are using Flow. The code mostly works in the same way. If the orientation changes, however, WeatherFragment is recreated and collects the weather flow again. Because the flow is cold, this restarts it and all its values are emitted again. For this reason I like to use LiveData.

JetBrains is currently working on something called DataFlow, which will behave similarly to LiveData. You can find more info about DataFlow here

Testing

For testing flows you can use the take operator to get a fixed number of items from the flow and toList as the terminal operator to get the result as a list. toList suspends until the flow completes, which happens either when the upstream ends or when take has received the requested number of items. You can still use collect instead of toList, but personally I prefer toList because you end up with all elements in one list, which gives you more control over the assertions.

Example 5:

val ints = flowOf(0, 1, 2)

@Test
fun `confirm 3 ints from flow`() = runBlockingTest {
  val result = ints.toList()
  // assertEquals takes the expected value first, then the actual value
  assertEquals(3, result.size)
  assertEquals(0, result[0])
  assertEquals(1, result[1])
  assertEquals(2, result[2])
}

The flow should emit three values in total. In the test above we wait for the flow to emit all values and then confirm the number of values and their contents. Be careful when using toList on its own: if the flow never completes, the function will suspend forever. To prevent this, use the take operator to limit the number of values that the test collects.

@Test
fun `confirm 2 ints from flow`() = runBlockingTest {
  val result = ints.take(2).toList()
  assertEquals(2, result.size)
  assertEquals(0, result[0])
  assertEquals(1, result[1])
}

Perfect!
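The same idea applies to a flow that never completes. The sketch below uses a hypothetical ticker flow and plain runBlocking instead of a test runner, so that it has no test-library dependencies.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical never-ending flow: emits 0, 1, 2, ... until the collector stops.
fun ticker(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(10)
    }
}

fun main() = runBlocking {
    // Without take(3), toList() would suspend forever.
    val firstThree = ticker().take(3).toList()
    println(firstThree) // [0, 1, 2]
}
```

Once take has seen three values it cancels the upstream flow, so the while loop does not keep running in the background.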

Error handling

If you do not handle errors in your flow, the exception propagates to the collecting coroutine and, if nothing handles it there, crashes your app, just like an exception thrown from any other suspend function. You can wrap the collection in a traditional try/catch, or you can use the catch operator to catch exceptions thrown upstream. Note that catch does not handle exceptions thrown downstream of it: if the collect block throws, your app will still crash. To get around this, move the collecting logic into onEach, place catch after it, and end the chain with a parameterless collect(). Everything that can throw is then upstream of catch.

Example 6:

scope.launch {
  dataFlow
    .onEach { /* Handle emitted value */ }
    .catch { logError(it) /* Handle upstream exceptions */ } 
    .collect()
}
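A runnable sketch of this pattern is shown below. handle and processAll are hypothetical names, and the "consumer logic" simply rejects negative numbers so that there is something to catch.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical consumer logic that rejects negative numbers.
fun handle(value: Int) {
    require(value >= 0) { "negative value: $value" }
}

// Returns the message of the exception that catch saw, or null if none was thrown.
suspend fun processAll(vararg values: Int): String? {
    var caught: String? = null
    values.asFlow()
        .onEach { handle(it) }             // consumer logic runs upstream of catch
        .catch { e -> caught = e.message } // so its exceptions end up here
        .collect()                         // parameterless collect just starts the flow
    return caught
}

fun main() = runBlocking {
    println(processAll(1, 2))  // null
    println(processAll(1, -2)) // negative value: -2
}
```

Had handle been called inside a collect { } block instead, the exception would have bypassed catch and propagated to the caller.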

If an error is thrown the flow will still stop emitting new values. If it should continue after an exception you need to handle this case manually.
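One simple way to handle the failure is to emit a fallback value from inside catch. In this minimal sketch, flaky is a hypothetical flow that fails after its second value. The flow still ends after the exception, and only the fallback is added.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical flow that fails after its second value.
fun flaky(): Flow<String> = flow {
    emit("first")
    emit("second")
    throw IllegalStateException("connection lost")
}

fun main() = runBlocking {
    val received = flaky()
        .catch { e -> emit("fallback after: ${e.message}") } // upstream error handled here
        .toList()
    println(received) // [first, second, fallback after: connection lost]
}
```

The catch block receives the exception and may emit values of its own, but the upstream flow is not resumed.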

Conclusion

This was a short intro to Flow. The API is lightweight and makes it hard to leak subscriptions, which is a problem in RxJava. We get this for free with structured concurrency. No more silent resource leaks! Room and SQLDelight already support Flow, which makes it easy to get started. If you want to read more about Flow, you can find more resources below.

References:

https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4
https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow
https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9
https://medium.com/@elizarov/exceptions-in-kotlin-flows-b59643c940fb
https://medium.com/@elizarov/execution-context-of-kotlin-flows-b8c151c9309b
https://medium.com/@elizarov/simple-design-of-kotlin-flow-4725e7398c4c
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/index.html