flinksql,如何在kafka数据流中通过eventtime获取第一条记录和最后一条记录,并将其存储到数据库(如gp、mysql)中?

mmvthczy  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(675)

Flink SQL ,如何在中按事件时间获取第一条记录和最后一条记录 Kafka 数据流并将其存储到数据库(例如 MySQL )?
另外,如果在 Kafka 数据流来了,我们应该更新中的记录 MySQL .
假设,记录在 Kafka 具体如下:

{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 10}
    {'word': 'hello', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 15}
    {'word': 'are', 'eventtime': '2020-12-04 16:05:00', 'appear_page': 15}      
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00''appear_page': 18}

Flink SQL ,我预期的结果如下:

{'word': 'hello', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 5}
    {'word': 'hello', 'eventtime': '2020-12-04 16:15:00', 'appear_page': 20}
    {'word': 'are', 'eventtime': '2020-12-04 16:00:00', 'appear_page': 12}
    {'word': 'are', 'eventtime': '2020-12-04 16:10:00', 'appear_page': 18}

我们把这些记录存储在 MySQL ,假设结果如下,

|    word    |    first_appearance_time    |    first_appearance_page    |    last_appearance_time    |    last_appearance_page    |
    |    hello   |    2020-12-04 16:00:00      |            5                |    2020-12-04 16:15:00     |             20             |
    |    are     |    2020-12-04 16:00:00      |            12               |    2020-12-04 16:10:00     |             18             |

如果在 Kafka 就要来了,

{'word': 'are', 'eventtime': '2020-12-04 17:18:00', 'appear_page': 30}

我希望我们能更新 areMySQL ,更新结果如下:

|    word    |    first_appearance_time    |    first_appearance_page    |    last_appearance_time    |    last_appearance_page    |
    |    hello   |    2020-12-04 16:00:00      |            5                |    2020-12-04 16:15:00     |             20             |
    |    are     |    2020-12-04 16:00:00      |            12               |    2020-12-04 17:18:00     |             30             |

我在第二步和第五步遇到了一些麻烦,有人能给我一些建议吗?

ibrsph3r

ibrsph3r1#

当我在flinksql中运行这个时,我得到一个表异常-

SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum

[错误]无法执行sql语句。原因:org.apache.flink.table.api.tableexception:只能以升序模式对窗口排序。
我们不能在flink 1.11.3中的over()中使用desc吗?
谢谢,

kulphzqa

kulphzqa2#

按行时排序的重复数据消除是最简单的方法,但1.12支持这种方法。https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication

CREATE TABLE src (
  word STRING,
  eventtime TIMESTAMP(3),
  appear_page INT,
  WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  ...
);

-- get last row by word key
SELECT word, eventtime, appear_page
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY eventtime DESC) AS rownum
  FROM src
) WHERE rownum = 1;

此查询也应在1.11中使用,但没有优化为重复数据消除,而是优化为效率较低的topn运算符。

相关问题