Flink SQL
,如何在中按事件时间获取第一条记录和最后一条记录 Kafka
数据流并将其存储到数据库(例如 MySQL
)?
另外,如果在 Kafka
数据流来了,我们应该更新中的记录 MySQL
.
假设,记录在 Kafka
具体如下:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
{'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
{'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00''appear_page': 18}
由 Flink SQL
,我预期的结果如下:
{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
{'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
{'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
{'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}
我们把这些记录存储在 MySQL
,假设结果如下,
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 16:10:00 | 18 |
如果在 Kafka
就要来了,
{'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}
我希望我们能更新 are
在 MySQL
,更新结果如下:
| word | first_appearance_time | first_appearance_page | last_appearance_time | last_appearance_page |
| hello | 2020-12-04 16:00:00 | 5 | 2020-12-04 16:15:00 | 20 |
| are | 2020-12-04 16:00:00 | 12 | 2020-12-04 17:18:00 | 30 |
我在第二步和第五步遇到了一些麻烦,有人能给我一些建议吗?
2条答案
按热度按时间ibrsph3r1#
当我在flinksql中运行这个时,我得到一个表异常-
[错误]无法执行sql语句。原因:org.apache.flink.table.api.tableexception:只能以升序模式对窗口排序。
我们不能在flink 1.11.3中的over()中使用desc吗?
谢谢,
kulphzqa2#
按行时排序的重复数据消除是最简单的方法,但1.12支持这种方法。https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication
此查询也应在1.11中使用,但没有优化为重复数据消除,而是优化为效率较低的topn运算符。