如果一个序列没有在一段时间内到达,不管是否有其他消息到达,都会发出警报

cld4siwp  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(444)

我写了一篇flink cep文章,检查状态模式(由 id )与 Relaxed Contiguity ( followedBy ). 这样做的目的是,如果某个特定的状态在指定的时间内没有在第一个状态之后到达,就会发出警报。
这是可行的,但是如果没有更多的消息发送到该流,则不会触发警报。但只有当一个带有随机状态的消息到达时,才会触发这个片段。
那么,当具有下一个序列的消息没有在某个时间到达时,如何使它触发警报(即使没有消息到达此流)?

Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
            .where(new SimpleCondition<Transaction>() {

                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_1);
                }
            })
           .followedBy("end")
           .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_2);
                    return amlveri;
                }
            }).within(Time.seconds(15));

PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
            TypeInformation.of(Alert.class)) {};
    SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
            new PatternFlatTimeoutFunction<Transaction, Alert>() {

                @Override
                public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
                        throws Exception {
                    Transaction failedTrans = values.get("start").get(0);
                    arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
                }
            }, new PatternFlatSelectFunction<Transaction, Alert>() {

                @Override
                public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
                        throws Exception {
                       // do not do anything
                }
            });
    select.getSideOutput(timedOutPartialMatchesTag).print();
m4pnthwp

m4pnthwp1#

如果使用的是事件时间,则 within() 方法正在等待水印触发事件时间计时器。但是,如果没有事件到达,水印将不会前进(假设您正在使用boundedAutoFordernessTimestampExtractor之类的工具来生成水印)。
如果您需要在没有事件到达时检测时间的流逝,那么有必要使用处理时间。你可以设置 TimeCharacteristic 或者您可以实现一个水印生成器,该生成器使用处理时间计时器在缺少事件的情况下人为地推进水印。

相关问题