apache flink cep在侦听rmqsource之前初始化输入数据流

xuo3flqw  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(311)

我正在使用Flink1.2CEP检测设备中丢失的心跳事件。我从rabbitmq源中读取心跳事件,并使用以下模式通过超时函数检测由序列号键控的设备丢失的心跳。
此模式流适用于设备至少发送一次心跳的情况。但我还需要处理这样一个用例:检测一个设备的心跳缺失,而这个设备在应用程序启动后甚至没有启动一个心跳。
为此,我需要初始化输入心跳流与所有设备初始化心跳事件。如果我初始化流,这将照顾设备没有收到它的第一个心跳也超时,并提出了警报。
在从rmqsource函数监听之前,如何用所有设备的init heart beat数据初始化数据流?

//Reading heart beat of device from RabbitMQ queue
DataStream<HeartBeatEvent> heartBeatStream= 
    env.addSource(rmqSource).assignTimestampsAndWatermarks(new 
           IngestionTimeExtractor<String>());                                   

//Pattern to detect missing heartbeat
Pattern<HeartBeanEvent, ?> heartBeatEventPattern = Pattern.< 
          HeartBeanEvent >begin("first")
            .subtype(HeartBeanEvent.class)
            .next("second")
            .subtype(HeartBeanEvent.class)
            .within(Time.seconds(360));

DataStream<Either< HeartBeanEvent, String>> result = 
     CEP.pattern(heartBeatStream.keyBy(serialNum), 
                 heartBeatEventPattern).
  select(new PatternTimeoutFunction< HeartBeanEvent, HeartBeanEvent >() {
            public HeartBeanEvent timeout(Map<String, HeartBeanEvent > 
                   pattern, long timeoutTimestamp) throws Exception {
   System.out.println("Missing heart beat:" + 
           pattern.get("first").getSerialNum() + ":" + 
           pattern.get("first").getEventTime());  
           return pattern.get("first");
            }
    },new PatternSelectFunction< HeartBeanEvent, String>() {
            public String select(Map<String, HeartBeanEvent > pattern) {                     
                return null;
            }
        });

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题