我有一些代码,类似于以下内容:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val balance = builder.add(Balance[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left + right
}))
val flow3 = Flow[Int].map(_*2)
input ~> buffer ~> balance.in
balance.out(0) ~> flow1 ~> zip.in0
balance.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3
FlowShape(input.in, flow3) //Question 2) how to make an outlet here
})
}
注意,我必须添加一个名为input
的Balance
,因为我无法从我想要创建的FlowShape
的第一个Buffer
中检索到Inlet
。有没有其他更简单的方法来解决这个问题?用1个Outlet
创建一个Balance
似乎是错误的方法。
我的第二个问题是类似的。我不能从flow3
中检索Outlet
。我知道的解决这个问题的唯一方法是创建另一个Balance
,并将其Outlet
公开为整个FlowShape
的Outlet
。有更好的方法来解决这个问题吗?
1条答案
按热度按时间41ik7eoe1#
Balance
是一个扇出形状,它会发出到第一个可用的输出。考虑到您要在下一步压缩流,您需要的是一个Broadcast
。当所有输出都可用时,它会扇出到所有输出。另外,构建器可以添加任何
Graph
形状,包括Flow
。你不必为此使用自定义形状。更新后的代码: