在Kotlin流中取消后发出

llmtgqce  于 2023-06-06  发布在  Kotlin
关注(0)|答案(2)|浏览(725)

我有一个更复杂的版本的以下代码:

val testFlow = flow {
    try {
        // Continually emit some values
    } catch (e: CancellationException) {
        // Wrap up and emit finished state value
    }
}

当我收集这个testFlow时,我从未收到finished状态值。据记录,流构建器在发出值之前执行ensureActive检查。在此链接中也有一个使用IntRange.asFlow的示例,该示例在发出值之前不检查取消,并显示继续收集这些值。
有没有一种方法可以让自定义的Kotlin流在取消协程后发出最后一个要收集的值?
这里有一个Kotlin Playground example(感谢@Tenfour04),显示了根据流的构造方式,在取消后发出的不同行为。我只是不知道如何获得一个在cancel之后能够发出的流,它不是由IntRange.asFlow构造的。

5n0oy7gb

5n0oy7gb1#

IntRange.asFlow在内部使用unsafeFlow,其定义为:

inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

使用unsafeFlow,即使在已取消的协程中也可以发出,尽管这个解决方案是相当黑客的,我想用一个更官方支持的版本来代替它,它不需要访问内部的Kotlin协程API。
注意:取消后发射仅在使用

try {
    ...
} catch (e: CancellationException) {
    ...
}

不使用unsafeFlow.catch {}

mbyulnm0

mbyulnm02#

我也在想这个问题。我认为一种方法是使用StateFlow并单独生成值。这样,您就可以在取消作业之前调用.value(如果在取消作业之后调用它,它也可以工作)。

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking{
    val startTime = System.currentTimeMillis()
    val elapsedTime = MutableStateFlow(0L)
    // generate values
    val job = launch {
        while (true){
            elapsedTime.value = System.currentTimeMillis() - startTime
            delay(1000)
        }
    }
    // collect values
    launch {
        elapsedTime.collect{ println("$it milliseconds") }
    }
    // cancel job
    delay(2222)
    println("CANCELLING")
    elapsedTime.value = System.currentTimeMillis() - startTime // emit before cancelling
    job.cancel()
    println("CANCELLED")
}
25 milliseconds
1027 milliseconds
2030 milliseconds
CANCELLING
CANCELLED
2251 milliseconds

实际上,顺序并不重要,因为只有生成值的作业会被取消,而收集器不会。
也许MutableSharedFlow会很有用。

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking{
    // val flow1 = (1..10).asFlow()
    val range = (1..10)
    val flow1 = MutableSharedFlow<Int>(replay = range.count()) // since it is "hot", might need to replay values to new subscribers
    // generate values
    val job = launch {
        range.forEach{
            flow1.emit(it)
            delay(1000) // for testing, otherwise it would be complete before the collecting starts
        }
    }
    // collect values
    launch {
        flow1.collect{
            println(it)
            if(it == 3){
                job.cancel()
                flow1.emit(4)
            }
        }
    }
}
1
2
3
4

相关问题