flink表api&sql和Map类型(scala)

nbysray5  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(419)

我在流媒体环境中使用flink的表api和/或flink的sql支持(flink1.3.1、scala 2.11)。我从一个 DataStream[Person] ,和 Person 是一个case类,如下所示:

Person(name: String, age: Int, attributes: Map[String, String])

一切正常,直到我开始带来 attributes 进入画面。
例如:

val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)

... 导致:
线程“main”org.apache.flink.table.api.tableexception中的异常:不支持类型:org.apache.flink.table.api.tableexception$.apply中的任何异常。scala:53)在org.apache.flink.table.calcite.flinktypefactory$.totypeinfo(flinktypefactory)。scala:341)在org.apache.flink.table.plan.logical.logicalrelnode$$anonfun$12.apply(运算符。scala:531)在org.apache.flink.table.plan.logical.logicalrelnode$$anonfun$12.apply(operators。scala:530)在scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike。scala:234)在scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike。scala:234)在scala.collection.iterator$class.foreach(iterator。scala:893)在scala.collection.abstractiterator.foreach(迭代器。scala:1336)在scala.collection.iterablelike$class.foreach(iterablelike。scala:72)在scala.collection.abstractiterable.foreach(iterable。scala:54)在scala.collection.traversablelike$class.map(traversablelike。scala:234)在scala.collection.abstracttraversable.map(可遍历。scala:104)在org.apache.flink.table.plan.logical.logicalrelnode.(operators。scala:530)在org.apache.flink.table.api.tableenvironment.sql(tableenvironment。scala:503)在com.nordstrom.mdt.job$.main(job。scala:112)在com.nordstrom.mdt.job.main(job.scala)
注意:无论是否存在特定的Map键,都会发生此错误。还要注意的是,如果我没有指定一个Map键,我会得到一个不同的错误,这是有意义的;这种情况在这里不起作用。
这份公关似乎在说有一条前进的道路:https://github.com/apache/flink/pull/3767. 具体地看一下测试用例,它表明类型信息对于数据集是可能的。没有相关的方法 fromDataStream 以及 registerDataStream 提供提供类型信息的方法。
这可能吗?换句话说,流上的flinksql是否支持Map?
正在澄清编辑。。。省略Map键时( GROUP BY ... attributes 而不是 attributes['foo'] ),我得到下面的错误。这表示运行时确实知道这些是字符串。
此类型(接口scala.collection.immutable.map[scala.tuple2(\u 1:string,\u 2:string)])不能用作键。

w41d8nur

w41d8nur1#

目前,flinksql只支持java java.util.Map . scalaMap被视为带有flink的黑盒 GenericTypeInfo /sql语句 ANY 数据类型。因此,您可以转发这些黑匣子,并在标量函数中使用它们,但使用 ['key'] 不支持运算符。
因此,要么使用javaMap,要么自己在udf中实现访问操作。
我为您的问题创建了一个问题:https://issues.apache.org/jira/browse/flink-7360

相关问题