各位,我想在streamtable环境中使用flink时间窗口。
我以前对来自Kafka主题的数据流使用过timewindow(time.seconds())函数。对于外部问题,我将此数据流转换为datatable,并使用sqlquery()应用sql查询。
我想用sql进行x次窗口聚合,然后将其发送到另一个kafka主题
数据源:
val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
先前聚合的示例:
val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))
当前数据表:
val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)
在这一部分中,应该有一个查询,每次进行一次聚合
val result = tableEnv.sqlQuery(
s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)
或多或少我的聚合将计数(y)超过(x分区)谢谢!
1条答案
按热度按时间b1zrtrql1#
ververica对flinksql的培训将对您有所帮助。在“使用sql查询动态表”一节中,包括一些练习/示例,这些练习/示例仅涉及此类查询。
您必须为每个事件建立计时信息的源,它可以是处理时间或事件时间,之后查询对应于
stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))
会是这样的:有关如何使用时间属性的详细信息,请参见时间属性简介。
有关使用flinksql进行窗口操作的详细文档,请参阅组窗口上的文档。