在两个参与者之间使用pipeTo时得到akka死信

wkyowqbh  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(146)

我有一个用例,其中有一个参与者层次结构

parent -> childABC -> workerchild

现在,worker child开始工作,并将其结果发送给其父项(childABC,即parent的子项),该子执行元(childABC)将结果发送回父执行元。我使用pipeTo并得到死信,这是我的代码
parent演员:

final case object GetFinalValue

class MyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor],"Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetFinalValue=>
      ask(myManageActor,GetValue).pipeTo(sender())

    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

childABC(根据上面给出实施例I)

final case object GetValue

class ManagerMyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2],"toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetValue=>
      ask(myTokenActor,CalculateValue).pipeTo(sender())

    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

child执行元:

final case object CalculateValue

class TokenMyActor2 extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)

  override def receive: Receive = {
    case CalculateValue=>
      val future = Future{ "get the string"
      }
      val bac = future.map{result =>
          sender ! result
      }//.pipeTo(sender())

    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }

}

def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

    val myActor = system.actorOf(Props[MyActor],"myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.map {str =>
      log.info ("string is {}",str)
    }

以下是日志:

[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

请告诉我我哪里错了,或者pipeTo不应该这样使用?如果是这样的话,我应该怎么做才能使它工作

iqjalb3h

iqjalb3h1#

不确定这是否是有意的,但ask(myManageActor,GetValue).pipeTo(sender())可以实现为forward

class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???

  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}

forwardtell相同,但它保留了消息的原始发送者。
这可以应用于MyActorManagerMyActor
对于TokenMyActor2,不应使用

future.map{ result =>
          sender ! result
      }

因为它破坏了akka上下文封装,如文档中所规定
在使用将来的回调(如onComplete)或map(如thenRun或thenApply)时,在参与者内部需要小心避免关闭包含参与者的引用,也就是说,不要从回调中调用方法或访问封闭参与者上的可变状态。这将破坏参与者封装,并可能引入同步错误和竞态条件,因为回调将被并发调度到封闭参与者。不幸的是,目前还没有一种方法可以在编译时检测这些非法访问。执行元和共享可变状态
您应该依赖Future(???).pipeTo(sender()),它与sender()一起使用是安全的。
应用这些更改后,代码将按预期运行
第一个
产生got get the string
最后要说明的是,我建议不要在actor中使用ask模式。ask的基本功能可以很容易地通过tellforward来实现。而且代码更短,不会因为不断需要implicit val timeout而过载

qzwqbdag

qzwqbdag2#

在@IvanStanislavciuc的伟大帖子上再加上一点。你已经注意到你在期货中失去了对发件人的引用。一个简单的解决办法是把它放在前面。
这意味着在MyActor中更改:

ask(myManageActor,GetValue).pipeTo(sender()) // Won't work

转换为:

val originalSender = sender()
ask(myTokenActor,CalculateValue).pipeTo(originalSender)

ManagerMyActor中,更改:

ask(myTokenActor,CalculateValue).pipeTo(sender()) // Won't work

转换为:

val originalSender = sender()
ask(myManageActor,GetValue).pipeTo(originalSender)

TokenMyActor2中:

val originalSender = sender()
Future{ "get the string" }.pipeTo(originalSender)

代码运行在Scastie

相关问题