Akka RestartSource无法重新启动

ulydmbyx  于 2022-11-06  发布在  其他
关注(0)|答案(3)|浏览(189)
object TestSource {
  implicit val ec = ExecutionContext.global

  def main(args: Array[String]): Unit = {
    def buildSource = {
      println("fresh")
      Source(List(() => 1,() => 2,() => 3,() => {
        println("crash")
      throw new RuntimeException(":(((")
      }))
    }
    val restarting = RestartSource.onFailuresWithBackoff(
      minBackoff = Duration(1, SECONDS) ,
      maxBackoff = Duration(1, SECONDS),
      randomFactor = 0.0,
      maxRestarts = 10
    )(() => {
      buildSource
    })

     implicit val actorSystem: ActorSystem           = ActorSystem()
     implicit val executionContext: ExecutionContext = actorSystem.dispatcher

    restarting.runWith(Sink.foreach(e => println(e())))

  }
}

上面的代码将打印:1,2,3,崩溃为什么我的源代码不重新启动?这几乎是官方文档的1:1副本。
编辑:
我也试过

val rs = RestartSink.withBackoff[() => Int](
      Duration(1, SECONDS),
      Duration(1, SECONDS),
      0.0,
      10
    )(_)
    val rsDone = rs(() => {
      println("???")
      Sink.foreach(e => println(e()))
    })
    restarting.runWith(rsDone)

但仍无法重新启动

h6my8fg2

h6my8fg21#

这是因为当您呼叫从Source发出的函式时,会在Sink.foreach中的buildSourceSource外部触发例外状况。
试试看:

val restarting = RestartSource.onFailuresWithBackoff(
      minBackoff = Duration(1, SECONDS) ,
      maxBackoff = Duration(1, SECONDS),
      randomFactor = 0.0,
      maxRestarts = 10
      )(() => {
        buildSource
         .map(e => e()) //call the functions inside the RestartSource
      })

这样,您的异常将在RestartSource Package 的内部Source中发生,并且重新启动机制将启动。

erhoui1w

erhoui1w2#

源不会重新启动,因为您的源从不出现故障,因此永远不需要重新启动。
Sink.foreach评估它所收到的函式时,会掷回例外状况。
正如artur所指出的,如果可以将失败的位移到源代码中,就可以将所有内容 Package 到RestartSource中的接收器。
虽然这对这个人为的例子没有帮助(因为重新启动一个接收器不会导致重新发送以前发送的消息),但在现实世界中可能发生这种情况的情况下,将接收器 Package 在RestartSink中可能是有用的(我突然想到,由于接收器中的偏移提交失败(例如,在重新平衡之后),来自Kafka的流爆炸应该是这种情况的一个例子)。
另一种选择是,如果您希望在任何部分失败时重新启动整个流,并且流具体化为Future,则可以在失败的future上实现带回退的重试。

yb3bgrhw

yb3bgrhw3#

正如这里所说的,源代码永远不会崩溃。你实际上是在崩溃你的接收器,而不是一个使用e => e()语句的源代码
当将上面的lambda应用于source的最后一个元素时会发生这种情况:

java.lang.RuntimeException: :(((

下面是接收器中没有未处理异常的同一个流:
...重新启动源。带有回退(...

restarting.runWith(
  Sink.foreach(e => {
    def i: Int = try{ e() } catch {
      case t: Throwable =>
        println(t)
        -1
    }
    println(i)
  })
)

效果很好。

相关问题