java在flinksqlwindows中使用事件时间延迟

im9ewurl  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(488)

我有一个数据流,通常是无序的。我将数据集定义为:

DataStream<ApplicationMetric> metrics = env
  .addSource(new FlinkKinesisConsumer<>("applicationMetric", new SimpleStringSchema(), consumerConfig))
  .map(mapper)
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(Time.seconds(30)));

并打印为:

Table table = bsTableEnv.fromDataStream(dataset, "createdAt, name, duration, rowtime.rowtime");
Table t1 = bsTableEnv.sqlQuery("SELECT CAST((createdAt/1000) as TIMESTAMP) as ts, rowtime, name, duration " + table);
bsTableEnv.toAppendStream(t1, Row.class).print();

那么,如果我尝试在带有时间窗口(翻滚等)的sql查询中使用它,这是如何工作的呢?我想我对这些概念的理解可能是根本错误的。但是我假设延迟事件将被删除,并且我可以使用我的rowtime作为滚动窗口属性?

e7arh2l6

e7arh2l61#

您可以控制在datastream api中处理延迟事件的方式。默认情况下,所有大于30秒的事件都会被删除到代码中。这30秒会增加端到端的延迟,所以输出会被延迟,直到flink看到了30秒的数据并考虑了所有的延迟事件。您还可以将延迟事件存储在单独的输出或重新触发计算中。
不幸的是,表api/sql不支持这种配置。延迟事件总是会被删除,但对于您的用例来说,这似乎已经足够了。

相关问题