更新挂起Kotlin通道缓冲区中数据的自定义函数

o8x7eapl  于 2023-01-17  发布在  Kotlin
关注(0)|答案(1)|浏览(94)

我有一个UNLIMITED大小的缓冲通道,其中发送器比接收器快得多。我希望通过删除旧数据并替换为新数据来更新缓冲区(如果接收器尚未使用它)
下面是我的代码

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

data class Item(val id: Int, val value: Int)
val testData = listOf(
    Item(1, 10),
    Item(2, 24),
    Item(3, 12),
    Item(1, 17), // This one should replace the Item(1, 10) if it's not yet consumed
    Item(4, 16),
    Item(2, 32), // This one should replace the Item(2, 24) if it's not yet consumed
)

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Item>(Channel.UNLIMITED)

    launch {
        for(item in testData) {
            delay(50)
            println("Producing item $item")
            channel.send(item)
        }
    }

    // As you can see the sender already sent all the testData and they are waiting in the buffer to be consumed by the receiver.
    // I would like to do some checks whenever new item is added to the buffer 
    // if(itemInBuffer.id == newItem.id && itemInBuffer.value < newItem.value) then replace it with newItem
    
    launch {
        for (item in channel) {
            delay(5000)
            println(item.toString())
        }
    }
}

**是否有任何Kotlin构建的函数可以接受一些自定义条件并从缓冲区中删除项?**我看到流中有一个名为distinctUntilChangedBy的函数,它可以根据自定义键选择器删除重复数据。Channel是否有类似的功能或是否可以使用ChannelFlow实现它(注意:在我的真实的代码中,事件来自一些网络调用,因此我不确定channelFlow是否适合)

xvw2m8pv

xvw2m8pv1#

这并不像听起来那么简单,我们不能访问通道队列来修改它的内容,而且,即使我们可以,也不容易找到一个具有相同id的项,我们必须迭代整个队列。distinctUntilChangedBy()是一个非常不同的情况,因为它只比较最后一项,而不是遍历整个队列。
我认为我们最好的选择是不使用通道提供的队列,而是自己在Map中存储数据,并且只提供发送和接收功能。我将其实现为一个类似流的操作符,并且使其通用,因此它可以用于其他类似的情况:

context(CoroutineScope)
fun <T : Any, K> ReceiveChannel<T>.groupingReduce(keySelector: (T) -> K, reduce: (T, T) -> T): ReceiveChannel<T> = produce {
    val items = mutableMapOf<K, T>()
    while (!isClosedForReceive) {
        select<Unit> {
            if (items.isNotEmpty()) {
                val (key, item) = items.entries.first()
                onSend(item) {
                    items.remove(key)
                }
            }
            onReceiveCatching { result ->
                val item = result.getOrElse { return@onReceiveCatching }
                items.merge(keySelector(item), item, reduce)
            }
        }
    }
    items.values.forEach { send(it) }
}

它将数据保存在map中,尝试同时发送和接收,无论哪个先完成。如果接收到一个项,并且键已经在map中,它允许以调用者提供的方式合并两个值。它按照项在源通道中第一次出现的顺序发送项,因此相同键的新值不会将该项推回到队列中的最后一个位置。
这就是我们如何在您提供的示例中使用它。我对它做了一些修改,因为您的版本让我感到困惑。它在生成(1, 17)之前消耗(1, 10),所以实际上示例是不正确的。此外,生产者和消费者不会同时运行,因此并发启动它们并添加延迟不会有太大变化:

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Item>(Channel.UNLIMITED)
    val channel2 = channel.groupingReduce(
        keySelector = { it.id },
        reduce = { it1, it2 -> if (it1.value > it2.value) it1 else it2 }
    )

    for(item in testData) {
        println("Producing item $item")
        channel.send(item)
    }
    channel.close()

    // Needed because while using `UNLIMITED` sending is almost immediate,
    // so it actually starts consuming at the same time it is producing.
    delay(100)

    for (item in channel2) {
        println(item.toString())
    }
}

我创建了另一个生产者和消费者实际上并发运行的例子,每100ms生产一个项目,每200ms消费一个项目,初始延迟为50ms。

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Item>(Channel.UNLIMITED)
    val channel2 = channel.groupingReduce(
        keySelector = { it.id },
        reduce = { it1, it2 -> if (it1.value > it2.value) it1 else it2 }
    )

    launch {
        delay(50)
        for (item in channel2) {
            println(item.toString())
            delay(200)
        }
    }

    launch {
        listOf(
            Item(1, 10),
            // consume: 1, 10
            Item(2, 20),
            Item(1, 30),
            // consume: 2, 20
            Item(3, 40),
            Item(1, 50),
            // consume: 1, 50
            Item(4, 60),
            Item(1, 70),
            // consume: 3, 40
            Item(5, 80),
            // consume: 4, 60
            // consume: 1, 70
            // consume: 5, 80
        ).forEach {
            channel.send(it)
            delay(100)
        }
        channel.close()
    }
}

也许有一个更好的方法来解决这个问题。而且,老实说,我不是100%肯定这个代码是正确的。也许我错过了一些角落的情况下,频道关闭,取消或失败。此外,我不知道是否select { onSend() }保证,如果代码块没有被执行,那么该项目没有被发送。如果我们取消send(),我们不能保证该商品没有被发送。在这种情况下可能是相同的。

相关问题