我有一个场景,如果第二个事件没有在x秒内跟随第一个事件,我必须更改状态。例如,用户在100分钟内没有注销,则认为他处于无效状态。如何使用当前的模式操作来设计它?
q8l4jmvw1#
由于这已经实现了,我想为那些来这里寻找答案的人回答这个问题。从flink 1.0.0开始,这可以通过处理timedout模式来实现,例如,如果您的cep模式是这样的:示例部分来自flink网站(1.2和1.3之间有一些重大变化,请相应地调整代码,此答案集中在1.3)模式描述:-获取类型为“error”的第一个事件,然后在10秒内获取类型为“critical”的第二个事件
Pattern<Event, ?> pattern = Pattern.<Event>begin("start") .next("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("error"); } }).followedBy("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("critical"); } }).within(Time.seconds(10)); PatternStream<BAMEvent> patternStream = CEP.pattern(inputStream, pattern) DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() { @Override public String timeout(Map<String, List<Event>> map, long l) throws Exception { return map.toString() +" @ "+ l; } }, new PatternSelectFunction<Event, String>() { @Override public String select(Map<String, List<Event>> map) throws Exception { return map.toString(); } });
对于这种情况,如果用户在100分钟后仍然没有注销,那么由于相应的事件不会到达,它将导致模式为timedout,并且部分事件(起始事件)将在patterntimeoutfunction中捕获。
rta7y2nd2#
目前这是不可能做到的。解决方案是有一个超时处理程序,每当事件序列因为超出定义的时间窗口而被丢弃时,它就会被触发。跟踪超时处理程序实现的jira问题已经存在。
2条答案
按热度按时间q8l4jmvw1#
由于这已经实现了,我想为那些来这里寻找答案的人回答这个问题。
从flink 1.0.0开始,这可以通过处理timedout模式来实现,例如,如果您的cep模式是这样的:
示例部分来自flink网站(1.2和1.3之间有一些重大变化,请相应地调整代码,此答案集中在1.3)
模式描述:-获取类型为“error”的第一个事件,然后在10秒内获取类型为“critical”的第二个事件
对于这种情况,如果用户在100分钟后仍然没有注销,那么由于相应的事件不会到达,它将导致模式为timedout,并且部分事件(起始事件)将在patterntimeoutfunction中捕获。
rta7y2nd2#
目前这是不可能做到的。解决方案是有一个超时处理程序,每当事件序列因为超出定义的时间窗口而被丢弃时,它就会被触发。跟踪超时处理程序实现的jira问题已经存在。