import akka.actor.Actor
import akka.pattern._
import scala.concurrent.Future
import scala.collection.mutable
// Database API and external state model example
case class DbExternalState()
trait Database {
def loadExternalState: Future[DbExternalState]
}
import RestartActor._
class RestartActor(database: Database) extends Actor {
private var state = ActorState()
private val suspendedMessages = mutable.Queue[Any]()
override def receive: Receive = defaultReceive
private def defaultReceive: Receive = {
case Restart => restartActorStart()
}
/**
* Wait until message with internal state received and ignore all the other messages (put back un queue)
*/
private def suspendedReceive: Receive = {
case ExternalStateLoaded(state) => restartActorFinish(state)
case message => suspendedMessages.enqueue(message)
}
private def restartActorStart(): Unit = {
import context.dispatcher
context.become(suspendedReceive)
database.loadExternalState.map(ExternalStateLoaded) pipeTo self
}
private def restartActorFinish(dbExternalState: DbExternalState): Unit = {
state = ActorState.initial(dbExternalState)
context.become(defaultReceive) // Return to normal message handling flow
suspendedMessages.foreach(message => self ! message)
suspendedMessages.clear()
}
}
object RestartActor {
// Restart
case object Restart
case class ExternalStateLoaded(state: DbExternalState)
case class ActorState(internalState: List[String] = Nil, externalState: DbExternalState = DbExternalState())
object ActorState {
def initial(externalState: DbExternalState): ActorState = ActorState(externalState = externalState)
}
}
1条答案
按热度按时间eoxn13cs1#
目的是重置执行元成员变量,从数据库重新加载外部状态
我想,这可能是最大的问题,因为加载外部状态可能需要时间,也会阻塞操作,因此操作的结果是或应该是
Future[]
-所以在将来加载时,您的参与者应该忽略所有其他消息,直到收到来自DB的状态。我认为
ActorCell#become
方法在这种情况下可能会对您有所帮助-因此您可以将receive方法更改为另一个方法,该方法将忽略其余消息(带有DB状态或数据的消息除外),然后切换回常规接收。请参见下面的代码示例:
请让我知道建议是正确的。我希望这有帮助!