我有一个简单的管道
env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
.keyBy(_.id)
.process(new Processor).uid("processor")
.addSink(kafkaProducer).name(producerName)
现在我试着简单地添加 uid
像这样沉下去
env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
.keyBy(_.id)
.process(new Processor).uid("processor")
.addSink(kafkaProducer).name(producerName).uid("kafka-sink")
但我得到了一个很长的例外,似乎这是信息的一部分:
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/tmp/rocksdb/savepoint-445173-011657873d74. Cannot map checkpoint/savepoint state for operator 3cfeb06db0484d5556a7de8db2025f09 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)
at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1175)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:299)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:83)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:37)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
这有道理吗?有没有办法在不失去保存点的情况下解决这个问题?
1条答案
按热度按时间plupiseo1#
这个问题很有意义,因为如果不手动指定ID,它们将自动生成。可能生成的id是
3cfeb06db0484d5556a7de8db2025f09
.您有三个选择:
在没有保存点的情况下启动作业:这将导致数据丢失;
运行允许非还原状态的作业;
使用
3cfeb06db0484d5556a7de8db2025f09
作为uid
你的接线员。以下是一些可以帮助您的链接:
分配操作员ID
允许非还原状态
如果我从我的作业中删除一个具有状态的操作符,会发生什么?