各位,我有一个Kafka主题的来源,我分组一分钟的窗口。我想在那个窗口中做的是用window函数创建新的列,比如我想使用的sql
(除以)的和
计数(用户)超过(分区)
上的行数()
我可以为这些操作使用数据流函数吗?或者
如何操作kafka数据将其转换为datatable并使用sqlquery?
目的地是Kafka的另一个主题。
val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
我试过这么做
val tableA = tableEnv.fromDataStream(stream, 'user, 'product, 'amount)
但是我得到了下面的错误
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
试验数据
1,"beer",3
1,"beer",1
2,"beer",3
3,"diaper",4
4,"diaper",1
5,"diaper",5
6,"rubber",2
查询示例
SELECT
user, product, amount,
COUNT(user) OVER(PARTITION BY product) AS count_product
FROM table;
预期业绩
1,"beer",3,3
1,"beer",1,3
2,"beer",3,3
3,"diaper",4,3
4,"diaper",1,3
5,"diaper",5,3
6,"rubber",2,1
1条答案
按热度按时间anhgbhbe1#
您需要将字符串解析为字段,然后重命名它们。
我不确定如何在flinksql中实现所需的窗口函数。或者,它可以用简单的flink实现,如下所示: