使用TUMBLE的Flink窗口聚合在TIMESTAMP时失败

vfh0ocws  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(821)

我们在数据库中有一个表A,我们正在使用Flink SQL JdbcCatalog将该表加载到flink中。
下面是我们如何加载数据
val catalog = new JdbcCatalog("my_catalog", "database_name", username, password, url)
streamTableEnvironment.registerCatalog("my_catalog", catalog)streamTableEnvironment.useCatalog("my_catalog")
val query = "select timestamp, count from A"
val sourceTable = streamTableEnvironment.sqlQuery(query) streamTableEnvironment.createTemporaryView("innerTable", sourceTable)
val aggregationQuery = select window_end, sum(count) from TABLE(TUMBLE(TABLE innerTable, DESCRIPTOR(timestamp), INTERVAL '10' minutes)) group by window_end
它抛出以下错误Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval[, datetime interval]) requires the timecol is a time attribute type, but is TIMESTAMP(6).
简而言之,我们希望对已经存在的列应用窗口聚合。

h7appiyu

h7appiyu1#

在Flink SQL中用作时间属性的时间戳列必须是TIMESTAMP(3)或TIMESTAMP_LTZ(3)。

o2gm4chl

o2gm4chl2#

列应为TIMESTAMP(3)或TIMESTAMP_LTZ(3),但该列还应标记为ROWTIME。
在代码中键入此行

sourceTable.printSchema();

并检查结果。该列应标记为ROWTIME,如下所示。

(
  `deviceId` STRING,
  `dataStart` BIGINT,
  `recordCount` INT,
  `time_Insert` BIGINT,
  `time_Insert_ts` TIMESTAMP(3) *ROWTIME*
)

你可以在下面找到我的样品。

Table tableCpuDataCalculatedTemp = tableEnv.fromDataStream(streamCPUDataCalculated, Schema.newBuilder()
                        .column("deviceId", DataTypes.STRING())
                        .column("dataStart", DataTypes.BIGINT())
                        .column("recordCount", DataTypes.INT())
                        .column("time_Insert", DataTypes.BIGINT())
                        .column("time_Insert_ts", DataTypes.TIMESTAMP(3))
                        .watermark("time_Insert_ts", "time_Insert_ts")
                        .build());

水印方法使其成为ROWTIME

相关问题