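I am trying to access a key from a Map using Flink's SQL API. It fails with the error: Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY. Please let me know how I can fix it. Here is my event class: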
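import java.util.Map;

// POJO that wraps a single Map field; it is later registered as a table column.
public class EventHolder {
    private Map<String, String> event;

    public Map<String, String> getEvent() {
        return event;
    }

    public void setEvent(Map<String, String> event) {
        this.event = event;
    }
}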
Below is the main class that submits the Flink job:
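import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class MapTableSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());
        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream);
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());
        Table alerts = tableEnv.sql(
            "select event['key'] from mapEvent ");
        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);
        // print every value looked up from the map and drop nulls
        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:" + value);
                return value != null;
            }
        });
        env.execute("map-tablsource-job");
    }

    // builds five EventHolder instances, each holding the map {"key": "value"}
    private static List<EventHolder> getMaps() {
        List<EventHolder> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            EventHolder holder = new EventHolder();
            Map<String, String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}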
When I run it, I get the following exception:
Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)
I am using Flink 1.3.1.
2 Answers
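bf1o4zei #1
I think the problem lies in fromCollection. Because of Java limitations (namely type erasure), Flink cannot extract the required type information, so the Map is treated as a black box with the SQL type ANY. You can check the types of your table with tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection with Types.MAP(Types.STRING, Types.STRING).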
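To illustrate the type-erasure point in plain Java, here is a minimal sketch. It is not Flink code, and ErasureDemo and its two helper methods are hypothetical names introduced only for this example. It shows that two maps declared with different key and value types have the same class at runtime, so their type parameters cannot be read back from the instances themselves.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class (not part of Flink): shows that generic type
// arguments are not carried by map instances at runtime.
public class ErasureDemo {

    // Returns the class name of the instance, which is all an instance carries.
    static String runtimeTypeName(Object value) {
        return value.getClass().getName();
    }

    // True when both objects have exactly the same runtime class.
    static boolean sameRuntimeClass(Object a, Object b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        Map<String, String> strings = new HashMap<>();
        Map<Integer, Long> numbers = new HashMap<>();
        strings.put("key", "value");
        numbers.put(1, 2L);

        // Both lines print java.util.HashMap: String/String and Integer/Long are gone.
        System.out.println(runtimeTypeName(strings));
        System.out.println(runtimeTypeName(numbers));
        // Prints true: the two maps are indistinguishable by class.
        System.out.println(sameRuntimeClass(strings, numbers));
    }
}
```

This is presumably why the key and value types have to be supplied explicitly, as suggested with Types.MAP(Types.STRING, Types.STRING), rather than being inferred from the data.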
ffvjumwh #2
I solved a similar problem with the following approach: