Akka流-分区并行处理

3phpmpom  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(134)

我正在寻找一种方法来实现/使用扇出,它采取1输入,并广播到N个输出并行,不同的是,我想划分他们。

**示例:**1个输入可以发送到4个不同的输出,其他输入可以发送到2个其他输出,取决于某些函数f

source ~> partitionWithBroadcast // Outputs to some subset of [0,3] outputs
partitionWithBroadcast(0) ~> ...
partitionWithBroadcast(1) ~> ...
partitionWithBroadcast(2) ~> ...
partitionWithBroadcast(3) ~> ...

我在Akka文档中进行了搜索,但未找到任何合适的流程
有什么主意吗?

pbpqsu0x

pbpqsu0x1#

我想到的是一个FanOutShape,每个输出都连接了过滤器。**注意:**我没有使用标准的Partition operator,因为它只发射到1个输出。这个问题要求发射到任何连接的输出。例如:

def createPartial[E](partitioner: E => Set[Int]) = {
  GraphDSL.create[FanOutShape4[E,E,E,E,E]]() { implicit builder =>
    import GraphDSL.Implicits._

    val flow = builder.add(Flow.fromFunction((e: E) => (e, partitioner(e))))
    val broadcast = builder.add(Broadcast[(E, Set[Int])](4))

    val flow0 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(0)).map(_._1))
    val flow1 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(1)).map(_._1))
    val flow2 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(2)).map(_._1))
    val flow3 = builder.add(Flow[(E, Set[Int])].filter(_._2.contains(3)).map(_._1))

    flow.out ~> broadcast.in
    broadcast.out(0) ~> flow0.in
    broadcast.out(1) ~> flow1.in
    broadcast.out(2) ~> flow2.in
    broadcast.out(3) ~> flow3.in

    new FanOutShape4[E,E,E,E,E](flow.in, flow0.out, flow1.out, flow2.out, flow3.out)
  }
}

partitioner是一个函数,它将上游的一个元素Map到一个元组中,元组中包含该元素和一组将激活相应输出的整数。图计算所需的划分,然后广播元组。连接到Broadcast的每个输出的流选择划分器分配给该输出的元素。
然后使用它,例如:

implicit val system: ActorSystem = ActorSystem()
implicit val ec = system.dispatcher

def partitioner(s: String) = (0 to 3).filter(s(_) == '*').toSet

val src = Source(immutable.Seq("*__*", "**__", "__**", "_*__"))

val sink0 = Sink.seq[String]
val sink1 = Sink.seq[String]
val sink2 = Sink.seq[String]
val sink3 = Sink.seq[String]

def toFutureTuple[X](f0: Future[X], f1: Future[X], f2: Future[X], f3: Future[X]) = f0.zip(f1).zip(f2).map(t => (t._1._1,t._1._2,t._2)).zip(f3).map(t => (t._1._1,t._1._2,t._1._3,t._2))

val g = RunnableGraph.fromGraph(GraphDSL.create(src, sink0, sink1, sink2, sink3)((_,f0,f1,f2,f3) => toFutureTuple(f0,f1,f2,f3)) { implicit builder =>
  (in, o0, o1, o2, o3) => {
    import GraphDSL.Implicits._

    val part = builder.add(createPartial(partitioner))

    in ~> part.in
    part.out0 ~> o0
    part.out1 ~> o1
    part.out2 ~> o2
    part.out3 ~> o3

    ClosedShape
  }
})

val result = Await.result(g.run(), 10.seconds)
println("0: " + result._1.mkString(" "))
println("1: " + result._2.mkString(" "))
println("2: " + result._3.mkString(" "))
println("3: " + result._4.mkString(" "))

// Prints:
//
// 0: *__***__
// 1:**__ _*__
// 2: __**
// 3: *__* __**
h4cxqtbf

h4cxqtbf2#

首先,实现您的函数以创建分区:

def partitionFunction4[A](func: A => Int)(implicit builder: GraphDSL.Builder[NotUsed]) = {
    // partition with 4 output ports
    builder.add(Partition[A](4, inputElement => func(inputElement)))
  }

然后,创建另一个函数来创建一个带有log函数的Sink,该函数将用于在控制台中打印元素:

def stream[A](log: A => Unit) = Flow.fromFunction[A, A](el => {
      log(el)
      el
    } ).to(Sink.ignore)

连接**graph* 函数中的所有元素:

def graph[A](src: Source[A, NotUsed])
          (func4: A => Int, log: Int => A => Unit) = {

  RunnableGraph
    .fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val partition4 = partitionFunction4(func4)

      /**Four sinks**/
      val flowSet0 = (0 to 4).map(in => log(in))

      src ~> partition4.in

      partition4.out(0) ~> stream(flowSet0(0))
      partition4.out(1) ~> stream(flowSet0(1))
      partition4.out(2) ~> stream(flowSet0(2))
      partition4.out(3) ~> stream(flowSet0(3))

      ClosedShape
    })
    .run()
}

创建一个发出五个Int元素的源。用于创建分区的函数为“element % 4"。根据此函数的结果,元素将被重定向到特定源:

val source1: Source[Int, NotUsed] = Source(0 to 4)

graph[Int](source1)(f1 => f1 % 4,
  in => {
    el =>
      println(s"Stream ${in} element ${el}")
  })

结果获得:

Stream 0 element 0
Stream 1 element 1
Stream 2 element 2
Stream 3 element 3
Stream 0 element 4

相关问题