我们有一个可以处理事件的Sink
:
def parseEvent(): Sink[T, Future[akka.Done]] = {
Sink.foreach[T] { event => {
// Do stuff with the event
}}
}
对于单个Source
,这可以很好地工作:
val mySource: Source[T] = ...
mySource.takeWhile( someCheck, true ).runWith(parseEvent)
如果您有以下设备,您如何让它工作:
val mySources: Seq[Source[T]] = ...
所有源都应并行运行,并且所有事件都应达到parseEvent
。
1条答案
按热度按时间hkmswyz61#
以下内容应该符合要求:
合并策略“合并多个流,当它们从输入流到达时获取元素”,并随机选择多个流是否有可用的元素。