Akka流并行性

zujrkrfu  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(204)

根据文档[1],我一直在尝试并行化Akka Stream中的流,但由于某种原因,我没有得到预期的结果。
我遵循了文档中列出的步骤,我不认为我遗漏了任何东西。然而,我的流的计算都是按顺序一个接一个地发生的。
我错过了什么?
[1][https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html](https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html)

import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}

object ScalaParallell extends App {

  implicit val system = ActorSystem("QuickStart")

  def longRunningComputation(x: Int): Int = {
    println(s"Computing 1 ${x}")
    Thread.sleep(10000)
    println(s"Computation 1 ${x} done")
    x
  }
  def longRunningComputation2(x: Int): Int = {
    println(s"Computing 2 ${x}")
    Thread.sleep(10000)
    println(s"Computation 2 ${x} done")
    x
  }

  val processor: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // prepare graph elements
      val balance = b.add(Balance[Int](2))
      val merge = b.add(Merge[Int](2))
      val f = Flow[Int].map(longRunningComputation)
      val f2 = Flow[Int].map(longRunningComputation2)

      // connect the graph
      balance.out(0) ~> f.async ~> merge.in(0)
      balance.out(1) ~> f2.async ~> merge.in(1)

      // expose ports
      FlowShape(balance.in, merge.out)
    })

  // Wire it all up.
  val xs = List(1,2,3)
  val source: Source[Int, NotUsed] = Source(xs)
  source.via(processor).runForeach(println)

  Thread.sleep(5000)
}

输出示例

Computing 2 1
Computation 2 1 done
Computing 2 2
1
Computation 2 2 done
Computing 2 3
2
Computation 2 3 done
3

我希望看到两个计算同时发生。例如:

Computing 1 1
Computing 1 2
Computation 1 2 done
Computing 1 3
Computation 1 1 done
Computing 2 4
1
2
..
ukqbszuj

ukqbszuj1#

尝试删除longRunningComputationlongRunningComputation2中的Thread.sleep,并将xs设置为更长的值,例如1 to 100,然后您将能够观察到并行处理。不知道为什么,但阻塞Thread.sleep在akka中肯定被认为是反模式

相关问题