我试图根据对象的最新版本过滤掉公共元素并返回另一个Source。我的对象看起来像:
case class Record(id: String, version: Long)
我的方法的输入有两个Source:
val sourceA: Source[Record, _] = <>
val sourceB: Source[Record, _] = <>
sourceA
和sourceB
具有Record
对象的公共id
,但两者中的version
可能不同。我想创建一个返回Source[Record, _]
的方法,该方法将具有ID的最新版本。我尝试了
val latestCombinedSource: Source[Record, _] = sourceA map {each => {
sourceB.map(eachB => eachB.version > each.version? eachB: each)
.....
}
}
1条答案
按热度按时间svujldwt1#
您没有提到您所询问的是什么类型的
Source
/什么流库(请更新您的问题以澄清这一点)。从代码中的签名,我假设这是关于akka-stream的。如果这是正确的,那么您可能希望使用zipLatestWith:请注意,还有zipWith,我不能100%确定您要使用哪一个。区别(引自API文档)是:
zipWithLatest
“当所有输入至少有一个元素可用时发出,然后每次元素在任一输入上变为可用时发出”,而zipWith
“当所有输入有一个元素可用时发出”。