关于Actor系统和Akka流的清洁关闭

kpbwa7wx  于 2023-08-05  发布在  其他
关注(0)|答案(1)|浏览(138)

我们希望在关闭akka流时完成以下任务
1.源被停止,
1.流中的元素在超时点之前完成
1.则流/演员系统被关闭
下面的代码用于关闭actor系统(Akka 2.4)。

try {
      Await.ready(actorSystem.terminate(), sleepSeconds.seconds)
    } catch {
      case ex: Throwable => log.error("Failed to terminate actor system", ex)
    }

字符串
我试图理解akka文档,但我不清楚是否需要单独关闭物化器(对于我们的akka流)?我们在这里有一个关于物化器的方法和一个关于参与者系统的方法。
或者换句话说,我们应该采取什么惯用的操作步骤来干净/优雅地关闭akka流。

wgeznvg7

wgeznvg71#

您必须在参与者系统终止之前关闭流。如果不这样做,可能会丢失内部缓冲区中的一些元素,这些元素可能无法到达最终的Sink
考虑以下akka 2.6的示例(如果某些API与2.4不同,请自行调整)

implicit val actorSystem: ActorSystem = ActorSystem("test")
val source = Source(LazyList.fill(100_000)(Random.nextInt(100)))
val flow = Flow[Int]
  .throttle(3, 1.second)
  .map { s =>
    println(s);
    s
  }
  .grouped(4)
  .map(s => (s.size, s.max))
val stream = source.via(flow).to(Sink.foreach(println))
val _ = stream.run()
Thread.sleep(5_400)
val _ = Await.result(actorSystem.terminate(), 2.seconds)
println("Terminated")

字符串
当它运行时,您可能会看到Actor系统终止之前的最终输出,如下所示

36
84
38
32
(4,84)
29
Terminated


它产生的元素29卡在grouped中,永远不会到达最终的Sink.foreach(println)
这是有可能克服的帮助下,一个死亡开关

implicit val actorSystem: ActorSystem = ActorSystem("test")
val killSwitch = KillSwitches.single[Int]
val source = Source(LazyList.fill(100_000)(Random.nextInt(100))).viaMat(killSwitch)(Keep.right)
val flow = Flow[Int]
  .throttle(3, 1.second)
  .map { s =>
    println(s);
    s
  }
  .grouped(4)
  .map(s => (s.size, s.max))
val stream = source.viaMat(flow)(Keep.left).toMat(Sink.foreach(println))(Keep.both)
val (kill, done) = stream.run()
Thread.sleep(5_400)
kill.shutdown()
val _ = Await.result(done, 2.seconds)
println("Terminated")
val _ = Await.result(actorSystem.terminate(), 2.seconds)


现在最后的输出将是正确的

10
64
77
6
(4,77)
74
24
(2,74)
Terminated


并且输出(2,74)证明grouped被清除。
请注意使用viaMattoMat来捕获两个物化值。(您可以阅读有关物化值here的内容)

  • KillSwitches.single[Int]返回的killSwitch
  • Sink.foreach返回的Future[Done]

KillSwitch只能启动流的终止,但您仍然需要等待流完成。这可以通过Sink中的Future[Done]完成

kill.shutdown()
val _ = Await.result(done, 2.seconds)


这将启动关机,然后等待关机完成。

相关问题