我的flink管道目前使用一个pojo,它包含一些列表和Map(字符串),沿着
public class MyPojo {
private List<String> myList = new ArrayList<>();
private OtherPojo otherPojo = new OtherPojo();
// getters + setters...
}
public class OtherPojo {
private Map<String, String> myMap = new HashMap<>();
// getters + setters...
}
出于性能方面的原因,我想绕过kryo序列化,所以我禁用了通用回退 env.getConfig().disableGenericTypes();
如flink文档所述。
现在,Flink抱怨名单:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
...
在flink中序列化这些简单列表和Map的首选方法是什么?。在内部,这些都是 ArrayList
以及 HashMap
,但其他实现也可以。好像有课 org.apache.flink.api.common.typeutils.base.ListSerializer
在Flink,但我不知道如何使用它。
2条答案
按热度按时间wecizke31#
marius已经很好地解释了原因,尽管我不明白为什么flink不支持您的开箱即用的用例。尽管如此,我还是要添加现在有效的解决方案。
注意,测试代码中糟糕的访问模式只是为了快速而肮脏的演示。
agxfikkp2#
如果您这样做:
每当遇到要通过kryo的数据类型时,它就会引发异常。
因此,在这种情况下,您必须编写自己的序列化程序。可以使用
TypeSerializer
,只需呼叫typeInfo.createSerializer(config)
在typeinformation对象上。对于泛型类型,您需要通过typehint“捕获”泛型类型信息,对于列表:
listtypeinfo类
更多细节在这里。