Akka -为什么带有回退监督的隐藏消息会丢失?

7rtdyuoh  于 2023-01-13  发布在  其他
关注(0)|答案(2)|浏览(156)

问题

我似乎观察到了这样一个场景:在使用Akka回退监督策略的重启过程中,为类型化的受监督参与者隐藏的消息丢失。
这是预期的行为吗?如果不是,我如何实现以确保这些隐藏的消息被保留?

设置

我创建了一个类型化的监督演员

BackoffSupervisorStrategy backoff = SupervisorStrategy
        .restartWithBackoff(Duration.ofMillis(10), Duration.ofMillis(10000), 0)
        .withStashCapacity(2000);
    return Behaviors.supervise(Behaviors.setup(MyActor::new)).onFailure(Throwable.class, backoff);

它处理一个命令ForceFail,该命令将生成一个RuntimeException,以便我们可以让Akka管理程序执行它的操作。

private Behavior<Command> forceFail(ForceFail command) {
    getContext().getLog().info("Got fail command: {}", command.name);
    throw new RuntimeException(command.name);
  }

生成演员后,我发送一系列的tell

testSystem.tell(new ForceFail("first fail"));
testSystem.tell(new ForceFail("second fail"));
testSystem.tell(new ForceFail("third fail"));

每个tell都会在actor中导致一个异常,触发管理程序重新启动。在重新启动过程中,在管理程序释放消息之前,我检查了StashBuffer的大小。
我看到的是,在第一次重新启动时,StashBuffer显示的大小为2,正如所预期的那样,但是,在第二次重新启动时,第二条消息的大小为0,而我预期它为1。

我没有看到最后一条消息被发送给死信演员。它似乎丢失了,没有日志记录描述它发生了什么。

注解

在Akka内部代码中,StashBuffer unstashAll()方法被调用。
如果处理消息时引发异常,则正在处理的消息和导致异常的消息已从StashBuffer中删除,但未处理的消息仍保留。
这个措辞看起来有点滑稽,但是它所说的是它将顺序地处理stash中的消息,直到它处理完所有消息或者遇到异常。未处理的消息保留在stash中。但这似乎不是我所观察到的。
我用的是 akka 2.7.0版。

iq3niunx

iq3niunx1#

您可以将执行元嵌入到路由器中,以便邮箱生命周期与执行元生命周期分离,并且在重新启动执行元时消息不会丢失。

object PrintActor {

  sealed trait Message

  final case class PrintMessage(value: String) extends Message
  final case class FailMessage(value: String) extends Message

  def apply(): Behavior[Message] =
    Behaviors.receive { (context, message) =>
      message match {
        case PrintMessage(value) =>
          println(s"Print Message :: ${value}")
          Behaviors.same
        case FailMessage(value) =>
          println(s"Fail Message :: ${value}")
          throw new RuntimeException(value)
      }

    }
}
object Main {

  def createActors(): Behavior[Unit] =
    Behaviors.setup[Unit] { ctx =>
      val pool = Routers.pool(poolSize = 1) {
        // make sure the workers are restarted if they fail
        Behaviors.supervise(PrintActor()).onFailure[Exception](SupervisorStrategy.restart)
      }

      val router = ctx.spawn(pool, "printer-pool")

      (1 to 10).foreach { n =>
        if (n % 2 == 1)
          router ! PrintActor.PrintMessage(s"Print $n")
        else
          router ! PrintActor.FailMessage(s"Fail $n")
      }

      Behaviors.empty
    }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply[Unit](createActors(), "system")
  }
}

此模式将确保不会丢失任何消息。

Print Message :: Print 1
Fail Message :: Fail 2
Print Message :: Print 3
Fail Message :: Fail 4
Print Message :: Print 5
Fail Message :: Fail 6
Print Message :: Print 7
Fail Message :: Fail 8
Print Message :: Print 9
Fail Message :: Fail 10
k97glaaz

k97glaaz2#

不是100%肯定,但我认为可能有一个错误,在监督故障发生,而解除内部隐藏的回退。
我在Akka追踪器中创建了一个问题,以供进一步调查:https://github.com/akka/akka/issues/31814

相关问题