如何使Akka ActorSystem的一部分在Typed Akka中同步?

jchrr9hc  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(140)

我试图让代码的一部分同步,另一部分异步。我想要实现的是从Actor System中的每个子Actor获取最新的Map。因此,基本上是等待消息的异步调用完成,然后调用同步过程。到目前为止,我还未能实现这种行为(请参阅MasterSite.scala中的TODO)。有没有关于如何在Scala Typed Akka中执行此操作的想法?

  • 主站点.scala*
object MasterSite {
  sealed trait MasterSiteProtocol
  final case class Broadcast(
                              msg: Site.SiteProtocol,
                              from: ActorRef[Site.SiteProtocol],
                              partitionSet: Set[ActorRef[SiteProtocol]]
                            ) extends MasterSiteProtocol

  def apply(): Behavior[MasterSiteProtocol] =
    Behaviors.setup { context =>
     // create/spawn actors (here actors represent Sites)
      val siteA = context.spawn(Site(), "A")
      val siteB = context.spawn(Site(), "B")
      val siteC = context.spawn(Site(), "C")
      val siteD = context.spawn(Site(), "D")

      var init_sitePartitionList: List[Set[ActorRef[SiteProtocol]]] = List(Set(siteA,siteB,siteC,siteD))

      val time_a1 = System.currentTimeMillis().toString

      val partitionSet1 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB, siteC, siteD)
      siteA ! Site.FileUpload(time_a1, context.self, "test.txt", partitionSet1)

      // split into List(Set{A,B}, Set{C,D})
      init_sitePartitionList = splitPartition(init_sitePartitionList, Set(siteA, siteB))

      val partitionSet2 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB)
      siteA ! Site.FileUpdate(("A", time_a1), context.self, partitionSet2)
      siteA ! Site.FileUpdate(("A", time_a1), context.self, partitionSet2)

      init_sitePartitionList = mergePartition(init_sitePartitionList, Set(siteA, siteB, siteC, siteD)) // returns Set(siteA, siteB, siteC, siteD)

      val partitionSet3 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB, siteC, siteD)
      siteA ! Merged(siteC, context.self, partitionSet3) // --> TODO: when code reaches this point, I want that all the message calls before this call are fully executed by the sites and then this message should be sent and handled by `siteA`. How to achieve that?

      Behaviors.receiveMessage {
        case Broadcast(msg: SiteProtocol, from: ActorRef[SiteProtocol], partitionSet: Set[ActorRef[SiteProtocol]]) =>
          partitionSet.foreach { child =>
            if(!child.equals(from)) {
              child ! msg
            }
          }
          Behaviors.same
      }
    }

}
7d7tgy0s

7d7tgy0s1#

一般情况下,没有现成的方法可以做到这一点。
知道消息已被处理的唯一方法是让该消息的收件人知道(或其指定人员)发送消息作为答复。这需要设计一个协议(在本例中为SiteProtocol),然后发送方跟踪它在等待哪些回复。(相关ID或确保您没有与同一参与者的并发交互在这里很有帮助),当收到所有回复时,它就可以继续做下一件事。
类似地,请求-响应协议可以用来收集站点参与者计算的值。注意,对于参与者处理其他消息(并可能改变自己的状态)没有内在的保护,尽管限制ActorRef的传播范围(以及消息类型的通用性)可以限制这一点。
从技术上讲,可以使用非Akka并发方法在参与者之间共享数据:注意,如果采用这种方法,就很大程度上放弃了actor模型的优点(特别是单线程错觉和位置透明性)。

相关问题