我正在为使用match\u recognize的flinksql语句编写一个单元测试。我是这样设置测试数据的
Table data = tEnv.fromValues(DataTypes.ROW(
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("foobar", DataTypes.STRING()),
....
),
row(...),
row(...)
);
我有两个问题,
如何指定事件\时间作为水印字段(表示行时间)
不太重要的是,给创建的表起一个有意义的名字?
flink版本:1.11
1条答案
按热度按时间9avjhtql1#
您遇到了表api的一个当前限制:无法将水印和行时属性与
forValues
方法;你需要一个连接器。有两种解决方法:1.使用
csv
你和你的VALUES
,如本例所示。2.使用内置的datagen连接器。既然您正在为cep准备一个单元测试,我想您需要对生成的数据进行某种程度的控制,所以这可能不是一个可行的选择。我想我还是提一下。
注意:建议使用SQLDDL语法从Flink1.10创建表。这将使您要做的两件事(即定义水印和命名表)更加简单:
水印声明为计算列,并且可以选择使用多种水印策略。有关详细信息,请查看此文档页。