如何处理一系列Akka流源?

4szc88ey  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(250)

我们有一个可以处理事件的Sink

def parseEvent(): Sink[T, Future[akka.Done]] = {
  Sink.foreach[T] { event => {
    // Do stuff with the event
  }}
}

对于单个Source,这可以很好地工作:

val mySource: Source[T] = ...  
mySource.takeWhile( someCheck, true ).runWith(parseEvent)

如果您有以下设备,您如何让它工作:

val mySources: Seq[Source[T]] = ...

所有源都应并行运行,并且所有事件都应达到parseEvent

hkmswyz6

hkmswyz61#

以下内容应该符合要求:

import akka.NotUsed
import akka.stream.scaladsl.{ Concat, Merge, Source }

def sourceFromSources[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
  sources.size match {
    case s if s < 1 => Source.empty[T]
    case 1 => sources.head
    case 2 => sources.head.merge(sources(1))
    case _ => Source.combine(sources.head, sources(1), sources.drop(2): _*)(Merge(_))
  }

合并策略“合并多个流,当它们从输入流到达时获取元素”,并随机选择多个流是否有可用的元素。

相关问题