我正在尝试进行Flink State Schema Evolution的POC。我使用的是Flink 1.15.0和Java 11。我尝试创建3个数据类-每个数据类对应一个序列化类型:
io.peleg.kryo.User
-使用java.time.Instant
类,我知道Flink中的POJO序列化不支持该类。io.peleg.pojo.User
-只使用经典的 Package 原语-Integer
,Long
,String
。getter,setter和constructor使用Lombok生成。io.peleg.avro.User
-使用Avro Maven插件从Avro架构生成。
对于每个类,我都编写了一个流作业,它使用时间窗口来缓冲元素并将它们转换为列表。
对于每一个类,我尝试做以下事情:
1.运行作业
1.停止保存点
1.向数据类添加字段
1.使用保存点提交
对于所有数据类,使用保存点提交失败,但出现以下异常:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143)
at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74)
at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
Serialization trace:
favoriteColor (io.peleg.avro.User)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169)
... 20 more
Caused by: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
at java.base/java.util.Objects.checkIndex(Unknown Source)
at java.base/java.util.ArrayList.get(Unknown Source)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 31 more
由于Flink不支持Kryo序列化类的状态模式演化,我预计io.peleg.kryo.User
会抛出此异常。
但在我看来,对于所有类,它最终都使用了Kryo序列化器,而不是POJO或Avro序列化器。
我在使用docker compose创建的Flink集群上运行了这个任务:
version: "2.2"
services:
jobmanager:
image: flink:latest
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:latest
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
我的全部代码在GitHub here上是公开的。
我希望实现的是成功地从具有更少/更多字段的旧版本POJO的保存点运行作业。
1条答案
按热度按时间6rqinv9w1#
我用这个POJO类做了一个实验:
下面的图片来自IntelliJ调试器,正如你所看到的,Flink提供了一个
InstantSerializer
,并且正在使用它的POJOSerializer
作为这个Event
类。我不知道你哪里做错了,但你可以用
关闭Kyro后备系统这对实验会有帮助。