Flink CEP pattern produces no output with inEventTime(), but works with inProcessingTime()

Posted by laawzig2 on 2023-02-17 in Apache

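I am working on the following program and have set a WatermarkStrategy, but when I run it with the inEventTime() method it produces no output.
Note: the same program works when I use inProcessingTime() on the pattern.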

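import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

// EventMessage and EventSchema are the poster's own classes (not shown in the question).
public class FlinkCEPTest {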

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        
        final String bootstrapServers = parameter.get("kafka.broker", "localhost:9092,broker:29092");
        final String inputTopic_1 = parameter.get("input.topic.1","acctopic");
        final String inputTopic_2 = parameter.get("input.topic.2","txntopic");
        final String outputTopic = parameter.get("output.topic.q","alerttopic");
        final String groupID = parameter.get("group.id","flink-demo-grp-id");

        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        
        KafkaSource<EventMessage> source_1 = KafkaSource.<EventMessage>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(inputTopic_1).setGroupId(groupID)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(new EventSchema())
                .build();    

        DataStream<EventMessage> text_1 = env.fromSource(source_1,
                WatermarkStrategy
                .<EventMessage>forBoundedOutOfOrderness(Duration.ofSeconds(300))
                .withTimestampAssigner((event, trtimestamp)-> {
                    //System.err.println("Kafka ingestion ts : " + trtimestamp);
                    //System.err.println("Event ts : "+ event.getTxnDate().getTime());
                    return event.getTxnDate().getTime();}) 
                , "Kafka Source 1");

        DataStream<EventMessage> partitionedInput = text_1.keyBy(evt -> evt.getAccountId());

        //partitionedInput.print();

        Pattern<EventMessage, ?> relaxedAlarmPattern = Pattern.<EventMessage>begin("first").subtype(EventMessage.class)
                .where(new SimpleCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value) throws Exception {
                        return value.getEvent().equalsIgnoreCase("PASSWORD_CHANGE_SUCC");
                    }
                }).followedBy("second").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
                        Iterable<EventMessage> test = ctx.getEventsForPattern("first");
                        Integer accid = 0;
                        for (EventMessage te : test) {
                            accid = te.getAccountId();
                        }
                        return value.getEvent().equalsIgnoreCase("BENIFICIARY_ADDED")
                                && value.getAccountId().equals(accid);
                    }
                }).followedBy("third").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
                        Integer accid = 0;
                        Iterable<EventMessage> test = ctx.getEventsForPattern("first");
                        for (EventMessage te : test) {
                            accid = te.getAccountId();
                        }
                        return value.getEvent().equalsIgnoreCase("TXN_NEW")
                                && value.getAccountId().equals(accid) && value.getAmt() <= 10;
                    }
                }).followedBy("last").subtype(EventMessage.class).where(new IterativeCondition<EventMessage>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(EventMessage value, Context<EventMessage> ctx) throws Exception {
                        Integer accid = 0;
                        Iterable<EventMessage> test = ctx.getEventsForPattern("first");
                        for (EventMessage te : test) {
                            accid = te.getAccountId();
                        }
                        return value.getEvent().equalsIgnoreCase("TXN_NEW")
                                && value.getAccountId().equals(accid) && value.getAmt() >= 100 ;
                    }
                }).within(Time.seconds(300));
        
        

        PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern)
                .inEventTime();
                //.inProcessingTime();

        DataStream<String> alarms = patternStream.select(new PatternSelectFunction<EventMessage, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String select(Map<String, List<EventMessage>> pattern) throws Exception {
                EventMessage first = (EventMessage) pattern.get("first").get(0);
                EventMessage middle = (EventMessage) pattern.get("second").get(0);
                EventMessage third = (EventMessage) pattern.get("third").get(0);
                EventMessage last = (EventMessage) pattern.get("last").get(0);
                return "WARNING : Possible fraud scenario [ Party ID " + first.getPartyId()
                        + " recently changed his password and added a beneficiary and later made transactions of "
                        + third.getAmt() + " and " + last.getAmt()+" ]";
            }

        });

        alarms.print();
    
        env.execute(" CEP ");
    }

}

If I change the following line

PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern).inEventTime();

to

PatternStream<EventMessage> patternStream = CEP.pattern(partitionedInput, relaxedAlarmPattern).inProcessingTime();

the code works. Any suggestions on how I can make it work with the inEventTime() method?
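Another thing to keep in mind with this setup: the WatermarkStrategy in the question allows 300 seconds of out-of-orderness, so the watermark trails the largest event timestamp seen by 300 s. In event time, CEP buffers incoming events and only processes them once the watermark has passed their timestamps, whereas inProcessingTime() handles them as they arrive. The stdlib-only sketch below models that lag, assuming the formula used by Flink's bounded-out-of-orderness generator (max timestamp seen, minus the out-of-orderness, minus 1 ms). The class and method names are hypothetical; this is a model, not Flink code.

```java
import java.time.Duration;

/** Hypothetical, stdlib-only model of a bounded-out-of-orderness watermark generator. */
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Records an event timestamp (epoch millis). */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = largest timestamp seen - allowed out-of-orderness - 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** In this model, an event-time operator only processes events at or below the watermark. */
    boolean isReleased(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(300));
        long t0 = 1_000_000L;                   // timestamp of the last event of a match (ms)
        wm.onEvent(t0);
        System.out.println(wm.isReleased(t0));  // false: watermark is t0 - 300_001
        wm.onEvent(t0 + 299_999);
        System.out.println(wm.isReleased(t0));  // false: watermark is t0 - 2
        wm.onEvent(t0 + 300_001);
        System.out.println(wm.isReleased(t0));  // true: watermark is t0
    }
}
```

Under this model, the last TXN_NEW event of a match is only processed after the same source subtask has read some event roughly five minutes newer in event time, so a short test with a handful of events may never print anything.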

Answer #1 by 11dmarpk

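Usually the problem with a Kafka source is that its parallelism is higher than the number of partitions, or that not all partitions receive data, which keeps the watermark from advancing. You can fix this either by adjusting the parallelism or by using withIdleness with your watermark strategy.
See the Kafka connector documentation for more information.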
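The reason a silent partition blocks everything: a downstream operator's watermark is the minimum of the watermarks of its inputs, so one input that never advances holds the combined watermark at its initial value. withIdleness lets an input be marked idle after it has produced no events for the given (processing-time) duration, and idle inputs are left out of that minimum. The stdlib-only sketch below models this rule; the class, its methods, and the one-minute timeout are hypothetical illustration choices, not Flink APIs.

```java
import java.time.Duration;
import java.util.Arrays;

/** Hypothetical model: combined watermark = minimum over partitions that are not idle. */
class WatermarkAlignmentSketch {
    private final long[] watermarks;       // latest watermark per partition
    private final long[] lastActivity;     // wall-clock millis of each partition's last event
    private final long idleTimeoutMillis;  // Long.MAX_VALUE disables idleness detection

    WatermarkAlignmentSketch(int partitions, Duration idleTimeout) {
        watermarks = new long[partitions];
        lastActivity = new long[partitions];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no data yet
        idleTimeoutMillis = idleTimeout == null ? Long.MAX_VALUE : idleTimeout.toMillis();
    }

    /** Partition p received an event and now reports watermark wm at wall-clock time now. */
    void advance(int p, long wm, long now) {
        watermarks[p] = Math.max(watermarks[p], wm);
        lastActivity[p] = now;
    }

    /** Combined watermark at wall-clock time now: minimum over non-idle partitions. */
    long combined(long now) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int p = 0; p < watermarks.length; p++) {
            boolean idle = now - lastActivity[p] >= idleTimeoutMillis;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, watermarks[p]);
            }
        }
        // Simplification: if every partition is idle, report "no progress".
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Two partitions, but only partition 0 ever receives events.
        WatermarkAlignmentSketch noIdleness = new WatermarkAlignmentSketch(2, null);
        noIdleness.advance(0, 5_000L, 110_000L);
        System.out.println(noIdleness.combined(120_000L)); // -9223372036854775808: stuck on partition 1

        WatermarkAlignmentSketch withIdleness = new WatermarkAlignmentSketch(2, Duration.ofMinutes(1));
        withIdleness.advance(0, 5_000L, 20_000L);
        System.out.println(withIdleness.combined(30_000L));  // -9223372036854775808: partition 1 not idle yet
        withIdleness.advance(0, 8_000L, 110_000L);
        System.out.println(withIdleness.combined(120_000L)); // 8000: partition 1 is now ignored
    }
}
```

In the question's job, the corresponding change would be to append `.withIdleness(Duration.ofMinutes(1))` to the `WatermarkStrategy` passed to `env.fromSource(...)` (the timeout is only an example value), or to run the source with a parallelism no higher than the number of partitions in the topic.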
