cep测量事件时间延迟

owfi6suc  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(364)

我用flink cep实现了一个模式,它匹配三个事件,例如 A->B->C . 在我定义了我的模式之后,我生成了一个 PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern); 用一个 PatternSelectFunction 以至于 patternStream.select(new MyPatternSelectFunction()).print(); 这就像一个魅力,但我对所有匹配事件的事件时间感兴趣。我知道传统的flink流api提供了丰富的函数,允许您注册flink的内部延迟跟踪器,如本文所述。我也看到了flink 1.8的新功能 RichPatternSelectFunction 已添加。但不幸的是,我不能用flink cep设置flink1.8。
最后,有没有办法得到所有匹配事件的事件时间?

4szc88ey

4szc88ey1#

你不需要丰富的函数来使用flink的延迟跟踪。您只需要通过设置 latencyTrackingInterval 在flink配置或executionconfig中设置为正数,例如。,

env.getConfig().setLatencyTrackingInterval(1000);

然后您可以在度量解决方案中观察结果,或者通过restapi(flinkwebui中不报告延迟度量)。
文档
更新:
延迟统计信息是作业度量,位于

http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics

延迟度量值可以从

http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>

这些指标的名称如下

latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>

其中,ids标识作业图中的源节点和操作员节点,在它们之间测量延迟。
例如,我可以通过以下请求确定当前正在运行的作业中源和某个接收器之间的95%延迟:

http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95

或者,在事件进入作业的cep部分之前,可以使用processfunction向事件添加处理时间戳,然后使用另一个processfunction来测量经过的时间。

相关问题