在apache flink cep中可以处理多个流吗?

yks3o0rb  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(480)

我的问题是,如果我们有两个原始事件流,即烟雾和温度,我们想通过对原始流应用操作符来找出是否发生了复杂事件,即火灾,我们能在flink中这样做吗?
我问这个问题是因为到目前为止我所看到的所有flink cep示例都只包含一个输入流。如果我错了,请纠正我。

vsnjm48y

vsnjm48y1#

简短回答-是的,您可以根据不同流源中的事件类型读取和处理多个流并触发规则。
长答案-我有一个有点类似的要求,我的答案是基于假设,你是阅读不同的Kafka主题不同的流。
从不同的主题中读取,这些主题在单个源中流式处理不同的事件:

FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
        new StringSerializerToEvent(),
        props);

kafkaSource.assignTimestampsAndWatermarks(new 
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
        .filter(Objects::nonNull);

序列化程序读取数据并将其解析为一个具有公共格式(例如)。

@Data
public class BAMEvent {
 private String keyid;  //If key based partitioning is needed
 private String eventName; // For different types of events
 private String eventId;  // Any other field you need
 private long timestamp; // For event time based processing 

 public String toString(){
   return eventName + " " + timestamp + " " + eventId + " " + correlationID;
 }

}

在这之后,事情就非常简单了,根据事件名称定义规则,并比较事件名称来定义规则(您还可以如下定义复杂规则):

Pattern.<BAMEvent>begin("first")
        .where(new SimpleCondition<BAMEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(BAMEvent event) throws Exception {
            return event.getEventName().equals("event1");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<BAMEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

            if (!secondEvent.getEventName().equals("event2")) {
              return false;
            }

            for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getEventId = firstEvent.getEventId()) {
                return true;
              }
            }
            return false;
          }
        })
        .within(withinTimeRule);

我希望这能给你一个想法,把一个或多个不同的流集成在一起。

zazmityj

zazmityj2#

我想知道是否可以进行严格的链接(而不是使用next),因为在给定的流中可能有许多特定时间戳的事件。假设时间t1-:a,b,c-,这三个事件来了,时间t2-:a2,b2,c2来了flink引擎。所以,我想知道我们如何得到事件(a),下一个(a2),因为它可能永远不会是这样的,因为序列应该是-:a2b2c2
然而,如果cep模块处理事件时将一个时间戳视为单个事件,那么这是有意义的。

相关问题