我从一个未绑定的源(Kafka)和写作它的字数到其他Kafka主题。现在我想在beam管道中执行checkpoint。我已经按照apachebeam文档中的所有说明进行了操作,但即使在这之后也没有创建checkpoint目录。
下面是我用于pipeline:-
--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
有人能帮我检查一下吗?
1条答案
按热度按时间3j86kqsm1#
我已经研究过这个解决方案,所以一个是您可以在link cluster的flink-conf.yaml中更改checkpoint.state.dir路径,另一个是使用flinkpipelineoptions-
通过设置setstatebackendfactory(我已经使用自定义类)
这将创建一个checkpointdir。您还需要设置checkpointinginterval的值,以便启用检查点。