我试图让代码的一部分同步,另一部分异步。我想要实现的是从Actor System中的每个子Actor获取最新的Map
。因此,基本上是等待消息的异步调用完成,然后调用同步过程。到目前为止,我还未能实现这种行为(请参阅MasterSite.scala中的TODO)。有没有关于如何在Scala Typed Akka中执行此操作的想法?
- 主站点.scala*
object MasterSite {
sealed trait MasterSiteProtocol
final case class Broadcast(
msg: Site.SiteProtocol,
from: ActorRef[Site.SiteProtocol],
partitionSet: Set[ActorRef[SiteProtocol]]
) extends MasterSiteProtocol
def apply(): Behavior[MasterSiteProtocol] =
Behaviors.setup { context =>
// create/spawn actors (here actors represent Sites)
val siteA = context.spawn(Site(), "A")
val siteB = context.spawn(Site(), "B")
val siteC = context.spawn(Site(), "C")
val siteD = context.spawn(Site(), "D")
var init_sitePartitionList: List[Set[ActorRef[SiteProtocol]]] = List(Set(siteA,siteB,siteC,siteD))
val time_a1 = System.currentTimeMillis().toString
val partitionSet1 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB, siteC, siteD)
siteA ! Site.FileUpload(time_a1, context.self, "test.txt", partitionSet1)
// split into List(Set{A,B}, Set{C,D})
init_sitePartitionList = splitPartition(init_sitePartitionList, Set(siteA, siteB))
val partitionSet2 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB)
siteA ! Site.FileUpdate(("A", time_a1), context.self, partitionSet2)
siteA ! Site.FileUpdate(("A", time_a1), context.self, partitionSet2)
init_sitePartitionList = mergePartition(init_sitePartitionList, Set(siteA, siteB, siteC, siteD)) // returns Set(siteA, siteB, siteC, siteD)
val partitionSet3 = findPartitionSet(siteA, init_sitePartitionList) // returns Set(siteA, siteB, siteC, siteD)
siteA ! Merged(siteC, context.self, partitionSet3) // --> TODO: when code reaches this point, I want that all the message calls before this call are fully executed by the sites and then this message should be sent and handled by `siteA`. How to achieve that?
Behaviors.receiveMessage {
case Broadcast(msg: SiteProtocol, from: ActorRef[SiteProtocol], partitionSet: Set[ActorRef[SiteProtocol]]) =>
partitionSet.foreach { child =>
if(!child.equals(from)) {
child ! msg
}
}
Behaviors.same
}
}
}
1条答案
按热度按时间7d7tgy0s1#
一般情况下,没有现成的方法可以做到这一点。
知道消息已被处理的唯一方法是让该消息的收件人知道(或其指定人员)发送消息作为答复。这需要设计一个协议(在本例中为
SiteProtocol
),然后发送方跟踪它在等待哪些回复。(相关ID或确保您没有与同一参与者的并发交互在这里很有帮助),当收到所有回复时,它就可以继续做下一件事。类似地,请求-响应协议可以用来收集站点参与者计算的值。注意,对于参与者处理其他消息(并可能改变自己的状态)没有内在的保护,尽管限制
ActorRef
的传播范围(以及消息类型的通用性)可以限制这一点。从技术上讲,可以使用非Akka并发方法在参与者之间共享数据:注意,如果采用这种方法,就很大程度上放弃了actor模型的优点(特别是单线程错觉和位置透明性)。