如果postStop()失败,我们如何重试/重启graphStage?
我是相对较新的 akka ,真的很感谢您的帮助。
以下是我正在努力做的事情:
source
.via(Auditing)
.via(someTransformations)
.async.to(sink)
**Auditing** - a class extending GraphStageWithMaterializedValue[FlowShape[T,T], Future[Unit]]
contains:
- inport, outport, flowShape
- overridden method createLogicAndMaterializedValue() -> returns (GraphStageLogic, Future[Unit])
- overriden method postStop(){
//contains audit logic
// handles failures and throws exception
}
- setHandler for input and output.
是否可以为类Auditing添加一些重试机制,以便它可以重新处理相同的数据并最终到达接收器,而不必再次从源读取相同的数据。重试必须在m分钟内尝试n次。
我想使用RetryFlow.withBackOff()之类的东西,但似乎这是为Flow设计的,而不是GraphStageLogic。此外,如果我可以在失败场景中返回相同的数据,这将工作,但从失败阶段再次调用sethandler(outport)实际上不会再次进入sink。
注:我确实有监督决策器来处理整体故障,但我需要重试此GraphStage,而不是重新启动/恢复流。
1条答案
按热度按时间3gtaxfhh1#
我不知道
RetryFlow.withBackOff()
是否按照您的意愿工作,但是:如果您问题是Auditing
是GraphStage
,而RetryFlow.withBackOff()
只接受Flow
,那么您可以直接使用Flow.fromGraph
。下面的代码可以很好地编译: