我有一个例子,有一个对象在随机时间内发布对象,我想把它以每秒的速度收集到缓冲区中,并通过一些策略(如max score)进行过滤,以确保每秒缓冲区中只有一个对象。
subject
.buffer(1L, TimeUnit.SECONDS)
.filter {
isNotEmpty
}
.doOnNext {
// I get all object in the one second
// That waste too much memory, the non-max object shouldn't be put into the buffer
_.asScala.max(byScore)
}
.ignoreElements
.subscribeOn(Schedulers.io)
.subscribe
此代码将在一秒钟内保存所有对象并返回给我。
这不是我想要的。
有什么解决办法吗?
1条答案
按热度按时间oxalkeyp1#
您可以使用以下版本的
buffer
操作员:它允许你定义你的习惯
bufferSupplier
-用于存储缓冲值的集合。然后,您可以创建集合的自定义版本,在其中最多存储一个项目,在我们的示例中,如果新的、更大的,则替换现有值:演示,如何在一些模拟数据上使用它(每400毫秒发射一次项):
最后,一些验证: