我们希望在关闭akka流时完成以下任务
1.源被停止,
1.流中的元素在超时点之前完成
1.则流/演员系统被关闭
下面的代码用于关闭actor系统(Akka 2.4)。
try {
Await.ready(actorSystem.terminate(), sleepSeconds.seconds)
} catch {
case ex: Throwable => log.error("Failed to terminate actor system", ex)
}
字符串
我试图理解akka文档,但我不清楚是否需要单独关闭物化器(对于我们的akka流)?我们在这里有一个关于物化器的方法和一个关于参与者系统的方法。
或者换句话说,我们应该采取什么惯用的操作步骤来干净/优雅地关闭akka流。
1条答案
按热度按时间wgeznvg71#
您必须在参与者系统终止之前关闭流。如果不这样做,可能会丢失内部缓冲区中的一些元素,这些元素可能无法到达最终的
Sink
考虑以下akka 2.6的示例(如果某些API与2.4不同,请自行调整)
字符串
当它运行时,您可能会看到Actor系统终止之前的最终输出,如下所示
型
它产生的元素
29
卡在grouped
中,永远不会到达最终的Sink.foreach(println)
。这是有可能克服的帮助下,一个死亡开关
型
现在最后的输出将是正确的
型
并且输出
(2,74)
证明grouped
被清除。请注意使用
viaMat
和toMat
来捕获两个物化值。(您可以阅读有关物化值here的内容)KillSwitches.single[Int]
返回的killSwitch
Sink.foreach
返回的Future[Done]
KillSwitch
只能启动流的终止,但您仍然需要等待流完成。这可以通过Sink
中的Future[Done]
完成型
这将启动关机,然后等待关机完成。