我试图根据事件时间计算Kafka主题每分钟传入事件的速率。我使用1分钟的TumblingEventTimeWindow。下面给出了代码片段。
我注意到,如果我没有收到某个特定窗口的任何事件,例如从2.34到2.35,则之前的2.33到2.34的窗口不会关闭。我知道在2.33到2.34的窗口中丢失数据的风险(可能是由于系统故障、较大的Kafka滞后等原因),但我不能无限期地等待。我需要在等待一段时间后关闭此窗口,系统恢复后,后续窗口可以继续。我怎样才能做到这一点?
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
));
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "AllEventCountConsumerGroup");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaDataStream = environment.addSource(kafkaConsumer);
kafkaDataStream
.flatMap(new EventFlatter())
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Entity>forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(60))
.withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
.assignTimestampsAndWatermarks(new EntityWatermarkStrategy())
.keyBy((KeySelector<Entity, String>) Entity::getTenant)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.aggregate(new EventCountAggregator())
.addSink(eventRateProducer);
private static class EntityWatermarkStrategy implements WatermarkStrategy<Entity> {
@Override
public WatermarkGenerator<Entity> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new EntityWatermarkGenerator();
}
}
private static class EntityWatermarkGenerator implements WatermarkGenerator<Entity> {
private long maxTimestamp;
public EntityWatermarkGenerator() {
this.maxTimestamp = Long.MIN_VALUE + 1;
}
@Override
public void onEvent(Entity event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp + 2));
}
}
另外,我尝试添加一些自定义触发器,但没有帮助。我使用的是ApacheFlink1.11
有人能告诉我,我做错什么了?
当我尝试用一个主题的较新时间戳(比如t+1)推送更多的数据时,来自较早时间段(t)的数据被推送。但是对于t+1数据,同样的问题也会出现在t。
2条答案
按热度按时间ojsjcaue1#
一个原因是
withIdleness()
对你的案子没有帮助的是你打电话来assignTimestampsAndWatermarks
在数据流上,而不是在FlinkKafkaConsumer
它自己。如果你要做后者,那么FlinkKafkaConsumer
能够在每个分区的基础上分配时间戳和水印,并在每个kafka分区的粒度上考虑空闲。更多信息请参见水印策略和Kafka连接器。但是,要使其工作,您需要使用反序列化程序而不是
SimpleStringSchema
(例如KafkaDeserializationSchema
)它能够创建带有时间戳的单个流记录。看到了吗https://stackoverflow.com/a/62072265/2000823 例如,如何实现KafkaDeserializationSchema
.但是,请记住
withIdleness()
如果所有分区都空闲,则不会推进水印。它将做的是防止空闲分区保留水印,如果有来自其他分区的事件,水印可能会提前。e3bfsja22#
有关解决问题的方法,请参阅idle partitions文档。