Android: How do I split one Kotlin Flow into two flows?

acruukt9  posted on 2023-04-04 in Android

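I'm trying to learn coroutines, and I still have some basic questions. I have a flow that emits a sequence of items, and I want to split it into two flows. This is how I would write it in RxJava: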

val list = Flowable.fromIterable(1..6).share()
val even = list.filter { it % 2 == 0 }.subscribe { println(it) } // 2, 4, 6
val odd = list.filter { it % 2 == 1 }.subscribe { println(it) } // 1, 3, 5

How can I replicate this with Kotlin coroutine Flows? Thanks in advance.


jtw3ybtb1#

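A series of sharing operators (as well as a hot SharedFlow) are in the works to simplify the kind of workflow you're looking for (using Kotlin Flows).
In the meantime, it's true that flows are cold in nature (so you can't really share them as-is), but they can still share a hot source to achieve what you need.
In short, the end result looks like this: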

val original: Flow<String> = flowOf("aap", "noot", "mies", "wim", "zus", "jet", "weide", "does")

// create an implicit hot BroadcastChannel, shared between collectors
// so that they each get all elements (which are each produced only once)
// note: broadcastIn is deprecated in later kotlinx.coroutines releases in favor of shareIn
val sharedFlow = original.broadcastIn(scope).asFlow()

// create derived cold flows, which will subscribe (on collect) to the
// same hot source (BroadcastChannel)
val flow1 = sharedFlow.filter { it.length == 4 }
val flow2 = sharedFlow.filter { it.length == 3 }.map { it.uppercase() }

// collect in separate coroutines: collect suspends until the flow completes,
// so two sequential collect calls would not run side by side
scope.launch { flow1.collect { println("Four letter: $it") } }
scope.launch { flow2.collect { println("Three letter: $it") } }

(This will soon be replaced by SharedFlow.)
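For reference, here is a minimal sketch of the same split built on a `MutableSharedFlow`, applied to the even/odd example from the question. The helper name `splitEvenOdd` and the `null` end marker are assumptions made for this illustration, not part of any library. A SharedFlow never completes on its own, so the sketch signals the end explicitly, and it waits for both collectors to subscribe before emitting because a flow without replay drops values that nobody is listening for.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects `source` exactly once and hands every value
// to two concurrent collectors, one keeping even and one keeping odd numbers.
suspend fun splitEvenOdd(source: Flow<Int>): Pair<List<Int>, List<Int>> = coroutineScope {
    // Hot flow without replay; null is used as an end-of-stream marker (assumption).
    val shared = MutableSharedFlow<Int?>()

    val even = async {
        shared.takeWhile { it != null }.filterNotNull().filter { it % 2 == 0 }.toList()
    }
    val odd = async {
        shared.takeWhile { it != null }.filterNotNull().filter { it % 2 != 0 }.toList()
    }

    // Wait until both collectors are subscribed, otherwise early values would be lost.
    shared.subscriptionCount.first { it == 2 }

    source.collect { shared.emit(it) } // the upstream runs only once
    shared.emit(null)                  // tell both collectors to stop

    even.await() to odd.await()
}

fun main() = runBlocking {
    val (even, odd) = splitEvenOdd((1..6).asFlow())
    println("even=$even odd=$odd") // even=[2, 4, 6] odd=[1, 3, 5]
}
```

The `shareIn` operator provides the same sharing in operator form; with a finite source the same caveat applies, since collectors that subscribe after the upstream has started miss the earlier values unless a replay cache is configured.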


mfpqipee2#

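What you're doing with Rx isn't possible with Kotlin Flows as-is, because in your example share() creates a hot observable, while flows in Kotlin are inherently cold.
You can use Channels, since they represent hot streams in Kotlin.
I read a blog post on this topic by Roman Elizarov, "Cold flows, hot channels".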
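As a rough illustration of the Channel route, the sketch below has a single producer coroutine send each value to one of two channels, which are drained concurrently. `routeByParity` and the channel names are hypothetical, chosen for this example only.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: routes each value into an "even" or an "odd" channel
// and returns what each side received.
suspend fun routeByParity(values: Iterable<Int>): Pair<List<Int>, List<Int>> = coroutineScope {
    val evenChannel = Channel<Int>() // rendezvous channels: send suspends until received
    val oddChannel = Channel<Int>()

    // Producer: iterates the source once and picks the target channel per value.
    launch {
        for (v in values) {
            if (v % 2 == 0) evenChannel.send(v) else oddChannel.send(v)
        }
        // Closing ends the consumers' for-loops below.
        evenChannel.close()
        oddChannel.close()
    }

    // Two consumers running concurrently, each draining its own channel.
    val even = async { mutableListOf<Int>().apply { for (v in evenChannel) add(v) } }
    val odd = async { mutableListOf<Int>().apply { for (v in oddChannel) add(v) } }

    even.await() to odd.await()
}

fun main() = runBlocking {
    val (even, odd) = routeByParity(1..6)
    println("even=$even odd=$odd") // even=[2, 4, 6] odd=[1, 3, 5]
}
```

Unlike a shared hot source, each value here is delivered to exactly one consumer, so the filtering happens on the producer side rather than in each subscriber.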


pgpifvop3#

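I just "played around" with a similar "challenge".
I wanted to "split" the result stream of (possibly many) producers across multiple asynchronous consumers in a round-robin fashion.
The goals:

  • the producer produces its flow "in the background"
  • one or more consumers consume chunks of the producer's results in a round-robin fashion (also "in the background")
  • while all of this is happening, you can carry on "doing something" until all data has been produced and consumed

To achieve the above I used MutableSharedFlow, which eventually has to be cancelled/completed once the producers (and consumers) have finished "their" flows.
I doubt that my result is optimal, or even a "good" solution, but since I spent a "significant" amount of time on it (learning), I thought I might as well share it here: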

package scratch

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import okio.*
import kotlin.random.Random

data class Packet(val nr: Int, val cmd: String, val line: String, val isFinal: Boolean = false)
class TerminatedException(message: String, cause: Throwable? = null) : Exception(message, cause)

/** possibly multiple producers sending to one MutableSharedFlow;
 *  multiple consumers fed round-robin from the one SharedFlow of all producers */
fun main(@Suppress("UNUSED_PARAMETER") args: Array<String>) {
    println("===============================")
    println("====   multipleConsumers() ====")
    println("===============================")
    showcaseMultipleConsumers()

    println("\nmompl...\n")
    Thread.sleep(2000L)

    println("===============================")
    println("=====   singleConsumer() ======")
    println("===============================")
    showcaseSingleConsumer()

    println("main ended.")
}

private fun showcaseMultipleConsumers() {
    runBlocking {

        // launch producer with SharedFlow to emit to
        // (replay = 0 by default: packets emitted before a collector has subscribed are dropped)
        val producerFlow = MutableSharedFlow<Packet>()
        launch(Dispatchers.IO) {
            producer(name = "producer", cmd = "/Users/hoffi/gitRepos/scratch/scratch/some.sh", producerFlow)
        }

        // launch concurrent consumers with each its own SharedFlow
        val consumerCount = 3
        val mapOfFlows = mutableMapOf<Int, MutableSharedFlow<Packet>>()
        for (i in 1..consumerCount) {
            mapOfFlows[i - 1] = MutableSharedFlow()
            launch(Dispatchers.IO) {
                consume(i.toString(), mapOfFlows[i - 1]!!)
            }
        }
        println("finished launching $consumerCount consumers.")

        // round-robin emit from the producerFlow to the existing MutableSharedFlows
        var i = 0
        try {
            producerFlow.buffer().collect { packet ->
                if (packet.isFinal) {
                    println("producer: final packet received"); throw TerminatedException("'producer' terminated")
                }
                mapOfFlows[i++ % consumerCount]!!.emit(packet)
            }
        } catch (e: TerminatedException) {
            println(e.message)
        }

        println("round-robin load-balancing finished.")

        // finally send terminal packet to all consumer's MutableSharedFlows
        for (flow in mapOfFlows.values) {
            flow.emit(Packet(-1, "final", "final", true))
        }

        // might do something here _after_ the process has finished and its output is load-balanced to consumers
        // but consuming of process output has not finished yet.

        println("end of runBlocking ...")
    }
}

/** coroutine consuming from given Flow (which is a MutableSharedFlow!) */
suspend fun consume(name: String, flow: Flow<Packet>) {
    try {
        flow.buffer().collect { packet ->
            if (packet.isFinal)  { println("$name: final packet received") ; throw TerminatedException("'$name' terminated") }
            println("%5d in c%s: %s".format(packet.nr, name, packet.line))
            delay(Random.nextLong(50L, 550L))
        }
    } catch(e: TerminatedException) {
        println("consumer: ${e.message}")
    }
}

/** coroutine emitting to given producer's MutableSharedFlow */
suspend fun producer(name: String, cmd: String, producerFlow: MutableSharedFlow<Packet>) {
    val process = ProcessBuilder("\\s".toRegex().split(cmd))
        .redirectOutput(ProcessBuilder.Redirect.PIPE)
        .redirectError(ProcessBuilder.Redirect.PIPE)
        .redirectErrorStream(true)
        .start() // non-blocking asynchronous start process in the background

    val inputBuffer = process.inputStream.source().buffer()
    var i = 0
    while (true) {
        val line = inputBuffer.readUtf8Line() ?: break
        producerFlow.emit(Packet(++i, cmd, line))
    }

    producerFlow.emit(Packet(-1, "final", "final", true))
    println("producer function ended")
}

// =====================================================================================================================
// =====================================================================================================================
// =====================================================================================================================

private fun showcaseSingleConsumer() {
    runBlocking {

        val flow: Flow<Packet> = singleProducer(name = "producer", cmd = "/Users/hoffi/gitRepos/scratch/scratch/some.sh")
        launch(Dispatchers.IO) {
            singleConsumer(name = "consumer", flow)
        }

        // do something here, while the process is executed
        // and the consumer is consuming the process's output

        println("end of runBlocking ...")

    }
}

/** no suspend needed as a flow { ... } implicitly "runs in a coroutine" */
fun singleProducer(name: String, cmd: String) = flow {
    val process = ProcessBuilder("\\s".toRegex().split(cmd))
        .redirectOutput(ProcessBuilder.Redirect.PIPE)
        .redirectError(ProcessBuilder.Redirect.PIPE)
        .redirectErrorStream(true)
        .start() // non-blocking asynchronous start process in the background

    val inputBuffer = process.inputStream.source().buffer()
    var i = 0
    while (true) {
        val line = inputBuffer.readUtf8Line() ?: break
        emit(Packet(++i, cmd, line))
    }

    println("producer function ended")
}

suspend fun singleConsumer(name: String, flow: Flow<Packet>) {
    flow.buffer().collect { packet ->
        println("%5d in c%s: %s".format(packet.nr, name, packet.line))
        delay(Random.nextLong(30L, 150L))
    }
}

The some.sh shell script used as the "data source":

#!/bin/bash
 # some.sh
echo "first line"
echo -e "second line\nand third line"
echo -n "fourth line without newline"
sleep 2
echo
echo -e "fifth line after sleep\nand sixth line"
echo -e "some stderr\nand stderr has two lines" >&2
for i in {1..25}; do
  echo "and loop line $i"
done
echo -n "last line"
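For comparison with the one-SharedFlow-per-consumer distribution in the Kotlin listing above, the following is a minimal sketch of Channel fan-out: several consumers receive from a single `Channel`, and each element goes to exactly one of them. The function name `fanOut` and its parameters are hypothetical, and the distribution is whichever consumer is ready next rather than a strict round-robin.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical helper: distributes `lines` across `consumerCount` consumers
// and returns, per consumer id, the lines that consumer handled.
suspend fun fanOut(lines: List<String>, consumerCount: Int): Map<Int, List<String>> = coroutineScope {
    val channel = Channel<String>()

    // Each consumer loops over the same channel; every element is delivered
    // to exactly one of them, and the loops end once the channel is closed.
    val workers = (1..consumerCount).map { id ->
        async {
            val seen = mutableListOf<String>()
            for (line in channel) seen += line
            id to seen
        }
    }

    for (line in lines) channel.send(line)
    channel.close() // replaces the explicit "final" packet of the listing above

    workers.awaitAll().toMap()
}

fun main() = runBlocking {
    val result = fanOut(List(10) { "line ${it + 1}" }, consumerCount = 3)
    result.forEach { (id, handled) -> println("consumer $id got ${handled.size} lines") }
}
```

Closing the channel takes over the role of the terminal packet and the `TerminatedException`, at the cost of giving up control over which consumer receives which element.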
