Flink TumblingEventTimeWindow未生成任何输出

q8l4jmvw  于 2023-06-27  发布在  Apache
关注(0)|答案(1)|浏览(341)

我一直试图在TumblingEventTimeWindow上创建一个重复数据删除处理器,但似乎窗口从未关闭,因此没有调用process函数,因此没有触发事件。
我将flinkSource定义为

private DataStream<String> flinkStreamFromConfig(SourceTopic sourceTopic, StreamExecutionEnvironment flinkEnv){
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(sourceTopic.getServers())
                .setTopics(sourceTopic.getTopicName())
                .setGroupId(sourceTopic.getGroupId())
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        return flinkEnv.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)), sourceTopic.getTopicName());
}

此Kafka源流正在正确创建,但当应用Tumbling Event Time窗口时,它从未输出任何结果
DataStream dedupeStream = flinkStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(25))).process(new DedupeProcessor());
这是一个永远不会执行的进程函数

public class DedupeProcessor extends ProcessAllWindowFunction<String, String, TimeWindow> {
    private static final Logger logger = LoggerFactory.getLogger(DelayedWatermarkStrategy.class);
    @Override
    public void process(ProcessAllWindowFunction<String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
        HashSet<String> seenKeys = new HashSet<>();
        StreamSupport.stream(elements.spliterator(), false).forEach(seenKeys::add);
        logger.info("emitting event {} at {}", System.currentTimeMillis(), seenKeys);
        seenKeys.forEach(out::collect);
    }
}

我做错什么了吗?我试图调试可能是什么原因,我看到internalTimerService.currentWatermark()被设置为Long.Min并且没有改变。
我试着把事件时间窗口改成处理时间窗口,它能用那个工作,但不能用这个工作。
同样在调试器中,我可以看到窗口正在更改为新窗口,因为新事件被摄取,但如果flink窗口操作符的官方代码中的块从未命中,则触发器结果始终是继续并且从未触发。

if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

此外,输入主题是单个分区。

vzgqcmou

vzgqcmou1#

你的水印策略没有声明如何从传入的String值中提取时间戳,所以除非你有记录被推送到Kafka中(用于写入时间),否则不会分配水印。这反过来意味着没有事件时间窗口将被触发,因此您的进程函数将不会被调用。
如果无法从传入记录中获取事件时间,请尝试使用TumblingProcessingTimeWindows.of(Time.seconds(25)),而不是事件时间窗口。

相关问题