我经常更新mysql表。我想为过去20秒内更新的每个id拍一张快照,并将值写入redis。我使用binlog作为流输入,并将数据流转换为flink表。我运行以下sql。
SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
SELECT id, MAX(ts)
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)
据我所知,表join会造成过大的状态大小,我将streamqueryconfig设置为
qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));
我运行了一天的任务,得到了内存不足的错误。我怎样才能解决这个问题?
1条答案
按热度按时间8nuwlpux1#
您还可以使用时间窗口连接来解决这个问题,而不是使用配置了空闲状态保留时间的常规连接。
下面的查询应该可以做到这一点。
窗口连接 predicate (
BETWEEN
)确保自动清除状态。因为您使用的处理时间并不精确,所以我增加了5秒的空闲时间。