Flink中使用Map值

e5nqia27  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(168)

I have a question about using MapValue in Flink because I need to save a map as part of the state, as you know the state needs to be deserializable/serializable, so i extend my class from MapValue because MapValue is an abstract class.

public class HashMapValue<K extends Value, V extends Value> extends MapValue<K, V> {
    public HashMapValue() {
        super();
    }
}

When i try to use this class, i can’t initiate the class

HashMapValue<IntValue, BooleanValue> hashMapValue = new HashMapValue<IntValue, BooleanValue>();

the error message is as follows:

java.lang.AssertionError
    at org.apache.flink.util.ReflectionUtil.getTemplateTypes(ReflectionUtil.java:141)
    at org.apache.flink.util.ReflectionUtil.getSuperTemplateTypes(ReflectionUtil.java:98)
    at org.apache.flink.util.ReflectionUtil.getTemplateType(ReflectionUtil.java:44)
    at org.apache.flink.util.ReflectionUtil.getTemplateType1(ReflectionUtil.java:54)
    at org.apache.flink.types.MapValue.<init>(MapValue.java:55)

I step into the code and apparently in getTemplateTypes function, it gets K as the parameter instead of IntValue, which I clearly passed in as generic type.
I can’t find online or in the flink source code anything related to MapValue as example. Can anyone help me by pointing out how to use MapValue in Flink?

6yt4nkrj

6yt4nkrj1#

Flink的MapState就是为了这个目的而设计的,参见文档中教程中的https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/learn-flink/event_driven/示例。

jtjikinw

jtjikinw2#

  • MapValue* 不用于此目的,Flink状态本身支持Java Map,所以我们可以直接使用类似 HashMap 的值作为状态值。

相关问题