我是新来的akka流,不知道如何处理这个问题。
我有3个按序列ID排序的源流。我想将具有相同ID的值分组在一起。每个流中的值可能丢失或重复。如果一个流的生成器速度比其他流快,则应对其进行反压。
case class A(id: Int)
case class B(id: Int)
case class C(id: Int)
case class Merged(as: List[A], bs: List[B], cs: List[C])
import akka.stream._
import akka.stream.scaladsl._
val as = Source(List(A(1), A(2), A(3), A(4), A(5)))
val bs = Source(List(B(1), B(2), B(3), B(4), B(5)))
val cs = Source(List(C(1), C(1), C(3), C(4)))
val merged = ???
// value 1: Merged(List(A(1)), List(B(1)), List(C(1), C(1)))
// value 2: Merged(List(A(2)), List(B(2)), Nil)
// value 3: Merged(List(A(3)), List(B(3)), List(C(3)))
// value 4: Merged(List(A(4)), List(B(4)), List(C(4)))
// value 5: Merged(List(A(5)), List(B(5)), Nil)
// (end of stream)
1条答案
按热度按时间vwkv1x7d1#
这个问题很老了,但是我试图找到一个解决方案,我只在lightbend forum遇到了路径的岩石,但是没有一个工作用例。所以我决定实现并在这里发布我的例子。
我创建了3个源
sourceA
、sourceB
和sourceC
,它们使用.throttle()
以不同的速度发出事件。然后我创建了一个RunnableGraph
,其中我使用Merge
合并源,并将输出输出到我的WindowGroupEventFlow
Flow
,我基于事件数量的滑动窗口来实现。如下图所示:我在源代码上使用的类如下:
这是我创建的用于对事件进行分组的
WindowGroupEventFlow
Flow
:这就是我做事方式:
您可以在输出中看到我是如何对具有相同ID的元素进行分组的: