flinksql单元测试:如何分配水印?

7vhp5slm  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(459)

我正在为使用match\u recognize的flinksql语句编写一个单元测试。我是这样设置测试数据的

Table data = tEnv.fromValues(DataTypes.ROW(
  DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
  DataTypes.FIELD("foobar", DataTypes.STRING()),
  ....
  ),
  row(...),
  row(...)
);

我有两个问题,
如何指定事件\时间作为水印字段(表示行时间)
不太重要的是,给创建的表起一个有意义的名字?
flink版本:1.11

9avjhtql

9avjhtql1#

您遇到了表api的一个当前限制:无法将水印和行时属性与 forValues 方法;你需要一个连接器。有两种解决方法:
1.使用 csv 你和你的 VALUES ,如本例所示。
2.使用内置的datagen连接器。既然您正在为cep准备一个单元测试,我想您需要对生成的数据进行某种程度的控制,所以这可能不是一个可行的选择。我想我还是提一下。
注意:建议使用SQLDDL语法从Flink1.10创建表。这将使您要做的两件事(即定义水印和命名表)更加简单:

tEnv.executeSql("CREATE TABLE table_name (\n" +
                "             event_time TIMESTAMP(3),\n" +
                "             foobar STRING \n" +
                "             WATERMARK FOR event_time AS event_time\n" +
                ") WITH (...)"
);

Table data = tEnv.from("table_name");

水印声明为计算列,并且可以选择使用多种水印策略。有关详细信息,请查看此文档页。

相关问题