我目前正在将一个Akka Classic应用程序移植到Akka Typed。我有以下组件:
- HttpService -不是执行元
- JobDispatcher -执行者
- JobWorker -JobDispatcher的子执行元
JobDispatcher是编排作业的单例执行元。每个JobWorker负责一个“作业”,并知道该作业的状态。
HTTP服务将向JobDispatcher发出一个名为GetJobStatuses的询问。然后,JobDispatcher将询问每个JobWorkers的状态,将结果聚合到一个列表中,并回复HttpService。
我在Akka Classic中的做法是让JobDispatcher执行所有的请求,将Futures放入Futures列表中,然后将其转换为Future of Lists,当聚合Future完成后,我将结果发送到HttpService。
val statusFutures: Seq[Future[JobStatus]] = jobWorkers map (jobWorker => (jobWorker ? GetJobStatus).mapTo[JobStatus])
val aggregateFuture: Future[Seq[SearchStatus]] = Future.sequence(statusFutures)
val theSender = context.sender()
aggregateFuture onComplete {
case Success(jobStatuses: Seq[JobStatus]) => {
theSender ! jobStatuses
}
case Failure(exception) => {
theSender ! exception
}
}
所以,现在我们要转到Akka Typed,我们不应该使用Futures / onComplete,而是将Ask响应转换为返回给我们自己的消息(在本例中为JobDispatcher)。这对于我向另一个参与者请求一个响应的简单情况是相当简单的。但在本例中,我有一个完整的子参与者列表,我需要从中编译他们的响应。
我能想到的唯一方法是让JobDispatcher保存我正在等待的JobWorker响应列表的“状态,”跟踪哪些响应已经收到,当我收到所有响应时,将响应消息发送回HTTP服务。并且以某种方式标识每个状态对应的HTTP请求。
这比上面的聚合未来解决方案要复杂得多。
在Akka Typed中,处理这种情况的简单/正确方法是什么?
1条答案
按热度按时间jckbn6z71#
文档建议在这种情况下使用每个会话的子执行元。子执行元只与一个HTTP请求相关联,它隐式地跟踪该状态的一个副本,并且还能够管理分散/收集作业的进程状态(例如,在超时和重试附近)。
同样值得注意的是,示例经典代码有一个巨大的bug:不要在涉及future的代码中调用
sender
。混合future和actors表面上看起来很容易,但也很容易变成只有在巧合下才能工作的东西(测试经常表现出这种巧合的行为)。