我有一个用例,其中有一个参与者层次结构
parent -> childABC -> workerchild
现在,worker child开始工作,并将其结果发送给其父项(childABC,即parent的子项),该子执行元(childABC)将结果发送回父执行元。我使用pipeTo
并得到死信,这是我的代码parent
演员:
final case object GetFinalValue
class MyActor extends Actor{
import context.dispatcher
import akka.pattern.pipe
val log = LoggerFactory.getLogger(this.getClass)
val myManageActor = context.actorOf(Props[ManagerMyActor],"Managemyactor")
implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
override def receive: Receive = {
case GetFinalValue=>
ask(myManageActor,GetValue).pipeTo(sender())
case message =>
log.warn(" Unhandled message received : {}", message)
unhandled(message)
}
}
childABC
(根据上面给出实施例I)
final case object GetValue
class ManagerMyActor extends Actor{
import context.dispatcher
import akka.pattern.pipe
val log = LoggerFactory.getLogger(this.getClass)
val myTokenActor = context.actorOf(Props[TokenMyActor2],"toknMyActor2")
implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
override def receive: Receive = {
case GetValue=>
ask(myTokenActor,CalculateValue).pipeTo(sender())
case message =>
log.warn(" Unhandled message received : {}", message)
unhandled(message)
}
}
child
执行元:
final case object CalculateValue
class TokenMyActor2 extends Actor{
import context.dispatcher
import akka.pattern.pipe
val log = LoggerFactory.getLogger(this.getClass)
override def receive: Receive = {
case CalculateValue=>
val future = Future{ "get the string"
}
val bac = future.map{result =>
sender ! result
}//.pipeTo(sender())
case message =>
log.warn("Actor MyActor: Unhandled message received : {}", message)
unhandled(message)
}
}
def main(args: Array[String]): Unit = {
implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
val myActor = system.actorOf(Props[MyActor],"myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.map {str =>
log.info ("string is {}",str)
}
以下是日志:
[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
请告诉我我哪里错了,或者pipeTo
不应该这样使用?如果是这样的话,我应该怎么做才能使它工作
2条答案
按热度按时间iqjalb3h1#
不确定这是否是有意的,但
ask(myManageActor,GetValue).pipeTo(sender())
可以实现为forward
。forward
与tell
相同,但它保留了消息的原始发送者。这可以应用于
MyActor
和ManagerMyActor
。对于
TokenMyActor2
,不应使用因为它破坏了akka上下文封装,如文档中所规定
在使用将来的回调(如onComplete)或map(如thenRun或thenApply)时,在参与者内部需要小心避免关闭包含参与者的引用,也就是说,不要从回调中调用方法或访问封闭参与者上的可变状态。这将破坏参与者封装,并可能引入同步错误和竞态条件,因为回调将被并发调度到封闭参与者。不幸的是,目前还没有一种方法可以在编译时检测这些非法访问。执行元和共享可变状态
您应该依赖
Future(???).pipeTo(sender())
,它与sender()
一起使用是安全的。应用这些更改后,代码将按预期运行
第一个
产生
got get the string
。最后要说明的是,我建议不要在actor中使用
ask
模式。ask
的基本功能可以很容易地通过tell
和forward
来实现。而且代码更短,不会因为不断需要implicit val timeout
而过载qzwqbdag2#
在@IvanStanislavciuc的伟大帖子上再加上一点。你已经注意到你在期货中失去了对发件人的引用。一个简单的解决办法是把它放在前面。
这意味着在
MyActor
中更改:转换为:
在
ManagerMyActor
中,更改:转换为:
在
TokenMyActor2
中:代码运行在Scastie。