我用flink cep实现了一个模式,它匹配三个事件,例如 A->B->C
. 在我定义了我的模式之后,我生成了一个 PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
用一个 PatternSelectFunction
以至于 patternStream.select(new MyPatternSelectFunction()).print();
这就像一个魅力,但我对所有匹配事件的事件时间感兴趣。我知道传统的flink流api提供了丰富的函数,允许您注册flink的内部延迟跟踪器,如本文所述。我也看到了flink 1.8的新功能 RichPatternSelectFunction
已添加。但不幸的是,我不能用flink cep设置flink1.8。
最后,有没有办法得到所有匹配事件的事件时间?
1条答案
按热度按时间4szc88ey1#
你不需要丰富的函数来使用flink的延迟跟踪。您只需要通过设置
latencyTrackingInterval
在flink配置或executionconfig中设置为正数,例如。,然后您可以在度量解决方案中观察结果,或者通过restapi(flinkwebui中不报告延迟度量)。
文档
更新:
延迟统计信息是作业度量,位于
延迟度量值可以从
这些指标的名称如下
其中,ids标识作业图中的源节点和操作员节点,在它们之间测量延迟。
例如,我可以通过以下请求确定当前正在运行的作业中源和某个接收器之间的95%延迟:
或者,在事件进入作业的cep部分之前,可以使用processfunction向事件添加处理时间戳,然后使用另一个processfunction来测量经过的时间。