我一直试图在TumblingEventTimeWindow上创建一个重复数据删除处理器,但似乎窗口从未关闭,因此没有调用process函数,因此没有触发事件。
我将flinkSource定义为
private DataStream<String> flinkStreamFromConfig(SourceTopic sourceTopic, StreamExecutionEnvironment flinkEnv){
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(sourceTopic.getServers())
.setTopics(sourceTopic.getTopicName())
.setGroupId(sourceTopic.getGroupId())
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
return flinkEnv.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)), sourceTopic.getTopicName());
}
此Kafka源流正在正确创建,但当应用Tumbling Event Time窗口时,它从未输出任何结果
DataStream dedupeStream = flinkStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(25))).process(new DedupeProcessor());
这是一个永远不会执行的进程函数
public class DedupeProcessor extends ProcessAllWindowFunction<String, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(DelayedWatermarkStrategy.class);
@Override
public void process(ProcessAllWindowFunction<String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
HashSet<String> seenKeys = new HashSet<>();
StreamSupport.stream(elements.spliterator(), false).forEach(seenKeys::add);
logger.info("emitting event {} at {}", System.currentTimeMillis(), seenKeys);
seenKeys.forEach(out::collect);
}
}
我做错什么了吗?我试图调试可能是什么原因,我看到internalTimerService.currentWatermark()被设置为Long.Min并且没有改变。
我试着把事件时间窗口改成处理时间窗口,它能用那个工作,但不能用这个工作。
同样在调试器中,我可以看到窗口正在更改为新窗口,因为新事件被摄取,但如果flink窗口操作符的官方代码中的块从未命中,则触发器结果始终是继续并且从未触发。
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
此外,输入主题是单个分区。
1条答案
按热度按时间vzgqcmou1#
你的水印策略没有声明如何从传入的String值中提取时间戳,所以除非你有记录被推送到Kafka中(用于写入时间),否则不会分配水印。这反过来意味着没有事件时间窗口将被触发,因此您的进程函数将不会被调用。
如果无法从传入记录中获取事件时间,请尝试使用
TumblingProcessingTimeWindows.of(Time.seconds(25))
,而不是事件时间窗口。