kotlin 合并两个MutableStateFlows合并在一起并发出值?

jm81lzqq  于 2023-03-24  发布在  Kotlin
关注(0)|答案(1)|浏览(368)

所以,我在这里读了很多问题,以及博客文章和关于Kotlinflow combine函数的文档。The official docs表明,在其中一个流的每次发射时,其他流的 previous 值也会被发射。我也用弹珠来验证我对旧学校ReactiveX方法的期望。
然而,当我合并两个流时,我看到了不同的行为。当每个流发出时,来自其他流的值为空/初始值。

val itemFlow: MutableStateFlow<List<String>> = MutableStateFlow(listOf())
val cacheFlow: MutableStateFlow<Map<String, String>> = MutableStateFlow(mapOf())

combine(itemFlow, cacheFlow) { items, cache ->
    println("items: $items, cache: $cache")

    // transform the items + cache into a List<String> and return it to be collected...
}.collect {
    // operation on cache-updated list
}

每当我调用itemFlow.update { ... }时,我可以在日志语句中看到,在combine的转换块中,它将打印如下内容

items: (item1 item2 item3 item4), cache: []

相反,当我调用cacheFlow.update { ... }时,情况正好相反:

items: [], cache: (a : item1, b: item2)

我知道我可能只是访问流的value或块内的其他东西,但我宁愿让它按照文档工作。我假设我错过了一些关于操作符如何工作的重要内容,或者可能是使用StateFlow而不是其他实现的问题?我愿意改变它,只是在获得预期结果方面寻求任何帮助。它看起来像:

items: (item1, item2, item3, item4), cache: (a: item1, b: item2)
deyfvvtc

deyfvvtc1#

一个StateFlow总是有一个当前的value,它是第一个立即发送到新收集器的东西。这就是为什么你必须MutableStateFlow()“构造函数”或stateIn函数提供一个初始值。
因此,combine函数根据其文档进行操作,该文档声明它组合了“每个流最近发出的值”。操作符在后台开始收集两个流。因此,每个StateFlows发出的第一个项是其空集合的当前/初始值。
当这个组合流第一次被收集时,假设它在你开始更新源流之前就开始被收集,它必然会首先看到一个转换,其中传入的值是两个空集合。然后,当你更新两个源StateFlows中的一个时,它会看到来自该源流的新数据与来自另一个源流的最新value组合在一起,后者仍然是一个空集合。
如果你的目标是组合流中的第一个东西是来自两个源中的每个源的非初始空集合,你可能想使用SharedFlows而不是StateFlows,这样它们就不必有一些初始值。你仍然可以给予它们replay = 1,这样新的收集器仍然会接收到最新的数据,如果它已经存在的话。
缺点是SharedFlow没有update函数,因为SharedFlow没有 * 当前值 * 的概念。它可能在其缓冲区中有一些值,也可能没有。如果您的新值不必依赖于前一个值的值,那么您无论如何都不需要update,而是可以在其上使用emit
如果SharedFlow不适合,我认为你必须对你的组合流进行某种过滤,如果你有时可能有一个空集合的有效数据,这可能是非常复杂的。
像这样的东西?

private val initialEmptyList: List<String> = ArrayList() // don't use listOf() or emptyList() because these return a global instance
private val initialEmptyMap: Map<String, String> = HashMap() // same

val itemFlow: MutableStateFlow<List<String>> = MutableStateFlow(initialEmptyList)
val cacheFlow: MutableStateFlow<Map<String, String>> = MutableStateFlow(initialEmptyMap)

combine(
    itemFlow.filter { it !== initialEmptyList }, // note, reference equality check
    cacheFlow.filter { it !== initialEmptyMap }  // same
) { items, cache ->
    //...

相关问题