通过两列连接单表flink tableapi

gopyfrb3  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(561)

我有一个包含数据的表,需要通过两个字段进行连接。
我写了一个请求,但不起作用

SELECT * 
FROM Data t1 
JOIN Data t2 ON t1.s = t2.o

代码是

val csvTableSource = CsvTableSource
  .builder
  .path("src/main/resources/data.dat")
  .field("s", Types.STRING)
  .field("p", Types.STRING)
  .field("o", Types.STRING)
  .field("TIMESTAMP", Types.STRING)
  .fieldDelimiter(",")
  .ignoreFirstLine
  .ignoreParseErrors
  .commentPrefix("%")
  .build()
tableEnv.registerTableSource("Data", csvTableSource)

val query = "SELECT * FROM Data t1 JOIN Data t2 ON t1.s = t2.o"
val table = tableEnv.sqlQuery(query)

我得到以下例外

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalJoin(condition=[=($0, $6)], joinType=[inner])
  FlinkLogicalTableSourceScan(table=[[Data]], fields=[s, p, o, TIMESTAMP], source=[CsvTableSource(read fields: s, p, o, TIMESTAMP)])
  FlinkLogicalTableSourceScan(table=[[Data]], fields=[s, p, o, TIMESTAMP], source=[CsvTableSource(read fields: s, p, o, TIMESTAMP)])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
pu82cl6c

pu82cl6c1#

我猜,您正在尝试在流媒体环境中运行此查询。流表上的非窗口联接是用flink1.5.0添加的。
因此,您正在尝试使用flink 1.4.2中尚不支持的功能。
您可以切换到批处理环境,这应该是可能的,因为您正在读取csv文件或升级到flink 1.5.0。

相关问题