如何在ApacheFlink的streamtableenvironment中实现timewindow()?

xnifntxz  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(393)

各位,我想在streamtable环境中使用flink时间窗口。
我以前对来自Kafka主题的数据流使用过timewindow(time.seconds())函数。对于外部问题,我将此数据流转换为datatable,并使用sqlquery()应用sql查询。
我想用sql进行x次窗口聚合,然后将其发送到另一个kafka主题
数据源:

val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

先前聚合的示例:

val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))

当前数据表:

val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)

在这一部分中,应该有一个查询,每次进行一次聚合

val result = tableEnv.sqlQuery(
          s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)

或多或少我的聚合将计数(y)超过(x分区)谢谢!

b1zrtrql

b1zrtrql1#

ververica对flinksql的培训将对您有所帮助。在“使用sql查询动态表”一节中,包括一些练习/示例,这些练习/示例仅涉及此类查询。
您必须为每个事件建立计时信息的源,它可以是处理时间或事件时间,之后查询对应于 stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5)) 会是这样的:

SELECT
  x,
  TUMBLE_END(timestamp, INTERVAL '5' SECOND) AS t,
  COUNT(*) AS cnt
FROM Events
GROUP BY
  x, TUMBLE(timestamp, INTERVAL '5' SECOND);

有关如何使用时间属性的详细信息,请参见时间属性简介。
有关使用flinksql进行窗口操作的详细文档,请参阅组窗口上的文档。

相关问题