如何避免“遇到未注册类id”异常?

kxeu7u2r  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(308)

我正在使用 SessionItem 中存储聚合状态的类 fold 函数并将其传递给 sink 功能:

.fold(SessionItem.empty(), new CalculateSessionMetrics())
.addSink(new AsyncSendSessionSink())

当我将几个字段添加到此类时:

...

private List<AdSessionItem<Long>> adPHashSessionItems = new ArrayList<>();
private List<AdSessionItem<String>> adDtmfCodeSessionItems = new ArrayList<>();

...

public List<AdSessionItem<Long>> getAdPHashSessionItems() {
    return adPHashSessionItems;
}

public List<AdSessionItem<String>> getAdDtmfCodeSessionItems() {
    return adDtmfCodeSessionItems;
}

public AdSessionItem<Long> lastAdPHashSessionItem() {
    return adPHashSessionItems.size() > 0 ? adPHashSessionItems.get(adPHashSessionItems.size() - 1) : null;
}

public AdSessionItem<String> lastAdDtmfCodeSessionItem() {
    return adDtmfCodeSessionItems.size() > 0 ? adDtmfCodeSessionItems.get(adDtmfCodeSessionItems.size() - 1) : null;
}

...

我得到以下例外:

java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:522)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to deserialize default value.
    at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:300)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at java.util.HashMap.readObject(HashMap.java:1396)
    at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:517)
    ... 1 more
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -3
Serialization trace:
adDtmfCodeSessionItems (streamer_sessions_aggregator.elements.SessionItem)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:297)
    ... 36 more

似乎flink不能用新字段反序列化对象的状态。
问题:
如何正确添加新字段以避免此类异常?
(可选)如何编写单元测试来检测此类问题?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题