如何在flink cep中指定状态应保持的时间

rmbxnbpk  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(447)

让我来解释一个我需要处理的情况。假设有三个设备a、b、c正在将日志发送到flink cep进行处理。让我们假设模式为a接5分钟b接5分钟c。让我们假设一个场景,其中b设备关闭,并在50分钟后发送日志。所以在这种情况下,所有事件都将被删除。我只是想知道flink中是否有支持将状态保持在特定的定义时间间隔内(假设在我的例子中是1天,这意味着a和c日志都将存储1天,如果不匹配,日志将被丢弃)。请从cep的Angular 提出可行性建议。

lfapxunr

lfapxunr1#

没有比这更具体的了 until 或者 within 据我所知,这些都是用来指定t的。这取决于您的具体设置,但如果您将所有数据放在一个主题中,可能很难防止设备长时间停机。您可以尝试修改水印生成逻辑,但这意味着它通常会延迟输出。
在这种情况下,您可以考虑使用 ProcessFunction 使用更灵活的自定义逻辑,允许您以更好的粒度处理状态。
编辑:
所以,基本上你需要创建一个状态来保存部分匹配,这取决于它可能是一个 ListState 或者 ValueState 然后简单地把你找到的部分匹配放在那里。所以,如果你想要a->b->c,那么如果你有a,你会检查它并将其放入状态,如果你收到b,你可以检查时间戳并将其附加到状态,最后如果你有c,你可以发出整个匹配并清除状态。
如果在那里设置statettl,这将表明状态在一段时间内未读/写之后将被自动清除。
还请注意,如果模式不是很复杂,这是有意义的,否则编写逻辑代码很快就会成为一场噩梦。

相关问题