我有一个函数负责运行我的流,它看起来像这样:
def runGraph(sharedKillSwitch: SharedKillSwitch = sharedKillSwitch): NotUsed = {
logger.info(s"Starting Stream")
graph(
source = RestartSource.withBackoff(RestartSettings(3.second, 5.minutes, 0.2))(() => SqsSource(inboundQueueURL, SqsSourceSettings().withMessageAttribute(MessageAttributeName("All")))(sqsConfig.client)),
sink = SqsAckSink(inboundQueueURL)(sqsConfig.client),
switch = sharedKillSwitch
).run()
}
graph
是另一个函数,它构造图并返回RunnableGraph[NotUsed]
。
现在我想调用runGraph,然后做其他事情,但它似乎不是异步调用,所以我不能Map它,等待流开始...
你怎么能得到流开始的指示呢?
1条答案
按热度按时间cuxqih211#
一个选项是利用
Source.lazySource
,它在下游请求元素时运行回调。这个源可以是prepend
到SqsSource
,并充当流中的第一个元素。https://doc.akka.io/docs/akka/current/stream/operators/Source/lazySource.html
延迟Source的创建和具体化,直到有需求,然后从下游的源发出元素,就像它已经在前面创建一样。
注意:这个解决方案只告诉你流已经启动了至少一次(它可能会因为
RestartSource
而重新启动)。您还可以通过添加一个接收器来跟踪流关闭,该接收器跟踪流当前是否已完成,并更新一些共享状态以反映这一点。