object TestSource {
implicit val ec = ExecutionContext.global
def main(args: Array[String]): Unit = {
def buildSource = {
println("fresh")
Source(List(() => 1,() => 2,() => 3,() => {
println("crash")
throw new RuntimeException(":(((")
}))
}
val restarting = RestartSource.onFailuresWithBackoff(
minBackoff = Duration(1, SECONDS) ,
maxBackoff = Duration(1, SECONDS),
randomFactor = 0.0,
maxRestarts = 10
)(() => {
buildSource
})
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val executionContext: ExecutionContext = actorSystem.dispatcher
restarting.runWith(Sink.foreach(e => println(e())))
}
}
上面的代码将打印:1,2,3,崩溃为什么我的源代码不重新启动?这几乎是官方文档的1:1副本。
编辑:
我也试过
val rs = RestartSink.withBackoff[() => Int](
Duration(1, SECONDS),
Duration(1, SECONDS),
0.0,
10
)(_)
val rsDone = rs(() => {
println("???")
Sink.foreach(e => println(e()))
})
restarting.runWith(rsDone)
但仍无法重新启动
3条答案
按热度按时间h6my8fg21#
这是因为当您呼叫从
Source
发出的函式时,会在Sink.foreach
中的buildSource
Source
外部触发例外状况。试试看:
这样,您的异常将在
RestartSource
Package 的内部Source
中发生,并且重新启动机制将启动。erhoui1w2#
源不会重新启动,因为您的源从不出现故障,因此永远不需要重新启动。
当
Sink.foreach
评估它所收到的函式时,会掷回例外状况。正如artur所指出的,如果可以将失败的位移到源代码中,就可以将所有内容 Package 到
RestartSource
中的接收器。虽然这对这个人为的例子没有帮助(因为重新启动一个接收器不会导致重新发送以前发送的消息),但在现实世界中可能发生这种情况的情况下,将接收器 Package 在
RestartSink
中可能是有用的(我突然想到,由于接收器中的偏移提交失败(例如,在重新平衡之后),来自Kafka的流爆炸应该是这种情况的一个例子)。另一种选择是,如果您希望在任何部分失败时重新启动整个流,并且流具体化为
Future
,则可以在失败的future上实现带回退的重试。yb3bgrhw3#
正如这里所说的,源代码永远不会崩溃。你实际上是在崩溃你的接收器,而不是一个使用
e => e()
语句的源代码当将上面的lambda应用于source的最后一个元素时会发生这种情况:
下面是接收器中没有未处理异常的同一个流:
...重新启动源。带有回退(...
效果很好。