我们有一个将状态(valuestate和liststate)与ttl(statettlconfig)结合使用的拓扑,因为我们不能使用计时器(我们每天会生成数亿个计时器,而且它确实可以扩展:生成保存点/检查点需要数小时,甚至在运行时可能会卡住)。
但是,我们需要根据一些传入事件和其他逻辑的类型在运行时更新ttl的值。使用newstatettlconfig(和更新的ttl时间)重新创建一个新状态,并将值从“old”复制到“new”中,这样可以吗 processElement1()
以及 processElement2()
a的方法 CoProcessFunction
(而不是一次 open()
就像我们平常一样?
我猜“旧”州应该是垃圾收集(?)。
这个解决方案可以扩展吗?表现如何?产生任何问题?有什么不好的吗?
2条答案
按热度按时间byqmnocz1#
我猜“旧”州应该是垃圾收集(?)。
从flink文档中清除过期状态。
默认情况下,过期值在读取时被显式删除,例如valuestate#value,如果配置的state后端支持,则定期在后台进行垃圾收集。可以在statettlconfig中禁用后台清理:
或在完整快照后执行清理:
您可以根据文档随时更改ttl。但是,您必须重新启动查询(它不是在运行时):
对于现有作业,可以在statettlconfig中随时激活或停用此清理策略,例如从保存点重新启动后。
但是为什么你看不到rocksdb上的计时器,就像大卫在参考答案上说的那样?
uoifb46i2#
我认为您的方法在某种程度上可以处理运行时的状态重建,但它是脆弱的。我可以看到,问题是旧的状态元信息可能会在某个地方停留,具体取决于后端实现。
对于heap(fs)后端,检查点/保存点最终将没有过期旧状态的记录,但是当作业运行时,元信息可能会在内存中停留。如果作业重新启动,它将消失。
对于rocksdb来说,旧状态的列族可以保留。此外,背景清理仅在压缩期间运行。如果表太小,比如内存中的那个部分,这个部分(甚至可能是磁盘上的一点)会延迟。如果对完整快照的清除处于活动状态(不适用于增量检查点),则它将在重新启动后消失。
总而言之,这取决于创建新状态和从保存点/检查点重新启动作业的频率。
我创建了一个票证来记录什么可以在ttl配置中更改以及何时更改,所以请查看问题中的一些详细信息。