让我来解释一个我需要处理的情况。假设有三个设备a、b、c正在将日志发送到flink cep进行处理。让我们假设模式为a接5分钟b接5分钟c。让我们假设一个场景,其中b设备关闭,并在50分钟后发送日志。所以在这种情况下,所有事件都将被删除。我只是想知道flink中是否有支持将状态保持在特定的定义时间间隔内(假设在我的例子中是1天,这意味着a和c日志都将存储1天,如果不匹配,日志将被丢弃)。请从cep的Angular 提出可行性建议。
让我来解释一个我需要处理的情况。假设有三个设备a、b、c正在将日志发送到flink cep进行处理。让我们假设模式为a接5分钟b接5分钟c。让我们假设一个场景,其中b设备关闭,并在50分钟后发送日志。所以在这种情况下,所有事件都将被删除。我只是想知道flink中是否有支持将状态保持在特定的定义时间间隔内(假设在我的例子中是1天,这意味着a和c日志都将存储1天,如果不匹配,日志将被丢弃)。请从cep的Angular 提出可行性建议。
1条答案
按热度按时间lfapxunr1#
没有比这更具体的了
until
或者within
据我所知,这些都是用来指定t的。这取决于您的具体设置,但如果您将所有数据放在一个主题中,可能很难防止设备长时间停机。您可以尝试修改水印生成逻辑,但这意味着它通常会延迟输出。在这种情况下,您可以考虑使用
ProcessFunction
使用更灵活的自定义逻辑,允许您以更好的粒度处理状态。编辑:
所以,基本上你需要创建一个状态来保存部分匹配,这取决于它可能是一个
ListState
或者ValueState
然后简单地把你找到的部分匹配放在那里。所以,如果你想要a->b->c,那么如果你有a,你会检查它并将其放入状态,如果你收到b,你可以检查时间戳并将其附加到状态,最后如果你有c,你可以发出整个匹配并清除状态。如果在那里设置statettl,这将表明状态在一段时间内未读/写之后将被自动清除。
还请注意,如果模式不是很复杂,这是有意义的,否则编写逻辑代码很快就会成为一场噩梦。