从1.10升级到1.11时,无法从保存点还原flink键控状态后端

wz3gfoph  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(688)

我们尝试迁移到Flink1.11,从1.10中的保存点恢复作业。作业代码没有更改,只是将flink版本的依赖项更新为1.11(在sbt中,我们使用scala)并重新构建jar。所有操作符都有UID,并且作业正确地从该保存点恢复如果在1.10群集上运行,则会出现以下异常,并且没有任何线索:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for CoStreamFlatMap_8a6da66867c6cf8469bae55e9f834297_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:540)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.runtime.checkpoint.savepoint.Savepoint'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
    at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
    at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
    at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)

有人能帮忙吗?
谢谢
更新
保存点来自使用stateprocessorapi处理的保存点和 KeyedStateBootstrapFunction 由以下材料制成:

var mapToDetector: MapState[String, Map[String, Detector]] = null
  var detectorsConfigs: MapState[String,AnomalyStepConfiguration] = null
  var outputTopic : ValueState[String]= null
  var pipeStatus: MapState[String, String] = null
  var debounceMap: MapState[String, Map[String, DebounceStats]] = null
csbfibhn

csbfibhn1#

org.apache.flink.runtime.checkpoint.savepoint.Savepoint 在flink-16247中改名。但是,此类用于保存点元数据,不应存在于任务端的键控状态序列化程序中。换句话说,您是否在状态访问中使用了与任务端的检查点或保存点相关的内容?
我还尝试使用statemachineexample在flink-1.10.2中创建保存点,它在flink-1.11.1集群中成功地恢复。程序还使用了 CopyOnWriteStateTable 默认情况下,这是您在异常堆栈跟踪中看到的。

相关问题