关于flink中状态的ttl配置

b1zrtrql  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(584)

假设我有一个描述符的配置,并从这里执行操作:

ValueStateDescriptor<Event> descriptor = ...;

StateTtlConfig ttlConfigOneHourAndReturnExpire = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

descriptor.enableTimeToLive(ttlConfigOneHourAndReturnExpire);

/*after one hour when the state is expired*/
Event e = state.value(); (step 1 and 2)
e.count = e.count + 1; (step 3)
value.update(e); (step 4)

这是否意味着在1小时后,当状态已被否决时,事情将按以下顺序发生:
返回状态为“已弃用”的记录的上一个状态。
读取之后,记录的先前状态将被清除。
在上一个状态被传递和清理之后更新对象(在read中)。
在这种情况下,更新状态意味着再次创建状态,因为上一个状态已经被删除,并且此值将需要一个多小时,或者状态将在此时而不是在点1被清除,并且对象将不包括更新,并且它将在到达时存储在状态中?
希望我能自己解释一下,因为文件我不清楚。
从我需要在一天发生变化时清理状态开始,使用ttl无法做到这一点,我想在每小时后清理一次状态,但在删除之前获取状态,更新当前值,然后再创建一个小时的状态,但在丢失之前始终保持前一个状态。
希望这是有意义的,并有可能在某种程度上这样做。谨致问候!

0s7z1bwu

0s7z1bwu1#

如果需要每小时操纵一次状态,那么创建一个自定义 ProcessFunction 然后用定时器触发那个动作。

相关问题