如何重新启动一个 akka 演员自己?

7rtdyuoh  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(153)

当子执行元收到自定义RESTART消息时,执行元应重新启动自身。
(The目的是重置执行元成员变量,从数据库重新加载外部状态,但不清除执行元内部消息队列)
要实现重新启动,一种解决方法是子执行元引发一个自定义异常,父执行元配置其OneForOneStrategy以针对此特定异常类型重新启动子执行元。
我想知道,是否有一个更直接的方法来做重启?

eoxn13cs

eoxn13cs1#

目的是重置执行元成员变量,从数据库重新加载外部状态
我想,这可能是最大的问题,因为加载外部状态可能需要时间,也会阻塞操作,因此操作的结果是或应该是Future[]-所以在将来加载时,您的参与者应该忽略所有其他消息,直到收到来自DB的状态。
我认为ActorCell#become方法在这种情况下可能会对您有所帮助-因此您可以将receive方法更改为另一个方法,该方法将忽略其余消息(带有DB状态或数据的消息除外),然后切换回常规接收。
请参见下面的代码示例:

import akka.actor.Actor
  import akka.pattern._
  import scala.concurrent.Future
  import scala.collection.mutable

  // Database API and external state model example
  case class DbExternalState()
  trait Database {
    def loadExternalState: Future[DbExternalState]
  }

  import RestartActor._
  class RestartActor(database: Database) extends Actor {
    private var state = ActorState()
    private val suspendedMessages = mutable.Queue[Any]()

    override def receive: Receive = defaultReceive

    private def defaultReceive: Receive = {
      case Restart => restartActorStart()
    }

    /**
     * Wait until message with internal state received and ignore all the other messages (put back un queue)
     */
    private def suspendedReceive: Receive = {
      case ExternalStateLoaded(state) => restartActorFinish(state)
      case message => suspendedMessages.enqueue(message)
    }

    private def restartActorStart(): Unit = {
      import context.dispatcher
      context.become(suspendedReceive)
      database.loadExternalState.map(ExternalStateLoaded) pipeTo self
    }

    private def restartActorFinish(dbExternalState: DbExternalState): Unit = {
      state = ActorState.initial(dbExternalState)
      context.become(defaultReceive) // Return to normal message handling flow
      suspendedMessages.foreach(message => self ! message)
      suspendedMessages.clear()
    }
  }

  object RestartActor {
    // Restart
    case object Restart
    case class ExternalStateLoaded(state: DbExternalState)

    case class ActorState(internalState: List[String] = Nil, externalState: DbExternalState = DbExternalState())

    object ActorState {
      def initial(externalState: DbExternalState): ActorState = ActorState(externalState = externalState)
    }
  }

请让我知道建议是正确的。我希望这有帮助!

相关问题