问题
我似乎观察到了这样一个场景:在使用Akka回退监督策略的重启过程中,为类型化的受监督参与者隐藏的消息丢失。
这是预期的行为吗?如果不是,我如何实现以确保这些隐藏的消息被保留?
设置
我创建了一个类型化的监督演员
BackoffSupervisorStrategy backoff = SupervisorStrategy
.restartWithBackoff(Duration.ofMillis(10), Duration.ofMillis(10000), 0)
.withStashCapacity(2000);
return Behaviors.supervise(Behaviors.setup(MyActor::new)).onFailure(Throwable.class, backoff);
它处理一个命令ForceFail
,该命令将生成一个RuntimeException
,以便我们可以让Akka管理程序执行它的操作。
private Behavior<Command> forceFail(ForceFail command) {
getContext().getLog().info("Got fail command: {}", command.name);
throw new RuntimeException(command.name);
}
生成演员后,我发送一系列的tell
testSystem.tell(new ForceFail("first fail"));
testSystem.tell(new ForceFail("second fail"));
testSystem.tell(new ForceFail("third fail"));
每个tell
都会在actor中导致一个异常,触发管理程序重新启动。在重新启动过程中,在管理程序释放消息之前,我检查了StashBuffer
的大小。
我看到的是,在第一次重新启动时,StashBuffer
显示的大小为2,正如所预期的那样,但是,在第二次重新启动时,第二条消息的大小为0,而我预期它为1。
我没有看到最后一条消息被发送给死信演员。它似乎丢失了,没有日志记录描述它发生了什么。
注解
在Akka内部代码中,StashBuffer unstashAll()
方法被调用。
如果处理消息时引发异常,则正在处理的消息和导致异常的消息已从StashBuffer中删除,但未处理的消息仍保留。
这个措辞看起来有点滑稽,但是它所说的是它将顺序地处理stash中的消息,直到它处理完所有消息或者遇到异常。未处理的消息保留在stash中。但这似乎不是我所观察到的。
我用的是 akka 2.7.0版。
2条答案
按热度按时间iq3niunx1#
您可以将执行元嵌入到路由器中,以便邮箱生命周期与执行元生命周期分离,并且在重新启动执行元时消息不会丢失。
此模式将确保不会丢失任何消息。
k97glaaz2#
不是100%肯定,但我认为可能有一个错误,在监督故障发生,而解除内部隐藏的回退。
我在Akka追踪器中创建了一个问题,以供进一步调查:https://github.com/akka/akka/issues/31814