akka 从Behaviors.setup中返回Behaviors.same的效果是什么?

iezvtpos  于 2023-08-05  发布在  其他
关注(0)|答案(3)|浏览(154)

akka 的documentation说:
第一个月
从消息处理中返回此行为,以便建议系统重用以前的行为。这是为了避免在不需要重新建立目前行为的情况下,重新建立目前行为所造成的配置负担。
def setup[T](factory: (ActorContext[T]) => Behavior[T]): Behavior[T]
setup是一个行为工厂。与Behaviors.receive相反,Behaviors.receive会在执行元执行之前立即建立行为执行严修。工厂函数将ActorContext作为参数传递,例如,可用于生成子参与者。
setup通常在生成参与者时用作最外层行为,但在处理消息或信号时也可以作为下一个行为返回。在这种情况下,它将在返回后立即启动,即下一条消息将由已启动的行为处理。
我曾试图构建一个具有以下监护人行为的行动者系统:

public static Behavior<SayHello> create() {
    return Behaviors.setup(ctx -> {
        System.out.println(ctx.getSelf().path() + ": returning same()");
        return Behaviors.same();
    });
}

字符串
我以为Akka会递归地应用所讨论的行为。换句话说,我希望Akka能够产生这样的无限输出:

akka://helloakka/user: returning same
akka://helloakka/user: returning same
akka://helloakka/user: returning same
...


不过,它只打印一次。
这是预期的行为吗?我所提供的行为的实际意义是什么?您是否可以设计一个从setup中返回same的场景?
编辑:我做了另一个实验,在那里我返回命名的行为本身,而不是same。我希望没有什么不同,因为same应该只是重用以前的行为而不是分配一个新的行为的优化。但是,令我惊讶的是,输出实际上是无限的。

public static Behavior<SayHello> create() {
    return Behaviors.setup(ctx -> {
        System.out.println(ctx.getSelf().path() + ": returning create()");
        return create();
    });
}


我错过了什么?

1zmg4dgp

1zmg4dgp1#

Behaviors.setup(在最近的实现中:这在语义上从2.6之前就没有改变)只是akka.actor.typed.internal.BehaviorImpl.DeferredBehavior的工厂(因为您的示例使用的是javadsl,所以我从javadsl开始; scaladsl在这里的引擎盖下是相同的):

// factory is the function from `javadsl.ActorContext[T]` to `Behavior[T]` passed in
BehaviorImpl.DeferredBehavior(ctx => factory.apply(ctx.asJava))

字符串
其中DeferredBehavior是(省略像toString s这样的东西):

object DeferredBehavior {
  def apply[T](factory: scaladsl.ActorContext[T] => Behavior[T]): Behavior[T] =
    new DeferredBehavior[T] {
      def apply(ctx: TypedActorContext[T]): Behavior[T] = factory(ctx.asScala)
    }
}

abstract class DeferredBehavior[T] extends Behavior[T](BehaviorTags.DeferredBehavior) {
  def apply(ctx: TypedActorContext[T]): Behavior[T]
}


请注意,在调用DeferredBehavior::apply之前不会调用factory
当您生成一个具有该行为(DeferredBehavior)的actor时,将生成一个经典actor,它是ActorAdapter的示例。

// _initialBehavior is the DeferredBehavior in this case, omitting `if` checks that follow from this
private var behavior: Behavior[T] = _initialBehavior
def currentBehavior: Behavior[T] = behavior

def preStart(): Unit =
  try {
    // ctx is the typed ActorContext, context is the classic ActorContext
    behavior = Behavior.validateAsInitial(Behavior.start(behavior, ctx))
    if (!Behavior.isAlive(behavior)) context.stop(self)
  } finally ctx.clearMdc()


Behavior.start有效地,对于我们的目的:

if (behavior.isInstanceOf[DeferredBehavior[T]]) {
  Behavior.start(behavior.asInstanceOf[DeferredBehavior[T]].apply(ctx), ctx)
} else behavior


所以现在我们调用factory方法,在本例中,它在执行println后最终返回BehaviorImpl.SameBehavior。这个SameBehavior被传递给validateAsInitial,它抛出一个IllegalArgumentException,因为Behaviors.sameBehaviors.unhandled不是有效的初始行为。这个例外有效地杀死了演员,因为它正在出生(可怕的,我知道)。
另一方面,当你回调到create时,factory将返回另一个具有相同factoryDeferredBehavior,因此它将被重复传递给start;根据用于构建Akka的Scala编译器是否注意到Behavior.start在本例中是尾递归的,这将导致无限循环或堆栈溢出。
一个Behaviors.setup导致一个Behaviors.same只有在你想让一个演员死产的时候才有意义。Behaviors.setup中的副作用仍然会发生,但如果这就是您想要的,为什么不直接执行它们并保存无意义的开销呢?
上述内容在技术上仅适用于正常行为者。守护者行为是特殊的,因为它首先等待来自参与者系统的特殊消息的递送,该消息用信号通知参与者系统准备好了,之后它将行为 Package 在拦截器中,如果行为不再活跃,则拦截器拆除参与者系统(即,拦截器)。行为停止或失败)。在任何时候,行为都不会被验证为初始行为,但是 Package 的行为是如上所述的start ed,它运行Behaviors.setup块一次,并忘记factory
此时的行为是一个裸SameBehavior,它还没有处理过一条消息。如果你向ActorSystem(它是一个ActorRef)发送一条消息,它将被Behaviors.interpret解释,它将找到SameBehavior并抛出,这将是用户角色层次结构的崩溃,但似乎(在我的实验中)不会停止角色系统。

iqxoj9l9

iqxoj9l92#

这是预期的行为。Behaviors as finite state machines可以帮助。
actor类似于类的示例,但不是调用它的方法,你只能通过ActorRef发送消息与它们交互。一个简单的例子是

import akka.actor.typed.ActorSystem

// start the actor system with a behavior that ignores any message of type `String`
val actorSystem = ActorSystem(Behaviors.empty[String], "empty-behavior")

// send a message of type `String` to the `actorRef`
actorSystem ! "hello"

字符串
这里我创建了一个actor系统,它只接受String类型的消息,但正如您在Behaviors.empty的文档中看到的,它将忽略它接收到的任何消息
将每个传入消息视为未处理的行为。
因此,actor可以有一个T类型的Behavior,这意味着它只接受T类型的消息。让我们看一个演员做某事的例子。

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors

case object HelloMessage

// setup the behavior
val behavior = Behaviors.setup[HelloMessage.type] { context =>
  Behaviors.receiveMessage[HelloMessage.type] {
    // the logic applied for the message received
    case HelloMessage =>
      context.log.info("hello message received")
      Behaviors.same
  }
}

// start the actor system
val actorSystem: ActorSystem[HelloMessage.type] = ActorSystem(behavior, "hello-behavior")

// send a message through the actor ref
actorSystem ! HelloMessage
actorSystem ! HelloMessage
actorSystem ! HelloMessage

// the following line will note compile due to the actor system only
// accepts messages of type `HelloMessage`
actorSystem ! "hello"


如果我们执行上面的例子,我们将看到消息hello message received三次,因为我们发送了HelloMessage三次。让我们看另一个例子,但这次我们将返回一个不同的Behavior,而不是使用Behaviors.same

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors

// define messages that will be accepted by our actor
sealed trait Message
case object Ping extends Message
case object Pong extends Message

val behavior = Behaviors.setup[Message] { context =>
  def pingBehavior: Behaviors.Receive[Message] =
    Behaviors.receiveMessage[Message] {
      case Ping =>
        context.log.info("Ping received. Waiting for Pong")
        pongBehavior // return different behavior
      case Pong =>
        context.log.warn("Waiting for a Pong. Don't send Ping")
        Behaviors.same // return same behavior
    }

  def pongBehavior: Behaviors.Receive[Message] =
    Behaviors.receiveMessage[Message] {
      case Ping =>
        context.log.warn("Waiting for a Pong. Don't send Ping")
        Behaviors.same // return same behavior
      case Pong =>
        context.log.info("Pong received. Waiting for Ping")
        pingBehavior // return different behavior
    }
  pingBehavior // initial behavior
}

// start the actor system with the behaviors defined
val actorSystem = ActorSystem(behavior, "ping-pong-behavior")

// send Ping and Pong messages
actorSystem ! Pong
actorSystem ! Ping
actorSystem ! Ping
actorSystem ! Pong


执行此操作的输出将是

WARN - Waiting for a Pong. Don't send Ping
INFO - Ping received. Waiting for Pong
WARN - Waiting for a Pong. Don't send Ping
INFO - Pong received. Waiting for Ping


正如我们已经看到的,我们可以发送消息给一个演员。参与者还可以向其他参与者和自己发送消息。如果一个actor向它自己发送了一条消息,并返回了同样的行为,我们将能够得到你在问题中提到的无限输出。

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors

case object HelloMessage

// setup the behavior
val behavior = Behaviors.setup[HelloMessage.type] { context =>
  Behaviors.receiveMessage[HelloMessage.type] {
    case HelloMessage =>
      context.log.info("hello message received")
      context.self ! HelloMessage // send a message to itself
      Behaviors.same // return the same behavior
  }
}

val actorSystem: ActorSystem[HelloMessage.type] = ActorSystem(behavior, "infinite-behavior")

actorSystem ! HelloMessage


一旦执行此代码,它将永远开始打印hello message received,直到您手动停止该过程。

34gzjxbg

34gzjxbg3#

Behaviors.setup是用于定义参与者初始行为的方法。当您从Behaviors.setup中返回Behaviors.same时,这意味着参与者应该保持其当前行为,而不是更改它。
same方法返回表示与当前行为相同的行为的行为。当您希望在参与者的设置阶段处理特定消息或执行某些初始化逻辑,但最终希望参与者继续其现有行为时,通常使用它。
从Behaviors.setup返回Behaviors.same具有无限期保持初始行为的效果。在您希望参与者执行某些设置任务但之后保持稳定状态而不转换到其他行为的情况下,这可能很有用。

相关问题