我有一个有趣的用例,我想用flink测试。我有一股新的 Message
哪一个是 PASS
或者 FAIL
. 如果消息的类型是 FAIL
,我有一个下游 ProcessFunction
它拯救了 Message
状态,然后发送 pause
所有依赖于此的命令。当我收到 PASS
与 FAIL
我之前收到(按消息id键入),我发送 resume
我刚才暂停的所有命令。
现在我计划使用statettl使存储的 FAIL
状态并在特定超时后恢复所有操作,即使我没有收到 PASS
具有相同消息id的消息。这可以单独使用flink完成,还是需要一些外部计时器将超时消息发送到我的程序?
我有这样的想法让它在Flink工作:
对于每个 Message
,添加时间戳并将其传递给等待 current_ts - timestamp == timeout
在发送之前,恢复模块暂停的所有操作。有没有更好的办法或者你们觉得这样可以吗?
1条答案
按热度按时间pbwdgjma1#
似乎使用计时器使状态过期(通过在ontimer方法中调用state.clear())比使用state ttl更简单。同样的ontimer方法也可以安排事情同时恢复。