Introduction to Kotlin Flows

Introduction to Kotlin Flows

Kotlin Flow API provides a powerful tool for handling asynchronous data streams. In this article, we’ll provide an introduction to Kotlin Flows, including their basic concepts and usage. This article is part of a series that covers various topics related to Kotlin Flows, including flow builders, operators, exception handling, context preservation, buffering, conflation, processing, composition, coroutines, shared flows, state flows, cold flows, hot flows, cancellation, testing, and more.

What is a Flow?

A flow is a sequence of values that are emitted over time. You can think of it like a river that flows and carries values along with it. Just like you can use a bucket to collect water from a river, you can use operators to collect values from a flow.

Flows are particularly useful when dealing with asynchronous data, such as data that comes from an API call or a database query. They allow you to handle the data as it becomes available, without blocking the main thread of your app.

Here’s an example that demonstrates the basic usage of a flow:

val myFlow = flowOf(1, 2, 3)

myFlow.collect { value ->
    println(value)
}

This code creates a flow that emits the values 1, 2, and 3. The collect operator is then used to collect these values. The code inside the lambda function is executed for each value emitted by the flow. In this case, we simply print the value.

The output of this code will be:

1
2
3

Why Use Flows?

First let's explore some of the reasons why you might want to use flows instead of other approaches, and highlight the ability of flows to return multiple values over time.

Multiple Values Over Time

One of the main benefits of using flows is their ability to return multiple values over time. Unlike other approaches, such as using a single value or a collection of values, flows allow you to handle a stream of values that are emitted over time. You can return one value then another, then another.

Here's an example that demonstrates this ability:

val myFlow = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

myFlow.collect { value ->
    println(value)
}

This code creates a flow that emits three values with a delay of 1 second between each value. The collect operator is then used to collect these values.

The output of this code will be:

1
(delay of 1 second)
2
(delay of 1 second)
3

As you can see, the flow emits multiple values over time, with a delay between each value. This allows you to handle each value as it becomes available, without having to wait for all the values to be emitted.

Asynchronous Flow

One of the main benefits of using flows is their ability to handle asynchronous data. A flow can emit values asynchronously, without blocking the main thread of your app.

Here’s an example that demonstrates the asynchronous nature of flows:

val myFlow = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

myFlow.collect { value ->
    println(value)
}

This code creates a flow using the flow builder function. Inside the flow, we emit three values with a delay of 1 second between each value. The collect operator is then used to collect these values.

The output of this code will be:

1
(delay of 1 second)
2
(delay of 1 second)
3

As you can see, the values are emitted asynchronously, with a delay between each value. The collect operator collects these values as they become available.

Flow Builders

Kotlin provides several builder functions for creating flows. These include flowOf, asFlow, channelFlow, and callbackFlow. Each builder function provides a different way to create a flow.

For example, the flowOf builder function is used to create a flow from a fixed set of values:

val myFlow = flowOf(1, 2, 3)

The asFlow builder function is used to create a flow from an iterable or a sequence:

val myList = listOf(1, 2, 3)
val myFlow = myList.asFlow()

The channelFlow and callbackFlow builder functions are used to create flows that emit values based on callbacks or channel operations.

For more detailed information on flow builders and their usage, check out our article on Flow Builders.

Flow Operators

Kotlin Flow API provides a rich set of operators for manipulating flows. These include intermediate operators like map, filter, take, and drop, as well as terminal operators like collect, toList, toSet, reduce, and fold. You must be already aware of some of these from Lists.

Intermediate Operators

Intermediate operators are used to transform the values emitted by a flow. They apply a given function to each value and emit the result of the function.

Terminal Operators

Terminal operators are used to produce a result or a side effect from a flow. They collect or reduce the values emitted by the flow into a single result or perform an action for each value.

For more detailed information on flow operators and their usage, check out our article on Flow Operators.

Exception Handling in Flows

Flows provide built-in support for exception handling. You can use the catch operator to catch exceptions that occur while collecting values from a flow.

Here’s an example that demonstrates exception handling in flows:

val myFlow = flow {
    emit(1)
    throw RuntimeException("Error")
}

myFlow.catch { e ->
    println(e.message)
}.collect { value ->
    println(value)
}

This code creates a flow that emits a value and then throws an exception. The catch operator is then used to catch the exception. The code inside the lambda function is executed if an exception occurs while collecting values from the flow. In this case, we simply print the exception message.

The output of this code will be:

1
Error

As you can see, the first value is emitted and collected before the exception occurs. When the exception is thrown, it’s caught by the catch operator and handled.

For more detailed information on exception handling in flows, check out our article on Exception Handling in Flows.

Flow Context Preservation

Flows provide built-in support for context preservation. This means that flows automatically preserve the context in which they were created and collected.

Here’s an example that demonstrates context preservation in flows:

val myFlow = flow {
    println("Flow context: ${coroutineContext[CoroutineName]}")
    emit(1)
}

withContext(CoroutineName("MyCoroutine")) {
    myFlow.collect { value ->
        println("Collect context: ${coroutineContext[CoroutineName]}")
    }
}

This code creates a flow using the flow builder function. Inside the flow, we print the current coroutine context and emit a value. Then we use the withContext function to change the coroutine context and collect the values from the flow.

The output of this code will be:

Flow context: null
Collect context: MyCoroutine

As you can see, the flow preserves the context in which it was created (null in this case) and collected (MyCoroutine in this case).

For more detailed information on flow context preservation, check out our article on Flow Context Preservation.

Buffering, Conflation, and Processing

Flows provide several mechanisms for controlling how values are emitted and collected. These include buffering, conflation, and processing.

Buffering allows you to control how many values are emitted by a flow before they are collected. You can use the buffer operator to specify the size of the buffer.

Conflation allows you to control how values are combined when they are emitted faster than they can be collected. You can use the conflate operator to specify how values should be conflated.

Processing allows you to control how values are processed when they are collected. You can use operators like collectLatest to specify how values should be processed.

For more detailed information on buffering, conflation, and processing in flows, check out our article on Buffering, Conflation, and Processing.

Composing Multiple Flows

Flows provide powerful tools for composing multiple flows together. You can use operators like zip, combine, and flatMapConcat to combine or merge multiple flows into a single flow.

Here’s an example that demonstrates how to compose multiple flows using the zip operator:

val myFlow1 = flowOf(1, 2, 3)
val myFlow2 = flowOf(4, 5, 6)

myFlow1.zip(myFlow2) { a, b ->
    a + b
}.collect { value ->
    println(value)
}

This code creates two flows that emit different values. The zip operator is then used to combine these flows into a single flow that emits the sum of the corresponding values from each flow. The code inside the lambda function is executed for each pair of values emitted by the two flows. In this case, we simply add the values together. Finally, we use the collect operator to collect the values emitted by the zipped flow.

The output of this code will be:

5
7
9

As you can see, each value emitted by the zipped flow is the sum of the corresponding values from each of the original flows.

For more detailed information on composing multiple flows, check out our article on Composing Multiple Flows.

Flows and Coroutines

Flows are built on top of coroutines in Kotlin. This means that you can use coroutine concepts like suspend functions and coroutine contexts when working with flows.

Here’s an example that demonstrates how to use suspend functions with flows:

suspend fun getData(): Int {
    delay(1000)
    return 1
}

val myFlow = flow {
    val data = getData()
    emit(data)
}

myFlow.collect { value ->
    println(value)
}

This code defines a suspend function that simulates getting some data asynchronously. The function introduces a delay of 1 second and returns a value. Then we create a flow using the flow builder function. Inside the flow, we call our suspend function to get some data and emit it as a value. Finally, we use the collect operator to collect this value.

The output of this code will be:

1

As you can see, we were able to use a suspend function to get some data asynchronously and emit it as a value in our flow.

For more detailed information on how flows and coroutines work together, check out our article on Flows and Coroutines.

SharedFlow and StateFlow

Kotlin Flow API provides two special types of flows: SharedFlow and StateFlow. These flows are designed to share data between multiple collectors.

SharedFlow

A SharedFlow is a flow that can be collected by multiple collectors simultaneously. Each collector receives all the values emitted by the flow, even if they start collecting after the flow has started emitting values.

StateFlow

A StateFlow is a flow that represents a stateful value. It always has a current value, and collectors can access this value at any time. When the value changes, the flow emits the new value to all its collectors.

For more detailed information on SharedFlow and StateFlow, check out our articles on SharedFlow and StateFlow.

ColdFlows and HotFlows

Cold Flows

A cold flow is a flow that starts emitting values when it’s collected. Each time the flow is collected, it starts emitting values from the beginning.

Here’s an example that demonstrates a cold flow:

val myFlow = flow {
    println("Starting flow")
    emit(1)
    emit(2)
    emit(3)
}

myFlow.collect { value ->
    println(value)
}

myFlow.collect { value ->
    println(value)
}

This code creates a cold flow using the flow builder function. Inside the flow, we print a message to indicate that the flow is starting, and then we emit three values. Then we collect the values from the flow twice.

The output of this code will be:

Starting flow
1
2
3
Starting flow
1
2
3

As you can see, each time we collect the values from the flow, it starts emitting values from the beginning.

Hot Flows

A hot flow is a flow that emits values independently of whether it’s being collected or not. Once a hot flow starts emitting values, it continues to do so until it’s completed or cancelled.

Here’s an example that demonstrates a hot flow:

val myChannel = BroadcastChannel<Int>(Channel.BUFFERED)
val myFlow = myChannel.asFlow()

GlobalScope.launch {
    myChannel.send(1)
    delay(1000)
    myChannel.send(2)
    delay(1000)
    myChannel.send(3)
}

delay(1500)

myFlow.collect { value ->
    println(value)
}

This code creates a hot flow using a BroadcastChannel and its asFlow extension function. Then we use a coroutine to send three values to the channel with a delay between each value. Finally, we introduce a delay of 1.5 seconds before collecting the values from the flow.

The output of this code will be:

2
3

As you can see, even though we started collecting the values from the flow after it had already started emitting values, we still received some of the emitted values.

For more detailed information on ColdFlows and HotFlows, check out our articles on ColdFlows and HotFlows.

Flow Cancellation

Flows provide built-in support for cancellation. You can use coroutine cancellation mechanisms like cancel or withTimeout to cancel a flow.

Here’s an example that demonstrates how to cancel a flow:

val myFlow = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

withTimeoutOrNull(1500) {
    myFlow.collect { value ->
        println(value)
    }
}

This code creates a flow that emits three values with a delay of 1 second between each value. Then we use the withTimeoutOrNull function to collect these values with a timeout of 1.5 seconds.

The output of this code will be:

1
2

As you can see, only two of the three values are collected before the timeout occurs and the collection is cancelled.

For more detailed information on flow cancellation, check out our article on Flow Cancellation.

Flow Testing

Flows provide built-in support for testing. You can use test coroutine dispatchers and test observers to test flows in your app.

Here’s an example that demonstrates how to test a flow:

@Test
fun testMyFlow() = runBlockingTest {
    val myFlow = flowOf(1, 2, 3)

    val result = mutableListOf<Int>()
    myFlow.collect { value ->
        result.add(value)
    }

    assertEquals(listOf(1, 2, 3), result)
}

This code defines a test function that tests a simple flow. Inside the test function, we create a flow that emits three values. Then we use the collect operator to collect these values and add them to a list. Finally, we use the assertEquals function to check that the collected values match the expected values.

For more detailed information on flow testing, check out our article on Flow Testing.

Flow onCompletion Operator

The onCompletion operator is used to perform an action when a flow completes, either successfully or with an exception.

Here’s an example that demonstrates the use of the onCompletion operator:

val myFlow = flowOf(1, 2, 3)

myFlow.onCompletion { cause ->
    if (cause != null) {
        println("Flow completed with exception: $cause")
    } else {
        println("Flow completed successfully")
    }
}.collect { value ->
    println(value)
}

This code creates a flow that emits three values. The onCompletion operator is then used to perform an action when the flow completes. The code inside the lambda function is executed when the flow completes. In this case, we check if the flow completed with an exception or not, and print a message accordingly. Finally, we use the collect operator to collect the values emitted by the flow.

The output of this code will be:

1
2
3
Flow completed successfully

As you can see, the onCompletion operator is called after all the values have been collected from the flow.

For more detailed information on the onCompletion operator, check out our article on Flow onCompletion Operator.

Flow Retry Operator

The retry operator is used to retry a flow when an exception occurs. You can specify how many times to retry and under what conditions.

Here’s an example that demonstrates the use of the retry operator:

var attempt = 0

val myFlow = flow {
    attempt++
    if (attempt < 3) {
        throw RuntimeException("Error")
    }
    emit(1)
}

myFlow.retry(2).collect { value ->
    println(value)
}

This code creates a flow that throws an exception on its first two attempts and emits a value on its third attempt. The retry operator is then used to retry the flow twice. Finally, we use the collect operator to collect the value emitted by the flow.

The output of this code will be:

1

As you can see, even though the flow threw an exception on its first two attempts, it was retried twice and eventually emitted a value.

For more detailed information on the retry operator, check out our article on Flow Retry Operator.

Flow Emissions

Flows provide several mechanisms for controlling how values are emitted. These include operators like onStart, onEach, and onEmpty.

The onStart operator is used to perform an action before any values are emitted by a flow. You can use this operator to initialize some state or perform some setup before collecting values from a flow.

The onEach operator is used to perform an action for each value emitted by a flow. You can use this operator to update some state or perform some side effect for each value.

The onEmpty operator is used to perform an action if no values are emitted by a flow. You can use this operator to handle the case where a flow completes without emitting any values.

For more detailed information on these operators and their usage, check out our articles on Flow Emissions, Flow Transformation, Flow Collection, Exception Handling in Flows, Flow Completion, and Flow Context.

Conclusion

In this article, we’ve provided an introduction to Kotlin Flows, including their basic concepts and usage. We’ve covered topics like asynchronous flows, flow builders, flow operators, exception handling, context preservation, buffering, conflation, processing, composition, coroutines, shared flows, state flows, cold flows, hot flows, cancellation, testing, and more. This article is part of a series that provides in-depth information on various topics related to Kotlin Flows in Coroutines.