Does Flink SQL support the Java Map type?

nlejzf6q  posted on 2021-06-25 in Flink

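I am trying to access a key from a Map using Flink's SQL API. It fails with the error Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY. Please let me know how I can fix it. Here is my event class: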

public class EventHolder {

    private Map<String, String> event;

    public Map<String, String> getEvent() {
        return event;
    }

    public void setEvent(Map<String, String> event) {
        this.event = event;
    }
}

Here is the main class that submits the Flink job:

public class MapTableSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream); 
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql(
                "select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);

        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:"+value);
                return value!=null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps(){
        List<EventHolder> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            EventHolder holder = new EventHolder();
            Map<String,String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

When I run it, I get the following exception:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

I am using Flink 1.3.1.

bf1o4zei  #1

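I think the problem lies in fromCollection. Because of Java limitations (namely type erasure), Flink cannot extract the required type information, so the Map is treated as a black box with the SQL type ANY. You can check the types of your table with tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection using Types.MAP(Types.STRING, Types.STRING).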
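To illustrate the type erasure this answer mentions, here is a minimal plain-Java sketch (no Flink involved). The class name ErasureDemo and its methods are made up for this illustration. It only shows that a map instance carries no key or value type at runtime, which is one reason a framework may need the type information handed to it explicitly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Flink or of the question's code.
public class ErasureDemo {

    // Two maps declared with different type parameters share one runtime class.
    static boolean sameRuntimeClass() {
        Map<String, String> strings = new HashMap<>();
        Map<Integer, Long> numbers = new HashMap<>();
        return strings.getClass() == numbers.getClass();
    }

    // The runtime class name says nothing about the key or value types.
    static String runtimeClassName() {
        Map<String, String> strings = new HashMap<>();
        return strings.getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());   // prints "true"
        System.out.println(runtimeClassName());   // prints "java.util.HashMap"
    }
}
```

Since the String/String parameters are not visible on the instances themselves, passing explicit type information to fromCollection, as suggested above, is how the missing key and value types would be supplied.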

ffvjumwh  #2

I solved a similar problem with the following approach:

//Should probably make MapVal more generic, but works for this example
public class MapVal extends ScalarFunction {
    public String eval(Map<String, String> obj, String key) {
        return obj.get(key);
    }
}

public class Car {
    private String make;
    private String model;
    private int year;
    private Map<String, String> attributes;
    //getters/setters...
}

//After registering Stream and TableEnv etc

tableEnv.registerFunction("mapval", new MapVal());

Table cars = tableEnv
                .scan("Cars")
                .select("make, model, year, attributes.mapval('name')");
