我使用的是flink1.9和restapi /jobs/:jobid/savepoints
触发保存点并取消作业(优雅地停止作业以便稍后从保存点运行)。
我在源代码中使用了一个两阶段提交函数,所以我的源代码实现了这两个功能 CheckpointedFunction
以及 CheckpointListener
接口。在 snapshotState()
方法调用我快照内部状态并打开 notifyCheckpointComplete()
我检查点状态到第三方系统。
从源代码中可以看到,只有 snapshotState()
部件同步于 CheckpointCoordinator
-
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
检查点确认和完成通知在中是异步的 AsyncCheckpointRunnable
.
也就是说,当 savepoint
与 cancel-job
设置为 true
被触发时,在拍摄快照之后,某些任务管理器会在取消和执行作业之前保持接收完成通知 notifyCheckpointComplete()
,有些则不然。
问题是是否有一种方法可以用savepoint取消作业,这样 notifyCheckpointComplete()
是否保证在取消作业之前由所有任务管理器调用,或者目前无法实现这一点?
2条答案
按热度按时间n3ipq98p1#
我已经有一段时间没有看Flink1.9了,所以请谨慎地接受我的回答。
我猜你的消息来源取消得太早了。所以呢
notifyCheckpointComplete
实际上发送到所有任务,但是SourceFunction
他已经辞职了run
并对各自的任务进行了清理。好吧,如果你在收到最后一封信之前忽略取消和中断,你所描述的应该是可能的
notifyCheckpointComplete
.oknrviil2#
使用stop with savepoint[1][2]难道不能解决问题吗?
[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-作业ID停止[2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html