如何在KotlinFlows中累积值并发出它?

o2g1uqev  于 2023-04-21  发布在  Kotlin
关注(0)|答案(2)|浏览(125)

我有一个整数流,我想把它们累积到一个列表中。当列表有一个特定的大小时,这个列表应该被进一步发出。这种方法适用于runningFold,但问题是当列表已经满了时,它不会被重新创建。(见下面的代码)
所以基本上我想把输入[1,2,3,4,5,6,7,8,9,10]转换成[1,2,3,4,5]和[6,7,8,9,10]
我知道,这是可能的,只要清除列表的大小为5,但这不是很好的海事组织。
实现这一目标的最佳途径是什么?

fun main() = runBlocking {
        launch {
            repeat(20) {
                emitValue(it)
            }
        }

        flow
            .collect {
                // it should be of type Result
                println(it)
            }
    }

    private val sharedFlow = MutableSharedFlow<Int>()

    val flow = sharedFlow.asSharedFlow()
        .onEach { println("received: $it") }
        .runningFold(mutableListOf<Int>()) { list, value ->
            list.add(value)
            list
        }
        .filter { it.size >= 5 }
        .map {
            val result = Result(it.toList())
            it.clear() // that isn't very nice
            result
        }

    suspend fun emitValue(value: Int) {
        sharedFlow.emit(value)
    }

    data class Result(val list: List<Int>)
yzckvree

yzckvree1#

你可以将流转换成一个列表并将其分块,但不确定性能如何。

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val flow = (1..8).asFlow() // Assuming you have a flow emitting integers
    val chunkSize = 3 // Define the size of each chunk
    val result = flow.toList().chunked(chunkSize)
    println(result) // Prints [[1, 2, 3], [4, 5, 6], [7, 8]]
}
nhhxz33t

nhhxz33t2#

您可以使用flow构建器来更好地完成此操作,它可能比简单的列表更适合您遇到的任何复杂问题。例如:

val blockSize = 5
val flow = flow<List<Int>> {
    var accumulator = ArrayList<Int>(blockSize)
    sharedFlow.collect {
        accumulator.add(it)
        if (accumulator.size == 5) {
            emit(accumulator)
            accumulator = ArrayList<Int>(blockSize)
        }
    }
}

请注意,代码中的asSharedFlow()不会完成任何操作,因为一旦您在SharedFlow上使用任何Flow操作符,生成的Flow就不是SharedFlow。如果您希望最终结果是SharedFlow,则必须在最终流结果上使用shareIn。您目前拥有的是一个冷流,每次收集它时,都会从SharedFlow开始收集。

相关问题