flink的会话窗口缺少延迟输出

l7mqbcuq  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(459)

在我的管道设置中,我看不到会话窗口的侧输出。我用的是flink 1.9.1
版本1。我得到的是:

messageStream.
    .keyBy(tradeKeySelector)
    .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
    .sideOutputLateData(lateTradeMessages)
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");

latetrademessages实现sessionwindowtimegapextractor并返回5秒。
此外,我还有:

messageStream.getSideOutput(lateTradeMessages)
  .keyBy(tradeKeySelector)
  .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
     @Override
     public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
                   System.out.println("Process Late messages For Aggregation");
                   out.collect(new Transaction());
              }
       })
   .name("Process Late messages For Aggregation");

问题是,当我用同一个键发送消息时,我从来没有看到“为聚合处理延迟的消息”,这会错过窗口时间。
当会话窗口通过时,我“立即”为同一个键发送了一条新消息,它会触发新的会话窗口,而不会进入latesideoutput。
不知道我做错了什么。
我想在这里实现的,是抓住“迟来的事件”,并尝试重新处理它们。
我将感谢任何帮助。
版本2,在@dominik wosi之后ń滑雪评论:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
        env.setParallelism(1);
        env.disableOperatorChaining();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

DataStream<RawMessage> rawBusinessTransaction = env
                .addSource(new FlinkKafkaConsumer<>("business",
                        new JSONKeyValueDeserializationSchema(false), properties))
                .map(new KafkaTransactionObjectMapOperator())
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(System.currentTimeMillis());
                    }

                    @Override
                    public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                        return element.messageCreationTime;
                    }
                })
                .name("Kafka Transaction Raw Data Source.");

messageStream
             .keyBy(tradeKeySelector)
             .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
             .sideOutputLateData(lateTradeMessages)
             .process(new CumulativeTransactionOperator())
             .name("Aggregate Transaction Builder");

水印在进步,我查了flink的指标。窗口操作符正在执行,但仍然没有延迟输出。
顺便说一句,Kafka的主题可以是空闲的,所以我必须定期发出新的水印。

czq61nw1

czq61nw11#

水印方法在我看来非常可疑。通常,此时会输出最新的事件时间戳。
只是一些背景资料,这样更容易理解。
延迟事件指的是在水印处理到事件之后的某个时间之后发生的事件。考虑以下示例:

event1 @time 1
event2 @time 2
watermark1 @time 3
event3 @time 1 <-- late event
event4 @time 4

您的水印方法将几乎把所有过去的事件都呈现为后期事件(由于1s水印间隔,所以有一点容忍度)。这也会使再加工和回收变得不可能。
然而,你实际上没有看到任何更让我吃惊的事情。你能仔细检查你的水印方法,描述你的用例,并提供示例数据吗?通常情况下,实现对于实际用例并不理想,应该以不同的方式来解决。

qf9go6mv

qf9go6mv2#

您正在使用 ProcessingTime 在您的例子中,这意味着系统时间用于测量系统中的时间流 DataStream .
对于每个事件,分配给该事件的时间戳是您在flink管道中接收数据的时刻。这意味着在flink处理时间内没有办法使事件无序。正因为如此,你的窗口永远不会有最新的元素。
如果你换成 EventTime ,则对于正确的输入数据,您应该能够看到传递到输出端的后期元素。
您可能应该看看文档,其中有flink解释的各种时间概念。

相关问题