Kotlin flow collect all

Asynchronous Flow

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

Representing multiple values

Multiple values can be represented in Kotlin using collections. For example, we can have a simple function that returns a List of three numbers and then print them all using forEach:

    fun simple(): List<Int> = listOf(1, 2, 3)

    fun main() {
        simple().forEach { value -> println(value) }
    }

Sequences

If we are computing the numbers with some CPU-consuming blocking code (each computation taking 100ms), then we can represent the numbers using a Sequence:

    fun simple(): Sequence<Int> = sequence { // sequence builder
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it
            yield(i) // yield next value
        }
    }

    fun main() {
        simple().forEach { value -> println(value) }
    }

This code outputs the same numbers, but it waits 100ms before printing each one.

Suspending functions

However, this computation blocks the main thread that is running the code. When these values are computed by asynchronous code, we can mark the simple function with a suspend modifier, so that it can perform its work without blocking and return the result as a list:

    import kotlinx.coroutines.*

    suspend fun simple(): List<Int> {
        delay(1000) // pretend we are doing something asynchronous here
        return listOf(1, 2, 3)
    }

    fun main() = runBlocking<Unit> {
        simple().forEach { value -> println(value) }
    }

This code prints the numbers after waiting for a second.

Flows

Using a List result type means we can only return all the values at once. To represent a stream of values that are being computed asynchronously, we can use a Flow type, just like we would use a Sequence type for synchronously computed values:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun simple(): Flow<Int> = flow { // flow builder
        for (i in 1..3) {
            delay(100) // pretend we are doing something useful here
            emit(i) // emit next value
        }
    }

    fun main() = runBlocking<Unit> {
        // Launch a concurrent coroutine to check if the main thread is blocked
        launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(100)
            }
        }
        // Collect the flow
        simple().collect { value -> println(value) }
    }

This code waits 100ms before printing each number without blocking the main thread. This is verified by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

    I'm not blocked 1
    1
    I'm not blocked 2
    2
    I'm not blocked 3
    3

Notice the following differences in the code with the Flow from the earlier examples:

  • A builder function of Flow type is called flow.
  • Code inside a flow { ... } builder block can suspend.
  • The simple function is no longer marked with a suspend modifier.
  • Values are emitted from the flow using an emit function.
  • Values are collected from the flow using a collect function.

We can replace delay with Thread.sleep in the body of simple's flow { ... } and see that the main thread is blocked in this case.
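
The blocking variant can be sketched as follows. The names blockingSimple and runDemo are hypothetical, introduced only for this illustration, and events are recorded in a list rather than printed directly so that their order is easy to inspect. Because Thread.sleep never suspends, the launched coroutine should not get a chance to run until the whole flow has been collected:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical variant of simple(): Thread.sleep blocks instead of suspending
fun blockingSimple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // blocks the collecting thread; not a suspension point
        emit(i)
    }
}

// Records events in the order they happen (hypothetical helper)
fun runDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // Same checking coroutine as in the example above
    launch {
        for (k in 1..3) {
            events += "I'm not blocked $k"
            delay(100)
        }
    }
    // The collector never suspends, so the coroutine above cannot start yet
    blockingSimple().collect { value -> events += value.toString() }
    events
}

fun main() {
    runDemo().forEach { println(it) }
}
```

In this sketch all three numbers appear before the first "I'm not blocked" line, in contrast to the interleaved output of the delay-based version.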

Flows are cold

Flows are cold streams similar to sequences — the code inside a flow builder does not run until the flow is collected. This becomes clear in the following example:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun simple(): Flow<Int> = flow {
        println("Flow started")
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }

    fun main() = runBlocking<Unit> {
        println("Calling simple function...")
        val flow = simple()
        println("Calling collect...")
        flow.collect { value -> println(value) }
        println("Calling collect again...")
        flow.collect { value -> println(value) }
    }

This prints:

    Calling simple function...
    Calling collect...
    Flow started
    1
    2
    3
    Calling collect again...
    Flow started
    1
    2
    3

This is a key reason the simple function (which returns a flow) is not marked with a suspend modifier. The simple() call itself returns quickly and does not wait for anything. The flow starts afresh every time it is collected, which is why we see "Flow started" each time we call collect.

Flow cancellation basics

Flows adhere to the general cooperative cancellation of coroutines. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay). The following example shows how the flow gets cancelled on a timeout when running in a withTimeoutOrNull block and stops executing its code:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100)
            println("Emitting $i")
            emit(i)
        }
    }

    fun main() = runBlocking<Unit> {
        withTimeoutOrNull(250) { // Timeout after 250ms
            simple().collect { value -> println(value) }
        }
        println("Done")
    }

Notice how only two numbers get emitted by the flow in the simple function, producing the following output:

    Emitting 1
    1
    Emitting 2
    2
    Done

See Flow cancellation checks section for more details.

Flow builders

The flow { ... } builder from the previous examples is the most basic one. There are other builders that allow flows to be declared:

  • The flowOf builder defines a flow that emits a fixed set of values.
  • Various collections and sequences can be converted to flows using the .asFlow() extension function.

For example, the snippet that prints the numbers 1 to 3 from a flow can be rewritten as follows:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking<Unit> {
        // Convert an integer range to a flow
        (1..3).asFlow().collect { value -> println(value) }
    }
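
The flowOf builder and the conversion of a collection can be sketched in the same way. The function names fixedNumbers and letters are hypothetical and exist only for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A flow that emits a fixed set of values (hypothetical name)
fun fixedNumbers(): Flow<Int> = flowOf(1, 2, 3)

// A list converted to a flow with asFlow() (hypothetical name)
fun letters(): Flow<String> = listOf("a", "b", "c").asFlow()

fun main() = runBlocking<Unit> {
    fixedNumbers().collect { value -> println(value) }
    letters().collect { value -> println(value) }
}
```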

Intermediate flow operators

Flows can be transformed using operators, in the same way as you would transform collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow. These operators are cold, just like flows are. A call to such an operator is not a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like map and filter. An important difference from their sequence counterparts is that blocks of code inside these operators can call suspending functions.

For example, a flow of incoming requests can be mapped to its results with a map operator, even when performing a request is a long-running operation that is implemented by a suspending function:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    suspend fun performRequest(request: Int): String {
        delay(1000) // imitate long-running asynchronous work
        return "response $request"
    }

    fun main() = runBlocking<Unit> {
        (1..3).asFlow() // a flow of requests
            .map { request -> performRequest(request) }
            .collect { response -> println(response) }
    }

It produces the following three lines, each appearing one second after the previous:

    response 1
    response 2
    response 3
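
The statement that intermediate operators are cold can be checked with a small sketch. The helper isEven and the function evenSquares are hypothetical names used only here, and the log list is an illustrative device for observing when the map block actually runs. It also shows filter calling a suspending function:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending predicate
suspend fun isEven(n: Int): Boolean {
    delay(10) // imitate asynchronous work
    return n % 2 == 0
}

// Builds the pipeline only; nothing is executed by this call
fun evenSquares(log: MutableList<String>): Flow<Int> =
    (1..4).asFlow()
        .filter { isEven(it) } // the filter block can suspend
        .map { log += "mapping $it"; it * it }

fun main() = runBlocking<Unit> {
    val log = mutableListOf<String>()
    val flow = evenSquares(log) // returns immediately
    println("Log before collect: $log") // still empty
    flow.collect { value -> println(value) }
    println("Log after collect: $log")
}
```

The log stays empty until collect is called, which is when the filter and map blocks first run.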

Transform operator

Among the flow transformation operators, the most general one is called transform. It can be used to imitate simple transformations like map and filter, as well as implement more complex transformations. Using the transform operator, we can emit arbitrary values an arbitrary number of times.

For example, using transform we can emit a string before performing a long-running asynchronous request and follow it with a response:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    suspend fun performRequest(request: Int): String {
        delay(1000) // imitate long-running asynchronous work
        return "response $request"
    }

    fun main() = runBlocking<Unit> {
        (1..3).asFlow() // a flow of requests
            .transform { request ->
                emit("Making request $request")
                emit(performRequest(request))
            }
            .collect { response -> println(response) }
    }

The output of this code is:

    Making request 1
    response 1
    Making request 2
    response 2
    Making request 3
    response 3
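
To illustrate the claim that transform can imitate map and filter, here is a minimal sketch. The extension functions mapVia and filterVia are hypothetical names made up for this example; they are not part of kotlinx.coroutines and are not how the library necessarily implements its own operators:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical map-like operator: emit exactly one transformed value per input
fun <T, R> Flow<T>.mapVia(f: suspend (T) -> R): Flow<R> =
    transform { value -> emit(f(value)) }

// Hypothetical filter-like operator: emit the input only when the predicate holds
fun <T> Flow<T>.filterVia(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterVia { it % 2 == 1 } // keep odd numbers
        .mapVia { "odd $it" }
        .collect { value -> println(value) }
}
```

Emitting zero values for some inputs gives filter-like behaviour, emitting exactly one gives map-like behaviour, and emitting several (as in the request example above) has no direct map or filter equivalent.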

Size-limiting operators

Size-limiting intermediate operators like take cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management functions (like try { ... } finally { ... } blocks) operate normally in case of cancellation:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun numbers(): Flow<Int> = flow {
        try {
            emit(1)
            emit(2)
            println("This line will not execute")
            emit(3)
        } finally {
            println("Finally in numbers")
        }
    }

    fun main() = runBlocking<Unit> {
        numbers()
            .take(2) // take only the first two
            .collect { value -> println(value) }
    }

The output of this code clearly shows that the execution of the flow { ... } body in the numbers() function stopped after emitting the second number:

    1
    2
    Finally in numbers

Terminal flow operators

Terminal operators on flows are suspending functions that start a collection of the flow. The collect operator is the most basic one, but there are other terminal operators that make common tasks easier:

  • Conversion to various collections like toList and toSet.
  • Operators to get the first value and to ensure that a flow emits a single value.
  • Reducing a flow to a value with reduce and fold.

For example:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking<Unit> {
        val sum = (1..5).asFlow()
            .map { it * it } // squares of numbers from 1 to 5
            .reduce { a, b -> a + b } // sum them (terminal operator)
        println(sum)
    }
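
The other terminal operators listed above can be sketched in the same style. The function sample is a hypothetical helper used only here. Each call below starts a fresh collection of the flow, since flows are cold:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical sample flow with a repeated value
fun sample(): Flow<Int> = flowOf(3, 1, 3, 2)

fun main() = runBlocking<Unit> {
    println(sample().toList()) // all values, in emission order
    println(sample().toSet()) // duplicates removed
    println(sample().first()) // first value; the rest is not collected
    println(flowOf(42).single()) // the only value of a one-element flow
    println(sample().fold(0) { acc, value -> acc + value }) // sum, starting from 0
}
```

Note that single is only called on a one-element flow here; calling it on a flow that emits more than one value fails with an exception.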

© 2010–2022 JetBrains s.r.o. and Kotlin Programming Language contributors
Licensed under the Apache License, Version 2.0.
https://kotlinlang.org/docs/flow.html

