flink序列化java.util.list和java.util.map

nimxete2  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(806)

我的flink管道目前使用一个pojo,它包含一些列表和Map(字符串),沿着

public class MyPojo {
    private List<String> myList = new ArrayList<>();
    private OtherPojo otherPojo = new OtherPojo();

    // getters + setters...
}

public class OtherPojo {
    private Map<String, String> myMap = new HashMap<>();

    // getters + setters...
}

出于性能方面的原因,我想绕过kryo序列化,所以我禁用了通用回退 env.getConfig().disableGenericTypes(); 如flink文档所述。
现在,Flink抱怨名单:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    ...

在flink中序列化这些简单列表和Map的首选方法是什么?。在内部,这些都是 ArrayList 以及 HashMap ,但其他实现也可以。好像有课 org.apache.flink.api.common.typeutils.base.ListSerializer 在Flink,但我不知道如何使用它。

wecizke3

wecizke31#

marius已经很好地解释了原因,尽管我不明白为什么flink不支持您的开箱即用的用例。尽管如此,我还是要添加现在有效的解决方案。

// create type info
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class, 
    ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
    ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));

// test it
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");

final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);

DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
assert(myPojo.equals(clone));

注意,测试代码中糟糕的访问模式只是为了快速而肮脏的演示。

agxfikkp

agxfikkp2#

如果您这样做:

env.getConfig().disableGenericTypes();

每当遇到要通过kryo的数据类型时,它就会引发异常。
因此,在这种情况下,您必须编写自己的序列化程序。可以使用 TypeSerializer ,只需呼叫 typeInfo.createSerializer(config) 在typeinformation对象上。
对于泛型类型,您需要通过typehint“捕获”泛型类型信息,对于列表:

TypeInformation<List<Object>> info = TypeInformation.of(new TypeHint<List<Object>>(){});

listtypeinfo类
更多细节在这里。

相关问题