java—转换集合流的内部元素

91zkwejq  于 2021-07-11  发布在  Java
关注(0)|答案(1)|浏览(234)

我最近一直在学习在空闲时间使用akka流(scala和java),并且想知道如何实现下面的场景。
我有一个非常大的集合进入我的管道连续流,我想让管道转换每个集合内的元素。
将集合转换为其元素流很容易,但我还需要将1个集合中所有转换的元素一起收集回1个新集合(仅包含以前在原始集合中也在一起的转换对象)。因此,我必须知道何时处理了1个集合的特定元素流,因为这样我就可以发出转换后的集合,以便在常规管道中进行进一步处理。

uqcuzwp8

uqcuzwp81#

正如评论者所建议的,你可以使用 fold 在你的 transformationPipeline 组合列表类型元素。在运行流时维护列表边界,而不是 mapConcat 使用 flatMapConcat ,如下所示:

def transform(s: String): Int = s.length

val transformationPipeline: Flow[String, List[Int], NotUsed] = Flow[String].
  fold(List.empty[Int])((ls, s) => transform(s) :: ls).
  map(_.reverse)

val flow: Flow[List[String], List[Int], NotUsed] = Flow[List[String]].
  flatMapConcat(Source(_).via(transformationPipeline))

Source(List("a", "bb") :: List("cc", "ddd", "e") :: Nil).
  via(flow).
  runForeach(println)
// List(1, 2)
// List(2, 3, 1)

相关问题