假设我有一个下面的模型
public class Model {
private int ts;
private String dataField1;
private String dataField2;
private String dataField3;
// possibly many more, they might change in the future
}
我有一些flink代码可以解析 DataStream[String]
进入 DataStream[Model]
. 现在,我想使用 FlinkSQL
. 假设我想有这样的疑问
SELECT COUNT(DISTINCT dataField1) FROM [tableName]
GROUP BY dataField2, TUMBLE(ts, INTERVAL '15' MINUTE)
当然,这不是Flink所要求的 ts
成为 TIMESTAMP
目前它是一个 BIGINT
. 因此,我们必须以某种方式指示flink在这里使用事件时间(水印提取器已经是现有系统的一部分) DataStream
). 我想到了以下几点
Table inputTable = tableEnv.fromDataStream(inputStream, "ts.rowtime, dataField1, dataField2, ..."
这是正常工作,但我真的不想在这里迭代每个pojo字段只是为了将timestamp标记为 rowtime
. 我认为有可能添加另一个步骤+引入一个新的pojo来转换我的 Model
到
public class TableModel {
private Timestamp ts;
private String dataField1;
private String dataField2;
private String dataField3;
// ...
}
但我也不喜欢这样。。。有没有更干净的转换方法 BIGINT
进入 TIMESTAMP
没有这样的样板?
暂无答案!
目前还没有任何答案,快来回答吧!