akka 溪流:如何使用Flow形成Graph的入口和出口

zkure5ic  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(141)

我有一些代码,类似于以下内容:

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val balance = builder.add(Balance[Int](2))

      val flow1 = Flow[Int].map(_*2)
      val flow2 = Flow[Int].map(_*2)

      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
        left + right
      }))

      val flow3 = Flow[Int].map(_*2)

      input ~> buffer ~> balance.in
      balance.out(0) ~> flow1 ~> zip.in0
      balance.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3

      FlowShape(input.in, flow3) //Question 2) how to make an outlet here
    })
}

注意,我必须添加一个名为inputBalance,因为我无法从我想要创建的FlowShape的第一个Buffer中检索到Inlet。有没有其他更简单的方法来解决这个问题?用1个Outlet创建一个Balance似乎是错误的方法。
我的第二个问题是类似的。我不能从flow3中检索Outlet。我知道的解决这个问题的唯一方法是创建另一个Balance,并将其Outlet公开为整个FlowShapeOutlet。有更好的方法来解决这个问题吗?

41ik7eoe

41ik7eoe1#

Balance是一个扇出形状,它会发出到第一个可用的输出。考虑到您要在下一步压缩流,您需要的是一个Broadcast。当所有输出都可用时,它会扇出到所有输出。
另外,构建器可以添加任何Graph形状,包括Flow。你不必为此使用自定义形状。
更新后的代码:

object Test extends App {
  val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
      val input = builder.add(buffer) 
      val broadcast = builder.add(Broadcast[Int](2))

      val flow1 = Flow[Int].map(_*2)
      val flow2 = Flow[Int].map(_*2)

      val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
        left + right
      }))

      val flow3 = builder.add(Flow[Int].map(_*2))

      input ~> broadcast.in
      broadcast.out(0) ~> flow1 ~> zip.in0
      broadcast.out(1) ~> flow2 ~> zip.in1
      zip.out ~> flow3.in

      FlowShape(input.in, flow3.out) 
    })
}

相关问题