An Introduction to Flow for Android | Martin Tøften
Flow is a new feature of Kotlin Coroutines. Whereas a suspending function in Kotlin can only return a single value, a Flow can emit multiple values over time.
You can think of Flow as the coroutine counterpart to RxJava: a Flow plays a role similar to an Observable or a Flowable.
In this article we will look into the basics of Flow.
Example 1:
val streamOfInts = flow {
    println("Start emitting")
    for (i in 0..1) {
        delay(100)
        println("Sending $i")
        emit(i)
    }
    println("Finished emitting")
}

// collect is a suspending function, so this part runs inside a coroutine
streamOfInts.collect {
    println("Receiving $it")
}
println("Flow completed")
This prints:
Start emitting
Sending 0
Receiving 0
Sending 1
Receiving 1
Finished emitting
Flow completed
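The first thing we see in the code above is the flow builder. Inside the flow builder you can do any asynchronous work and use emit to send values to the collector. Here delay stands in for that work. Nothing runs until collect is called: collect is a suspending function that starts the flow and receives each emitted value, much like subscribe in RxJava.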
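To make the point about collect starting the flow more concrete, here is a minimal sketch. The function name runsAfterTwoCollectors and the counter builderRuns are hypothetical, and the sketch assumes kotlinx.coroutines 1.6 or newer, where collect with a lambda needs no extra import. It counts how often the builder body runs when the same flow is collected twice.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the same flow twice and reports
// how many times the builder body was executed.
fun runsAfterTwoCollectors(): Int = runBlocking {
    var builderRuns = 0
    val coldInts: Flow<Int> = flow {
        builderRuns++ // runs once per collector, not once per flow
        emit(1)
        emit(2)
    }
    // Nothing has been emitted yet; the builder only runs inside collect.
    coldInts.collect { println("First collector got $it") }
    coldInts.collect { println("Second collector got $it") }
    builderRuns
}

fun main() {
    println("Builder ran ${runsAfterTwoCollectors()} times")
}
```

Each call to collect re-runs the builder from the start, which is why a flow built this way is usually described as cold.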
Operators
Flow comes with familiar operators such as map, filter and reduce. Example 2 below applies map to a Flow.
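As a quick illustration of how these operators chain, the following sketch squares a few numbers, keeps the even results and sums them. The function name sumOfEvenSquares is made up for this example; map, filter and reduce are the kotlinx.coroutines.flow operators.

```kotlin
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Hypothetical helper combining three operators on a small flow.
fun sumOfEvenSquares(): Int = runBlocking {
    flowOf(1, 2, 3, 4)
        .map { it * it }                       // 1, 4, 9, 16
        .filter { it % 2 == 0 }                // 4, 16
        .reduce { acc, value -> acc + value }  // terminal operator: 20
}

fun main() {
    println(sumOfEvenSquares())
}
```

Note that map and filter return a new Flow and do no work on their own, while reduce is a terminal operator that actually collects the flow.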
Let's get some data from our database and then from the network:
Example 2:
val stream = flow {
    println("Computed on $thread")
    val dataFromDatabase = database.getData()
    emit(dataFromDatabase)
    val dataFromNetwork = network.getData()
    emit(dataFromNetwork)
}

stream
    .map { it.plus(" mapped on $thread") }
    .flowOn(Dispatchers.IO)
    .collect { println(it.plus(" collected on $thread")) }
This prints:
Computed on worker-1
Database data! mapped on worker-1 collected on main
Network data! mapped on worker-1 collected on main
The code above will first get data from the database and emit it to the collector. It then fetches updated data from the network and emits it. For simplicity we skip the caching of network data to the database.
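The view can listen to this stream of data and update the UI when new data is collected. We have used the map operator to transform each value and flowOn to choose the dispatcher that everything above it in the chain runs on. The available dispatchers are Main, Default, IO and Unconfined. flowOn is similar to subscribeOn in RxJava.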
You can find all operators in the kotlinx.coroutines.flow API documentation listed under References.
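The following sketch tries to show that flowOn only changes where the upstream part of the chain runs. The function name upstreamAndDownstreamThreads is hypothetical, and Dispatchers.Default is used instead of IO purely for illustration. It records the thread that ran the builder and the thread that ran a map placed after flowOn.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (thread of the builder, thread of the downstream map).
fun upstreamAndDownstreamThreads(): Pair<String, String> = runBlocking {
    flow<String> { emit(Thread.currentThread().name) } // above flowOn: runs on Dispatchers.Default
        .flowOn(Dispatchers.Default)
        .map { upstream ->
            // below flowOn: runs in the collector's context (the runBlocking thread)
            upstream to Thread.currentThread().name
        }
        .single()
}

fun main() {
    val (upstream, downstream) = upstreamAndDownstreamThreads()
    println("Builder ran on $upstream, map ran on $downstream")
}
```

Operators placed after flowOn stay in the collector's context, which is what lets the collector in Example 2 update the UI on the main thread.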
Custom Operators
Creating your own custom operators is easily achieved. Here is a custom operator that transforms all strings to uppercase.
Example 3:
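fun Flow<String>.uppercase(): Flow<String> {
    return flow {
        // it is a String here, so this calls String.uppercase()
        // (the replacement for the deprecated toUpperCase())
        collect { emit(it.uppercase()) }
    }
}

flowOf("No", "soup", "for", "you!")
    .uppercase()
    .collect { println(it) }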
This prints:
NO
SOUP
FOR
YOU!
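Custom operators can also be built on top of existing ones instead of the flow builder. The sketch below is one possible way to do that; shout, skipBlank and demo are hypothetical names that do not come from the library, while map and transform are regular kotlinx.coroutines.flow operators.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// One value in, one value out: map is enough.
fun Flow<String>.shout(): Flow<String> = map { it.uppercase() + "!" }

// transform may emit zero or more values per input; here blank strings are dropped.
fun Flow<String>.skipBlank(): Flow<String> = transform { value ->
    if (value.isNotBlank()) emit(value)
}

// Hypothetical demo chaining both custom operators.
fun demo(): List<String> = runBlocking {
    flowOf("no", " ", "soup")
        .skipBlank()
        .shout()
        .toList()
}

fun main() {
    println(demo())
}
```

Because both operators are plain extension functions on Flow<String>, they chain exactly like the built-in ones.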
Data flow in Android
In Android we have to think about how a technology works in the context of Fragment, Activity and ViewModel. One option is to collect the Flow in the ViewModel and expose the result to the view as LiveData:
Example 4:
// Assumed shape of Weather, inferred from the "description=..." output below
data class Weather(val description: String)

class WeatherRepository {
    fun getWeather(): Flow<Weather> {
        return flow {
            emit(Weather("Rainy :("))
            emit(Weather("Cloudy :|"))
            emit(Weather("Sunny :)"))
        }
    }
}

class WeatherViewModel(
    private val weatherRepository: WeatherRepository
) : ViewModel() {
    val weather = MutableLiveData<Weather>()

    init { getWeather() }

    private fun getWeather() {
        viewModelScope.launch {
            weatherRepository.getWeather()
                .flowOn(Dispatchers.IO)
                .collect { weather.value = it }
        }
    }
}

class WeatherFragment : Fragment() {
    private fun initObservers() {
        // Observe with the view's lifecycle so the observer is removed with the view
        viewModel.weather.observe(viewLifecycleOwner, Observer { println(it) })
    }
}
This will print:
Rainy :( Cloudy :| Sunny :)
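WeatherViewModel collects the flow and writes each value to a MutableLiveData<Weather>, which WeatherFragment observes as LiveData. If the fragment starts observing only after the flow has completed, for example after a configuration change, it receives just description=Sunny :) because LiveData keeps the latest value.
If we try to rewrite the same code without LiveData: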
class WeatherViewModel(
    private val weatherRepository: WeatherRepository
) : ViewModel() {
    val weatherFlow = weatherRepository.getWeather().flowOn(Dispatchers.IO)
}

class WeatherFragment : Fragment() {
    private fun initFlows() {
        lifecycleScope.launchWhenStarted {
            viewModel.weatherFlow.collect { println(it) }
        }
    }
}
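Instead of using LiveData, WeatherFragment now collects the Flow directly. Because the flow is cold, it starts over every time WeatherFragment collects it, for example after a configuration change, whereas LiveData would only redeliver the latest value.
JetBrains are currently working on something called DataFlow as an alternative to LiveData.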
Testing
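For testing flows you can use the take and toList operators. toList collects every emitted value into a list and returns once the flow completes, while take cancels the flow after a given number of values. This lets you assert on a plain list instead of gathering values by hand in collect.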
Example 5:
val ints = flowOf(0, 1, 2)

@Test
fun `confirm 3 ints from flow`() = runBlockingTest {
    val result = ints.toList()
    // assertEquals takes the expected value first, then the actual value
    assertEquals(3, result.size)
    assertEquals(0, result[0])
    assertEquals(1, result[1])
    assertEquals(2, result[2])
}
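The flow should emit three values in total. In the test above we wait for the flow to emit all values and then confirm the length and the values received. Be careful when using only the toList operator: if the flow never completes, the test never finishes. Adding take limits the number of values that are collected: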
@Test
fun `confirm 2 ints from flow`() = runBlockingTest {
    val result = ints.take(2).toList()
    assertEquals(2, result.size)
    assertEquals(0, result[0])
    assertEquals(1, result[1])
}
Perfect!
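To see why take matters for flows that never complete, consider this sketch. The ticker function is a hypothetical never-ending source, and runBlocking is used instead of a test runner only to keep the example free of test dependencies.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical source that keeps emitting until its collector is cancelled.
fun ticker(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(10)
    }
}

// Calling ticker().toList() on its own would suspend forever;
// take(3) cancels the upstream flow after the third value.
fun firstThreeTicks(): List<Int> = runBlocking {
    ticker().take(3).toList()
}

fun main() {
    println(firstThreeTicks())
}
```

The same pattern applies in a test: limit the flow with take first, then assert on the resulting list.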
Error handling
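If you do not handle errors in your flow, an uncaught exception will crash your app, just as with any other suspending function. You can use the traditional way of wrapping the flow with a try catch, or you can use the catch operator. The catch operator only handles exceptions thrown upstream of it, so an exception thrown inside collect is not caught. To cover that case as well, move the logic from collect into onEach, place catch after it, and call collect without arguments.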
Example 6:
scope.launch {
    dataFlow
        .onEach { /* Handle emitted value */ }
        .catch { logError(it) /* Handle upstream exceptions */ }
        .collect()
}
If an error is thrown the flow will still stop emitting new values. If it should continue after an exception you need to handle this case manually.
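One manual way to keep delivering something after a failure is to emit a fallback value from inside catch. The sketch below assumes a hypothetical flaky source and a hypothetical withFallback helper; catch itself is the kotlinx.coroutines.flow operator, and its lambda is allowed to call emit.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical source that fails after its first value.
fun flaky(): Flow<String> = flow {
    emit("first")
    throw IllegalStateException("network down")
}

// The upstream flow still stops at the exception, but the collector
// receives a fallback value instead of crashing.
fun withFallback(): List<String> = runBlocking {
    flaky()
        .catch { e -> emit("fallback after: ${e.message}") }
        .toList()
}

fun main() {
    println(withFallback())
}
```

The failed flow is not resumed here; catch only lets the stream end gracefully with a replacement value.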
Conclusion
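This was a short intro to Flow. It covers many of the use cases you may know from RxJava, and libraries such as Room and SqlDelight can expose their queries as a Flow.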
References:
https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4
https://kotlinlang.org/docs/reference/coroutines/flow.html#asynchronous-flow
https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9
https://medium.com/@elizarov/exceptions-in-kotlin-flows-b59643c940fb
https://medium.com/@elizarov/execution-context-of-kotlin-flows-b8c151c9309b
https://medium.com/@elizarov/simple-design-of-kotlin-flow-4725e7398c4c
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/index.html