flink在状态过期时触发

xfb7svmp  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(380)

我有一个有趣的用例,我想用flink测试。我有一股新的 Message 哪一个是 PASS 或者 FAIL . 如果消息的类型是 FAIL ,我有一个下游 ProcessFunction 它拯救了 Message 状态,然后发送 pause 所有依赖于此的命令。当我收到 PASSFAIL 我之前收到(按消息id键入),我发送 resume 我刚才暂停的所有命令。
现在我计划使用statettl使存储的 FAIL 状态并在特定超时后恢复所有操作,即使我没有收到 PASS 具有相同消息id的消息。这可以单独使用flink完成,还是需要一些外部计时器将超时消息发送到我的程序?
我有这样的想法让它在Flink工作:
对于每个 Message ,添加时间戳并将其传递给等待 current_ts - timestamp == timeout 在发送之前,恢复模块暂停的所有操作。有没有更好的办法或者你们觉得这样可以吗?

pbwdgjma

pbwdgjma1#

似乎使用计时器使状态过期(通过在ontimer方法中调用state.clear())比使用state ttl更简单。同样的ontimer方法也可以安排事情同时恢复。

相关问题