flink sql:连接表的内存不足

pdsfdshx  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(478)

我经常更新mysql表。我想为过去20秒内更新的每个id拍一张快照,并将值写入redis。我使用binlog作为流输入,并将数据流转换为flink表。我运行以下sql。

SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
   SELECT id, MAX(ts)
   FROM my_tbl
   GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)

据我所知,表join会造成过大的状态大小,我将streamqueryconfig设置为

qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));

我运行了一天的任务,得到了内存不足的错误。我怎样才能解决这个问题?

8nuwlpux

8nuwlpux1#

您还可以使用时间窗口连接来解决这个问题,而不是使用配置了空闲状态保留时间的常规连接。
下面的查询应该可以做到这一点。

SELECT id, ts, val
FROM my_tbl m1,
     (SELECT id, MAX(ts), TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) as ptime
      FROM my_tbl
      GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts ANS
      m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime

窗口连接 predicate ( BETWEEN )确保自动清除状态。因为您使用的处理时间并不精确,所以我增加了5秒的空闲时间。

相关问题