我正在使用akka/scala/play stack。
通常,我使用流来执行某些任务。例如,我有一个流,它每分钟都会醒来,从数据库中获取一些东西,并调用另一个服务来使用API丰富其数据,并将丰富保存到数据库中。
就像这样:
class FetcherAndSaveStream @Inject()(fetcherAndSaveGraph: FetcherAndSaveGraph, dbElementsSource: DbElementsSource)
(implicit val mat: Materializer,
implicit val exec: ExecutionContext) extends LazyLogging {
def graph[M1, M2](source: Source[BDElement, M1],
sink: Sink[BDElement, M2],
switch: SharedKillSwitch): RunnableGraph[(M1, M2)] = {
val fetchAndSaveDataFromExternalService: Flow[BDElement, BDElement, NotUsed] =
fetcherAndSaveGraph.fetchEndSaveEnrichment
source.viaMat(switch.flow)(Keep.left)
.via(fetchAndSaveDataFromExternalService)
.toMat(sink)(Keep.both).withAttributes(supervisionStrategy(resumingDecider))
}
def runGraph(switchSharedKill: SharedKillSwitch): (NotUsed, Future[Done]) = {
logger.info("FetcherAndSaveStream is now running")
graph(dbElementsSource.dbElements(), Sink.ignore, switchSharedKill).run()
}
}
我想知道,这是不是比仅仅使用一个每分钟都滴答作响的演员做这样的事情更好?使用演员做这个和流有什么比较?
我还在想什么时候应该选择哪种方法(流/演员)。谢谢!!
1条答案
按热度按时间osh3o9ms1#
您可以使用这两种方法,这取决于您对解决方案的要求,这些要求没有在这里列出。您需要考虑的一般问题是-参与者比流更低级,因此它们需要更多的代码和调试。
基本上,流适合于需要以低内存消耗处理相对大量数据的任务。使用流,你不需要每
n
秒开始流,你可以设置这个流与应用程序一起运行。这可以通过省略调度器逻辑使你的代码更简洁。我将省略DI和架构的东西,用伪代码编写解决方案:这个流可以完成你的任务,你甚至可以用更复杂的逻辑来增强它,使用图api中的反馈循环来根据负载调整轮询速率,同时,你可以添加错误处理策略来恢复流崩溃的地方。
此外,有DBS的alpakka连接器能够这样做,你可以看看那里的解决方案是否适合你的目的,或检查实施细节。
这样做可以得到什么-反压力、处理流的能力、干净简洁的代码,而不需要您直接管理时间自动机。https://doc.akka.io/docs/akka/current/stream/stream-rate.html
你也可以创建一个演员,但是你应该手动完成akka流为你做的所有事情,例如,如果你想与流互操作,调度程序,分块和内存管理(不要在一个批处理中将100000个左右的条目加载到内存中),等等。