你怎么能得到流开始的指示呢?(AKKA)

goucqfw6  于 2023-06-05  发布在  其他
关注(0)|答案(1)|浏览(158)

我有一个函数负责运行我的流,它看起来像这样:

def runGraph(sharedKillSwitch: SharedKillSwitch = sharedKillSwitch): NotUsed = {
    logger.info(s"Starting Stream")
    graph(
      source = RestartSource.withBackoff(RestartSettings(3.second, 5.minutes, 0.2))(() => SqsSource(inboundQueueURL, SqsSourceSettings().withMessageAttribute(MessageAttributeName("All")))(sqsConfig.client)),
      sink = SqsAckSink(inboundQueueURL)(sqsConfig.client),
      switch = sharedKillSwitch
    ).run()
  }

graph是另一个函数,它构造图并返回RunnableGraph[NotUsed]
现在我想调用runGraph,然后做其他事情,但它似乎不是异步调用,所以我不能Map它,等待流开始...
你怎么能得到流开始的指示呢?

cuxqih21

cuxqih211#

一个选项是利用Source.lazySource,它在下游请求元素时运行回调。这个源可以是prependSqsSource,并充当流中的第一个元素。
https://doc.akka.io/docs/akka/current/stream/operators/Source/lazySource.html
延迟Source的创建和具体化,直到有需求,然后从下游的源发出元素,就像它已经在前面创建一样。

def runGraph(sharedKillSwitch: SharedKillSwitch = sharedKillSwitch): Promise[Done] = {
    logger.info(s"Starting Stream")
    
    val streamStarted = Promise[Done]
    
    graph(
        source = RestartSource.withBackoff(RestartSettings(3.second, 5.minutes, 0.2))({ () => 
            SqsSource(inboundQueueURL, SqsSourceSettings()
                .withMessageAttribute(MessageAttributeName("All")))(sqsConfig.client)
                .prepend(Source.lazySource({ () =>
                    streamStarted.trySuccess(Done)
                    Source.empty
                }))
        }),
        sink = SqsAckSink(inboundQueueURL)(sqsConfig.client),
        switch = sharedKillSwitch
    )
    .run()

    streamStarted
}

注意:这个解决方案只告诉你流已经启动了至少一次(它可能会因为RestartSource而重新启动)。您还可以通过添加一个接收器来跟踪流关闭,该接收器跟踪流当前是否已完成,并更新一些共享状态以反映这一点。

相关问题