cep库检测语言模式

ctrmrzij  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(384)

如何使用flink-cep库检测语言模式?
例如:假设设备有一些问题,所以它会连续地发布诸如on、off之类的值。如果问题持续30分钟,如何使用cep检测模式。我在下面提到的一些示例数据。

OFF     16/08/18 11:38
ON      16/08/18 11:38
OFF     16/08/18 11:38
ON      16/08/18 11:37
OFF     16/08/18 11:37
ON      16/08/18 11:36
OFF     16/08/18 11:36
OFF     16/08/18 11:36
ON      16/08/18 11:36
OFF     16/08/18 11:35
ON      16/08/18 11:35
ON      16/08/18 11:34
OFF     16/08/18 11:34
ilmyapht

ilmyapht1#

如果您的流是按时间排序的(只需要为每个单独的设备对流进行排序),那么您可以轻松地转换流以使分析更容易。一 RichFlatMapFunction 这样会将开关事件序列转换为状态更改事件序列:

static class DetectChanges extends RichFlatMapFunction<String, String> {
    private transient ValueState<String> previousState;

    @Override
    public void open(Configuration parameters) throws Exception {
        previousState = getRuntimeContext().getState(new ValueStateDescriptor<>("previousState", String.class));
    }

    @Override
    public void flatMap(String onOrOff, Collector<String> out) throws Exception {

        if (previousState.value() != onOrOff) {
            out.collect("CHANGE");
            previousState.update(onOrOff);
        }
    }
}

现在问题已经简化为确定流在一段时间间隔内是否有一些更改事件。这可以很容易地完成滑动窗口,或者你可以使用cep如果你喜欢。
你也可以完全用cep来做。从概念上讲,您可以按以下方式处理此问题:
定义一个与on+off匹配的单独模式+
然后定义一个模式组,每当在某个时间间隔内出现n次时,该模式组就与该开/关模式匹配

相关问题