我想基于具有相同标识符的两个事件来检测两个事件是否在定义的时间范围内发生。例如a DoorEvent
看起来像这样:
<doorevent>
<door>
<id>1</id>
<status>open</status>
</door>
<timestamp>12345679</timestamp>
</doorevent>
<doorevent>
<door>
<id>1</id>
<status>close</status>
</door>
<timestamp>23456790</timestamp>
</doorevent>
我的 DoorEvent
下面示例中的java类具有相同的结构。
我要检测到id为1的门在打开后5分钟内关闭。为此,我尝试使用apache-flink-cep库。传入流包含来自20个门的所有打开和关闭消息。
Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
new SimpleCondition<String>() {
private static final long serialVersionUID = 1L;
public boolean filter(String doorevent) {
DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
if (event.getDoor().getStatus().equals("open")){
// save state of door as open
return true;
}
return false;
}
}
)
.followedByAny("door_close").where(
new SimpleCondition<String>() {
private static final long serialVersionUID = 1L;
public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
if (event.getDoor().getStatus().equals("close")){
// check if close is of previously opened door
return true;
}
return false;
}
}
)
.within(Time.minutes(5));
如何将门1的状态保存为 door_open
所以在 door_close
我知道1号门是关着的,不是别的门吗?
1条答案
按热度按时间mepcadol1#
如果你有Flink1.3.0和更高版本,那么你想做什么就很简单了
您的模式如下所示:
因此,基本上,您可以使用iterativeconditions并获取第一个匹配的模式的上下文,并在列表上迭代,同时比较所需的模式,并根据需要继续。
迭代条件很昂贵,应该相应地进行处理
有关条件的更多信息,请访问flink-conditions