Flink CEP fails to detect a pattern when parallelism > 1

Posted by jm81lzqq on 2021-06-24 in Flink

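I have a simple program that uses the Flink CEP library to detect multiple failed logins in a file of log records. My application uses event time, and I do a keyBy on the logged-in "user".
The program works fine when I set the StreamExecutionEnvironment parallelism to 1. With any other parallelism it fails to detect the pattern, and I don't understand why.
I can see that all records for a given user go to the same thread, so why does this happen? I can also see that in many cases the records are not in event-time order (I'm not sure whether that is a problem), but I can't find anything in the API that lets me sort records by event time within a window.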
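On the ordering concern: the Flink CEP documentation describes that, in event time, incoming elements are first buffered and sorted by timestamp, and are only passed to the pattern matcher once a watermark has moved past them. The plain-Java sketch below illustrates that idea only. It is not Flink's implementation, and the class name, the method names and the inclusive boundary check are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of an event-time buffer; not a Flink class.
class EventTimeBufferSketch {

    // Min-heap of buffered event timestamps (epoch milliseconds).
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    // Buffers an event timestamp; arrival order does not matter.
    void add(long timestamp) {
        buffer.add(timestamp);
    }

    // Releases, in ascending order, every buffered timestamp that is at or
    // below the watermark. Anything newer stays buffered.
    List<Long> onWatermark(long watermark) {
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        EventTimeBufferSketch sketch = new EventTimeBufferSketch();
        // Arrival order taken from the parallelism=4 output for user "deb".
        sketch.add(1522103406000L);
        sketch.add(1522103405000L);
        sketch.add(1522103403000L);
        System.out.println(sketch.onWatermark(1522103405000L));
        System.out.println(sketch.onWatermark(1522103406000L));
    }
}
```

In this sketch nothing is released until `onWatermark` is called with a large enough value, so out-of-order arrival by itself is harmless, but the buffered events are never evaluated if the watermark does not advance.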

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.setParallelism(1); //tried with 1 & 4
.....    
DataStream<LogEvent> inputLogEventStream = env
    .readFile(format, FILEPATH, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
    .map(new MapToLogEvents())              
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.seconds(0)) {
            public long extractTimestamp(LogEvent element) {
                    return element.getTimeLong();
            }
    })
    .keyBy(new KeySelector<LogEvent, String>() {
            public String getKey(LogEvent le) throws Exception {
                    return le.getUser();
            }
    });
inputLogEventStream.print();

Pattern<LogEvent, ?> mflPattern = Pattern.<LogEvent> begin("mfl")
    .subtype(LogEvent.class).where(
            new SimpleCondition<LogEvent>() {
                    public boolean filter(LogEvent logEvent) {
                            if (logEvent.getResult().equalsIgnoreCase("failed")) { return true; }
                            return false;
                    }
    })
    .timesOrMore(3).within(Time.seconds(60));

PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);

DataStream<Threat> outputMflStream = mflPatternStream.select(
    new PatternSelectFunction<LogEvent, Threat>() {
            public Threat select(Map<String, List<LogEvent>> logEventsMap) {
                    return new Threat("MULTIPLE FAILED LOGINS detected!");
            }
    });
outputMflStream.print();

Here is the printed output:
Parallelism = 1 (pattern detected successfully)

04/03/2018 12:03:53 Source: Custom File Source(1/1) switched to RUNNING 
04/03/2018 12:03:53 SelectCepOperator -> Sink: Unnamed(1/1) switched to RUNNING 
04/03/2018 12:03:53 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(1/1) switched to RUNNING 
04/03/2018 12:03:53 Sink: Unnamed(1/1) switched to RUNNING 
LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]
LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]

**THREAT**==> MULTIPLE FAILED LOGINS detected!

Parallelism = 4 (pattern detection fails)

04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(3/4) switched to RUNNING 
04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(2/4) switched to RUNNING 
04/03/2018 12:05:33 Sink: Unnamed(2/4) switched to RUNNING 
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(2/4) switched to RUNNING 
04/03/2018 12:05:33 Sink: Unnamed(3/4) switched to RUNNING 
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(3/4) switched to RUNNING 
2> LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
3> LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]
3> LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
3> LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]
Answer #1 (rfbsl7qr)

I think what is happening here is that the events end up in different partitions; it is very important to use .keyBy() when using CEP.
Your code

PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);

should, I think, be

PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream.keyBy("eventCategory","user"), mflPattern);

You may want to take a look at https://cwiki.apache.org/confluence/display/flink/streams+and+operations+on+streams
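To illustrate why keying matters for this kind of pattern, here is a minimal plain-Java sketch (no Flink dependency) that counts failed logins per user. The `Event` class, the method name and the simple time-span rule are assumptions made for illustration; they do not reproduce Flink CEP's matching semantics.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedFailedLoginSketch {

    // Hypothetical stand-in for the LogEvent class in the question.
    static final class Event {
        final String user;
        final String result;
        final long timeLong; // event time, epoch milliseconds

        Event(String user, String result, long timeLong) {
            this.user = user;
            this.result = result;
            this.timeLong = timeLong;
        }
    }

    // Returns the users that have at least `threshold` failed logins whose
    // timestamps span no more than `windowMillis`. The input is assumed to
    // be sorted by event time.
    static List<String> usersWithRepeatedFailures(
            List<Event> events, int threshold, long windowMillis) {
        Map<String, Deque<Long>> failureTimesByUser = new HashMap<>();
        List<String> flagged = new ArrayList<>();
        for (Event e : events) {
            if (!"failed".equalsIgnoreCase(e.result)) {
                continue;
            }
            // State is kept per user, which is the role keyBy plays in Flink.
            Deque<Long> times =
                    failureTimesByUser.computeIfAbsent(e.user, k -> new ArrayDeque<>());
            times.addLast(e.timeLong);
            // Drop failures that fall outside the window ending at this event.
            while (e.timeLong - times.peekFirst() > windowMillis) {
                times.removeFirst();
            }
            if (times.size() >= threshold && !flagged.contains(e.user)) {
                flagged.add(e.user);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        Event d1 = new Event("deb", "failed", 1522103403000L);
        Event d2 = new Event("deb", "failed", 1522103405000L);
        Event d3 = new Event("deb", "failed", 1522103406000L);
        Event p1 = new Event("paul", "failed", 1522103408000L);

        // All of a user's events seen together: "deb" reaches the threshold.
        System.out.println(usersWithRepeatedFailures(Arrays.asList(d1, d2, d3, p1), 3, 60_000L));
        // The same events split across two partitions: neither reaches it.
        System.out.println(usersWithRepeatedFailures(Arrays.asList(d1, d2), 3, 60_000L));
        System.out.println(usersWithRepeatedFailures(Arrays.asList(d3, p1), 3, 60_000L));
    }
}
```

With the four records from the question, the first call flags only "deb", while the two split calls flag nobody. That is the situation keying is meant to prevent: all events for one user have to reach the same matcher instance.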
