Akka -如何阻止整个循环而不是每个接收到的消息

ngynwnxp  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(121)

我正在尝试编写一个角色层次结构,它可以并行地查找0到5之间的数字。
下面是我的代码:

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

class A1 extends Actor {
  override def receive = {
    case "test" => {
      val r = new scala.util.Random
      val r1 = 0 + r.nextInt(( 5 - 0) + 1)
      if(r1 == 1) {
        sender() ! "found"
      }
      else {
        sender() ! "NotFound"
      }

    }
  }
}

object BTest extends App{

  val actorSystem = ActorSystem("firstActorSystem")
  val a1 = actorSystem.actorOf(Props[A1], "A1")

  implicit val timeout = Timeout(5 seconds)

  var answer = ""
  while(answer != "found") {
    answer = Await.result(a1 ? "test", timeout.duration).toString
    Thread.sleep(1000)
    println("answer : " + answer)
  }
}

它根据找到数字1的时间打印以下内容:

answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : NotFound
answer : found

我不确定如何在阻塞的同时并发执行参与者,直到找到数字1。我认为我的解决方案会阻塞,直到使用Await.result接收到每个参与者消息。
如何阻塞整个循环而不是每个接收到的消息,并在找到数字1时解除阻塞?

wydwbb8l

wydwbb8l1#

这个问题的答案可以归结为如何组合多个Future,直到其中一个返回您感兴趣的值。您可能希望在任何给定时间对并发运行的期货数量设置一些上限。
您可以使用Akka Streams来实现它,如下所示:

implicit val materializer: Materializer = Materializer(actorSystem)

val maxConcurrency = 16

val future = 
  Source.repeat(1)
        .mapAsync(maxConcurrency)(_ => a1 ? "test")
        .filter("found" == _)
        .runWith(Sink.head)

Await.result(future , timeout.duration).toString

相关问题