我正在尝试学习协程,我仍然有一些基本的问题。我有一个流,它发出一个项目序列,我想把流分成2个流。这是我在RxJava中编写它的方式:
val list = Flowable.just(1..6).share()
val even = list.filter { it % 2 == 0 }.subscribe { println(it) } // 2, 4, 6
val odd = list.filter { it % 2 == 1 }.subscribe { println(it) } // 1, 3, 5
我如何用Kotlin协程流来复制这个?提前感谢。
3条答案
按热度按时间jtw3ybtb1#
一系列sharing operators(以及a hot
SharedFlow
)正在简化您正在寻找的工作流(使用KotlinFlows)。与此同时,流在本质上确实是冷的(因此你不能真正地按原样共享它们),但是它们仍然可以共享一个热源来实现你所需要的。
简而言之,最终结果如下所示:
(This很快就会被
SharedFlow
所取代)。mfpqipee2#
你对Rx所做的事情在 * Kotlinflows* 中是不可能的,因为在你的例子中,
share()
将创建一个hot observable,而Kotlin中的流本质上是cold。你可以使用
Channel
,因为它们在Kotlin中代表热流。我读了一篇关于Cold flows, hot channels的博客文章,来自Roman Elizarov。
pgpifvop3#
只是“玩弄”了一个类似的“挑战”。
我希望以循环方式将(可能有许多)生产者的结果流“拆分”到多个异步消费者。
实现:
为了实现上述目标,我使用了
MutableSharedFlow
,在生产者(和消费者)完成“他们的”流之后,最终不得不取消/完成。我怀疑我的结果是否是最佳的,甚至是一个“好”的解决方案,但由于我花了“重要”的时间在上面(学习),我想我也可以在这里分享:
some.sh
shellscript用作“数据源”: