flink什么时候可以支持涉及先前事件字段的模式匹配?

iibxawm4  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(350)

如果能够根据事件的字段值来匹配事件,这将是一件非常好的事情,这超出了当前从匹配不同标准的事件中创建模式的能力。例如,在https://flink.apache.org/news/2016/04/06/cep-monitoring.html 我们可以做到:

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

不过,如果能利用以下能力创建模式,那就太好了: .where(second_evt->evt.getTemperature() == first_evt->evt.getTemperature()

gcxthw6b

gcxthw6b1#

如果要比较不同事件中字段的值,可以在 flatSelect 方法,只需使用一个非常简单的模式,而不需要任何 where 表达:
创建图案:

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
    .subtype(TemperatureEvent.class)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .within(Time.seconds(10));

将模式应用于数据流:

PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
    inputEventStream.keyBy("rackID"),
    warningPattern);

检查值并通过生成新的复杂事件 flatSelect 方法:

DataStream<TemperatureWarning> warnings = tempPatternStream.flatSelect(
    (Map<String, MonitoringEvent> pattern, Collector<TemperatureAlert> out) -> {
        TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
        TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");

        if (first.getTemperature() <= second.getTemperature()) {
            out.collect(new TemperatureWarning(
                first.getRackID(), 
                (first.getTemperature() + second.getTemperature()) / 2));
        }
    });

相关问题