akka 如果执行元在一段时间内未收到消息,如何将其关闭?

bbuxkriu  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(221)

当前代码如下:

case object LatestMessageSignal

class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {

  override def receive: Receive = {
    case LatestMessageSignal => awaitLatestMessageSignal()
  }

  private def awaitLatestMessageSignal(): Unit = {
    import scala.concurrent.duration._
    context.system.scheduler.scheduleOnce(30.seconds) {
      context.stop(self)
    }
  }

}

当执行元接收到LatestMessageSignal消息时,它将调用awaitLatestMessageSignal()方法,该方法将等待30秒,然后停止执行元。

apeeds0o

apeeds0o1#

您似乎想在30秒不活动后停止演员?如果是这样,您可以使用ActorContext#setReceiveTimeout(Duration)
例如:

case object LatestMessageSignal

class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {

  context.setReceiveTimeout(30.seconds)

  override def receive: Receive = {
    case ReceivedTimeout => context.stop(self)
  }

}
yacmzcpb

yacmzcpb2#

据我所知,您希望MessageCheckpoint保持活动状态,如果在30秒内没有新消息到达它,则停止它。
此参与者将保持活动状态,直到您向其发送消息,并将在30秒不活动后停止。

case object LatestMessageSignal

class MessageCheckpoint extends Actor with ActorLogging {

  override def postStop(): Unit = {
    super.postStop()
    log.info("Stopping")
  }

  override def receive: Receive = receiveWithTimer(None)

  private def receiveWithTimer(timer: Option[Cancellable]): Receive = {
    case LatestMessageSignal =>
      timer.foreach(_.cancel())
      context.become(receiveWithTimer(Option(initiateTimer())))
  }

  private def initiateTimer(): Cancellable = {
    import context.dispatcher
    log.info("Initiating new poison pill timer")
    context.system.scheduler.scheduleOnce(30.seconds, self, PoisonPill)
  }
}

我希望actor可以在另一条新消息到来时丢弃当前正在处理的消息,并将处理最新的消息
这是不可能的。我想你是假设方法awaitLatestMessageSignal正在阻塞参与者。这个方法是非阻塞的,它将创建一个计时器并立即返回。消息将被处理得相当快,参与者为下一条消息做好准备。参与者正在一次处理一条消息,没有办法取消消息处理。

相关问题