我写了一篇flink cep文章,检查状态模式(由 id
)与 Relaxed Contiguity
( followedBy
). 这样做的目的是,如果某个特定的状态在指定的时间内没有在第一个状态之后到达,就会发出警报。
这是可行的,但是如果没有更多的消息发送到该流,则不会触发警报。但只有当一个带有随机状态的消息到达时,才会触发这个片段。
那么,当具有下一个序列的消息没有在某个时间到达时,如何使它触发警报(即使没有消息到达此流)?
Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
.where(new SimpleCondition<Transaction>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Transaction value) throws Exception {
return value.getStatus().equals(Status.STATUS_1);
}
})
.followedBy("end")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction value) throws Exception {
return value.getStatus().equals(Status.STATUS_2);
return amlveri;
}
}).within(Time.seconds(15));
PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
TypeInformation.of(Alert.class)) {};
SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
new PatternFlatTimeoutFunction<Transaction, Alert>() {
@Override
public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
throws Exception {
Transaction failedTrans = values.get("start").get(0);
arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
}
}, new PatternFlatSelectFunction<Transaction, Alert>() {
@Override
public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
throws Exception {
// do not do anything
}
});
select.getSideOutput(timedOutPartialMatchesTag).print();
1条答案
按热度按时间m4pnthwp1#
如果使用的是事件时间,则
within()
方法正在等待水印触发事件时间计时器。但是,如果没有事件到达,水印将不会前进(假设您正在使用boundedAutoFordernessTimestampExtractor之类的工具来生成水印)。如果您需要在没有事件到达时检测时间的流逝,那么有必要使用处理时间。你可以设置
TimeCharacteristic
或者您可以实现一个水印生成器,该生成器使用处理时间计时器在缺少事件的情况下人为地推进水印。