我用Yarn来做Flink的工作。对于每个flink作业,我都创建一个检查点。
我提交了一个在我的Yarn集群中运行的flink作业。我有一个轮询作业,它检查一个作业是否在yarn上失败并重新启动它。再次提交作业时,yarn将为此flink作业创建一个新的应用程序id。如何配置重新提交的flink作业以使用重新启动的flink作业的检查点。
我已经设定了形态 state.savepoints.dir = hdfs://localhost:9000/checkpoint/
在flink-conf.yaml中创建flink作业时,
streamExecutionEnvironment.setStateBackend(new FsStateBackend("hdfs://localhost:9000/checkpoint/uuid-job-1"));当我做这个设置时,检查点保存在conf文件中指定的路径中(
hdfs://localhost:9000/checkpoint/` )而不是我在创建flink作业时设定的路径。
任何帮助都将不胜感激。谢谢!
1条答案
按热度按时间sxpgvts31#
不幸的是,不能从旧的检查点开始新的工作。您可以使用外部化的检查点。flink<=1.5的一个缺点是,外部化检查点的元数据存储在由config参数设置的所有作业的单个目录中:
state.checkpoints.dir
. 但是你可以在每次提交之前修改它。邮件列表线程的另一个注解:
好消息是,Flink1.5将稍微修改外部化检查点的工作方式:基本上,现在可以将所有检查点视为外部化,元数据将存储在检查点的根目录中,而不是存储在所有作业的一个全局目录中。通过这种方式,外部化检查点的元数据驻留在每个作业的检查点目录中,从该目录进行恢复应该相当简单。