在scala/akka中的计算之间按参与者检查消息查询

jtw3ybtb  于 2022-11-05  发布在  Scala
关注(0)|答案(3)|浏览(167)

我有一个演员,当他收到一个消息,他开始在一个循环中做来计算,他这样做了一段时间(像100倍,他这样做)。现在我需要他对其他消息作出React,可能会尽快。最好的办法是在他的循环中添加一些指令,如“如果有一个消息在队列React,然后返回这里”,但我还没有看到这样的功能。
我认为演员可以发送消息给自己,而不是做一个循环,然后这样的消息将排队在最后,他会对其他人之间的React,但我听说沟通是坏的(比计算更耗时),不知道是否与自我沟通算作这样。
我的问题是,你对这种解决方案有什么看法?你对如何处理计算之间的通信有什么其他想法吗?

avwztpqn

avwztpqn1#

不应在主接收方法中执行耗时的计算,因为这会降低系统的响应速度。请将计算放在阻塞FutureTask或其他异步对象中,并在计算完成时向参与者发送消息。参与者可以在计算在其他线程上继续时尽快继续处理消息。
如果参与者需要在计算运行时(响应消息)修改计算,这会变得更加复杂,但解决方案取决于计算是什么以及需要什么样的修改,因此实际上不可能给予一个通用的答案。

vaj7vani

vaj7vani2#

一般来说,在Akka中,您希望限制“每单位”完成的工作量,在本例中为单位:

  • 处理消息的执行元
  • Future/Task中完成的工作或相同的回调

过长的工作单元会消耗一个线程,从而很容易限制整个系统的响应能力。对于不消耗CPU但被阻塞等待I/O的任务,这些任务可以在不同的线程池中执行,但对于执行一些消耗CPU的工作,这实际上没有帮助。
因此,如果你正在执行一个循环,最广泛的方法是将循环的状态挂起到一个消息中,并将其发送给你自己。这会带来一个小的性能损失(构造消息、将其发送给你自己(保证是本地发送)和解构它的延迟可能是微秒量级的,而系统在其他情况下是空闲的),但可以改善整体系统延迟。
例如,假设我们有一个参与者,他将计算第n个斐波纳契数。我使用Akka Typed实现了这个过程,但在Classic中也应用了广泛的原则:

object Fibonacci {
  sealed trait Command

  case class SumOfFirstN(n: Int, replyTo: ActorRef[Option[Long]]) extends Command

  private object Internal {
    case class Iterate(i: Int, a: Int, b: Int)  extends Command
    val initialIterate = Iterate(1, 0, 1)
  }

  case class State(waiting: SortedMap[Int, Set[ActorRef[Option[Long]]]]) {
    def behavior: Behavior[Command] =
      Behaviors.receive { (context, msg) =>
        msg match {
          case SumOfFirstN(n, replyTo) =>
            if (n < 1) {
              replyTo ! None
              Behaviors.same
            } else {
              if (waiting.isEmpty) {
                context.self ! Internal.initialIterate
              }

              val nextWaiting =
                waiting.updated(n, waiting.get(n).fold(Set(replyTo))(_.incl(replyTo))
              copy(waiting = nextWaiting).behavior
            }

          case Internal.Iterate(i, a, b) =>
            // the ith fibonacci number is b, the (i-1)th is a
            if (waiting.rangeFrom(i).isEmpty) {
              // Nobody waiting for this run to complete
              if (waiting.nonEmpty) {
                context.self ! Internal.initialIterate
              }

              Behaviors.same
            } else {
              var nextWaiting = waiting
              var nextA = a
              var nextB = b

              (1 to 10).foreach { x =>
                val next = nextA + nextB

                nextWaiting.get(x + i).foreach { waiters =>
                  waiters.foreach(_ ! Some(next))
                }

                nextWaiting = nextWaiting.removed(x + i)
                nextA = nextB
                nextB = next
              }

              context.self ! Internal.Iterate(i + 10, nextA, nextB)
              copy(waiting = nextWaiting)
            }
        }
      }
  }
}

注意,对于相同数目的多个请求(如果时间上足够接近)将仅被计算一次,并且对于中间结果的时间上接近的请求将不会导致额外的计算。

wnrlj8wa

wnrlj8wa3#

一个选项是委派任务,例如:Future,并使用一个单独的ExecutionContext,其中的fixed-pool-size(可在application.conf中配置)等于CPU(或内核)的数量,以便使用可用的内核高效地完成计算。
另一个选择是让路由器后面的另一个参与者执行计算,同时将被路由者的数量限制为CPU的数量。
一个简单的例子:

object DelegatingSystem extends App {

  val sys = ActorSystem("DelegatingSystem")

  case class TimeConsuming(i: Int)
  case object Other

  class Worker extends Actor with ActorLogging {

    override def receive: Receive = {
      case message =>
        Thread.sleep(1000)
        log.info(s"$self computed long $message")
    }
  }

  class Delegator extends Actor with ActorLogging {
    //Set the number of routees to be equal to #of cpus
    val router: ActorRef = context.actorOf(RoundRobinPool(2).props(Props[Worker]))

    override def receive: Receive = {
      case message:TimeConsuming => router ! message
      case _ =>
        log.info("process other messages")
    }
  }

  val delegator = sys.actorOf(Props[Delegator])
  delegator ! TimeConsuming(1)
  delegator ! Other
  delegator ! TimeConsuming(2)
  delegator ! Other
  delegator ! TimeConsuming(3)
  delegator ! Other
  delegator ! TimeConsuming(4)
}

相关问题