我有一个UNLIMITED
大小的缓冲通道,其中发送器比接收器快得多。我希望通过删除旧数据并替换为新数据来更新缓冲区(如果接收器尚未使用它)
下面是我的代码
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
data class Item(val id: Int, val value: Int)
val testData = listOf(
Item(1, 10),
Item(2, 24),
Item(3, 12),
Item(1, 17), // This one should replace the Item(1, 10) if it's not yet consumed
Item(4, 16),
Item(2, 32), // This one should replace the Item(2, 24) if it's not yet consumed
)
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Item>(Channel.UNLIMITED)
launch {
for(item in testData) {
delay(50)
println("Producing item $item")
channel.send(item)
}
}
// As you can see the sender already sent all the testData and they are waiting in the buffer to be consumed by the receiver.
// I would like to do some checks whenever new item is added to the buffer
// if(itemInBuffer.id == newItem.id && itemInBuffer.value < newItem.value) then replace it with newItem
launch {
for (item in channel) {
delay(5000)
println(item.toString())
}
}
}
**是否有任何Kotlin构建的函数可以接受一些自定义条件并从缓冲区中删除项?**我看到流中有一个名为distinctUntilChangedBy的函数,它可以根据自定义键选择器删除重复数据。Channel
是否有类似的功能或是否可以使用ChannelFlow
实现它(注意:在我的真实的代码中,事件来自一些网络调用,因此我不确定channelFlow
是否适合)
1条答案
按热度按时间xvw2m8pv1#
这并不像听起来那么简单,我们不能访问通道队列来修改它的内容,而且,即使我们可以,也不容易找到一个具有相同id的项,我们必须迭代整个队列。
distinctUntilChangedBy()
是一个非常不同的情况,因为它只比较最后一项,而不是遍历整个队列。我认为我们最好的选择是不使用通道提供的队列,而是自己在Map中存储数据,并且只提供发送和接收功能。我将其实现为一个类似流的操作符,并且使其通用,因此它可以用于其他类似的情况:
它将数据保存在map中,尝试同时发送和接收,无论哪个先完成。如果接收到一个项,并且键已经在map中,它允许以调用者提供的方式合并两个值。它按照项在源通道中第一次出现的顺序发送项,因此相同键的新值不会将该项推回到队列中的最后一个位置。
这就是我们如何在您提供的示例中使用它。我对它做了一些修改,因为您的版本让我感到困惑。它在生成
(1, 17)
之前消耗(1, 10)
,所以实际上示例是不正确的。此外,生产者和消费者不会同时运行,因此并发启动它们并添加延迟不会有太大变化:我创建了另一个生产者和消费者实际上并发运行的例子,每100ms生产一个项目,每200ms消费一个项目,初始延迟为50ms。
也许有一个更好的方法来解决这个问题。而且,老实说,我不是100%肯定这个代码是正确的。也许我错过了一些角落的情况下,频道关闭,取消或失败。此外,我不知道是否
select { onSend() }
保证,如果代码块没有被执行,那么该项目没有被发送。如果我们取消send()
,我们不能保证该商品没有被发送。在这种情况下可能是相同的。