在Scala中使用属性迭代两个Source和filter

wqsoz72f  于 2022-12-13  发布在  Scala
关注(0)|答案(1)|浏览(128)

我试图根据对象的最新版本过滤掉公共元素并返回另一个Source。我的对象看起来像:

case class Record(id: String, version: Long)

我的方法的输入有两个Source:

val sourceA: Source[Record, _] = <>
val sourceB: Source[Record, _] = <>

sourceAsourceB具有Record对象的公共id,但两者中的version可能不同。我想创建一个返回Source[Record, _]的方法,该方法将具有ID的最新版本。我尝试了

val latestCombinedSource: Source[Record, _] = sourceA map {each => {
               sourceB.map(eachB => eachB.version > each.version? eachB: each)
               .....
}
}
svujldwt

svujldwt1#

您没有提到您所询问的是什么类型的Source/什么流库(请更新您的问题以澄清这一点)。从代码中的签名,我假设这是关于akka-stream的。如果这是正确的,那么您可能希望使用zipLatestWith:

val latestCombinedSource: Source[Record, _] =
  sourceA.zipLatestWith(sourceB) { (a, b) =>
    if (a.version > b.version) a else b
  }

请注意,还有zipWith,我不能100%确定您要使用哪一个。区别(引自API文档)是:zipWithLatest“当所有输入至少有一个元素可用时发出,然后每次元素在任一输入上变为可用时发出”,而zipWith“当所有输入有一个元素可用时发出”。

相关问题