在apache flink中定义和执行流处理器的作业图之前,我想运行一些初始化代码,例如,用于创建kafka主题,我将其用作作业图中的接收器。但是,当流处理器从保存点还原时(例如,在流处理器更新期间),不应运行此初始化代码。有没有办法通过编程检查作业是否从保存点启动?
flmtquvp1#
可以实现知道快照和恢复的flink函数。通过实现 CheckpointedFunction 接口。那什么时候 initializeState(FunctionInitializationContext context) 叫做,你可以检查一下 context.isRestored() 确定是否从快照(即从检查点或保存点)重新启动作业。你可以采取的另一种方法是检查主题是否已经存在,如果不存在,继续创建它们而不考虑作业是如何开始的。
CheckpointedFunction
initializeState(FunctionInitializationContext context)
context.isRestored()
wlzqhblo2#
从保存点重新启动作业时,必须指定保存点目录的路径。我们采用以下方法:
$ bin/flink run -s :savepointPath [:runArgs]
如果我正确理解你的问题,你所要做的就是核实一下 --fromSavepoint 或者 -s (别名)已指定。https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#resuming-从保存点
--fromSavepoint
-s
2条答案
按热度按时间flmtquvp1#
可以实现知道快照和恢复的flink函数。通过实现
CheckpointedFunction
接口。那什么时候initializeState(FunctionInitializationContext context)
叫做,你可以检查一下context.isRestored()
确定是否从快照(即从检查点或保存点)重新启动作业。你可以采取的另一种方法是检查主题是否已经存在,如果不存在,继续创建它们而不考虑作业是如何开始的。
wlzqhblo2#
从保存点重新启动作业时,必须指定保存点目录的路径。
我们采用以下方法:
如果我正确理解你的问题,你所要做的就是核实一下
--fromSavepoint
或者-s
(别名)已指定。https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#resuming-从保存点