My unit tests run fine on Flink 1.11.2 with parquet-avro 1.10.0, but as soon as I upgrade to Flink 1.12.0 and parquet-avro 1.12.0, they throw:
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
...
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) ~[?:1.8.0_282]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
... 27 more
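The bottom of the trace points at the root cause: Kryo's CollectionSerializer rebuilds a collection field by calling add() on it, but the `reserved` field of Avro's Schema$Field is an unmodifiable collection, so add() throws. A hypothetical, JDK-only sketch of that failure mode (the class and method names here are illustrative, not from Kryo or Avro):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;

public class KryoFailureDemo {
    // Mimics what Kryo's CollectionSerializer.read() effectively does when it
    // deserializes an unmodifiable collection field: it calls add() to refill
    // the collection, which an unmodifiable wrapper rejects.
    public static boolean addThrows() {
        Collection<String> reserved =
                Collections.unmodifiableSet(new HashSet<>(Arrays.asList("name", "doc")));
        try {
            reserved.add("aliases"); // throws UnsupportedOperationException
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(addThrows());
    }
}
```

This is why falling back to Kryo for GenericRecord (instead of using Avro's own serializer) breaks: Kryo walks Avro's internal fields generically and trips over them.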
My unit test snippet looks like this:
private ImmutableList<PoJo> testData = ImmutableList.of(
        PoJo.build("123", "0.0.0.0", null),
        PoJo.build("123", "0.0.0.1", 2L)
);

DataStream<PoJo> input = env
        .addSource(new TestSource(testData), PojoTypeInfo.of(PoJo.class))
        .assignTimestampsAndWatermarks(watermarkStrategy);

DataStream<GenericRecord> output = input
        .map(TestClass::convertPoJoToGenericRecord)
        .returns(new GenericRecordAvroTypeInfo(PoJo.getAvroSchema()));

output.addSink();
The conversion function looks like:
GenericRecord convertPoJoToGenericRecord(PoJo pojo) throws Exception {
    Schema schema = PoJo.getAvroSchema();
    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
    for (Schema.Field field : schema.getFields()) {
        builder.set(field.name(), TestClass.getObjectField(field, pojo));
    }
    return builder.build();
}
Can anyone help?
Thanks
1 Answer
yquaqz181:
This mailing-list thread sounds like a valid answer and may help:
https://lists.apache.org/thread/8f21loz5915dzw8cy2q8c08kxypvj1sq
I would suggest serializing GenericRecords with the AvroSerializer. You have to add org.apache.flink:flink-avro as a dependency to your job, and then tell the system that you want to use the GenericRecordAvroTypeInfo via:
DataStream<GenericRecord> sourceStream = env
        .addSource(new AvroGenericSource())
        .returns(new GenericRecordAvroTypeInfo(schema));
You can find more information here [1].
[1]https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
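For completeness, the dependency mentioned above can be added like this (a sketch assuming a Maven build; the version 1.12.0 matches the Flink upgrade described in the question and should track your cluster's Flink version):

```xml
<!-- Brings Flink's AvroSerializer and GenericRecordAvroTypeInfo onto the job classpath -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.12.0</version>
</dependency>
```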