GraphStage akka的重试机制

rseugnpd  于 2023-03-18  发布在  其他
关注(0)|答案(1)|浏览(147)

如果postStop()失败,我们如何重试/重启graphStage?
我是相对较新的 akka ,真的很感谢您的帮助。
以下是我正在努力做的事情:

source
    .via(Auditing)
    .via(someTransformations)
    .async.to(sink)
**Auditing** - a class extending GraphStageWithMaterializedValue[FlowShape[T,T], Future[Unit]]
    contains:
      - inport, outport, flowShape
      - overridden method createLogicAndMaterializedValue() -> returns (GraphStageLogic, Future[Unit])
          - overriden method postStop(){ 
                //contains audit logic
                // handles failures and throws exception
            }
          - setHandler for input and output.

是否可以为类Auditing添加一些重试机制,以便它可以重新处理相同的数据并最终到达接收器,而不必再次从源读取相同的数据。重试必须在m分钟内尝试n次。
我想使用RetryFlow.withBackOff()之类的东西,但似乎这是为Flow设计的,而不是GraphStageLogic。此外,如果我可以在失败场景中返回相同的数据,这将工作,但从失败阶段再次调用sethandler(outport)实际上不会再次进入sink。
注:我确实有监督决策器来处理整体故障,但我需要重试此GraphStage,而不是重新启动/恢复流。

3gtaxfhh

3gtaxfhh1#

我不知道RetryFlow.withBackOff()是否按照您的意愿工作,但是:如果您问题是AuditingGraphStage,而RetryFlow.withBackOff()只接受Flow,那么您可以直接使用Flow.fromGraph
下面的代码可以很好地编译:

class Auditing[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], Future[Unit]] {
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Unit]) = ???
  override def shape: FlowShape[T, T] = ???
}

def retryingAuditingFlow[T]: Flow[T, T, Future[Unit]] = RetryFlow.withBackoff(
  minBackoff = ???,
  maxBackoff = ???,
  randomFactor = ???,
  maxRetries = ???,
  Flow.fromGraph(new Auditing[T]),
)(decideRetry = ???)

def source[T]: Source[T, _] = ???
def sink[T]: Sink[T, _] = ???

def theStream[T]: RunnableGraph[_] =
  source[T]
    .via(retryingAuditingFlow[T])
    .async.to(sink)

相关问题