ApacheFlink不返回空闲分区的数据

pw9qyyiw  于 2021-06-29  发布在  Java
关注(0)|答案(2)|浏览(436)

我试图根据事件时间计算Kafka主题每分钟传入事件的速率。我使用1分钟的TumblingEventTimeWindow。下面给出了代码片段。
我注意到,如果我没有收到某个特定窗口的任何事件,例如从2.34到2.35,则之前的2.33到2.34的窗口不会关闭。我知道在2.33到2.34的窗口中丢失数据的风险(可能是由于系统故障、较大的Kafka滞后等原因),但我不能无限期地等待。我需要在等待一段时间后关闭此窗口,系统恢复后,后续窗口可以继续。我怎样才能做到这一点?

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ));
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    executionEnvironment.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "AllEventCountConsumerGroup");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
    DataStreamSource<String> kafkaDataStream = environment.addSource(kafkaConsumer);
    kafkaDataStream
            .flatMap(new EventFlatter())
            .filter(Objects::nonNull)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Entity>forMonotonousTimestamps()
                    .withIdleness(Duration.ofSeconds(60))
                    .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
            .assignTimestampsAndWatermarks(new EntityWatermarkStrategy())
            .keyBy((KeySelector<Entity, String>) Entity::getTenant)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
            .aggregate(new EventCountAggregator())
            .addSink(eventRateProducer);

private static class EntityWatermarkStrategy implements WatermarkStrategy<Entity> {
    @Override
    public WatermarkGenerator<Entity> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {

        return new EntityWatermarkGenerator();
    }

}

private static class EntityWatermarkGenerator implements WatermarkGenerator<Entity> {

    private long maxTimestamp;

    public EntityWatermarkGenerator() {
        this.maxTimestamp = Long.MIN_VALUE + 1;
    }

    @Override
    public void onEvent(Entity event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp + 2));
    }
}

另外,我尝试添加一些自定义触发器,但没有帮助。我使用的是ApacheFlink1.11
有人能告诉我,我做错什么了?
当我尝试用一个主题的较新时间戳(比如t+1)推送更多的数据时,来自较早时间段(t)的数据被推送。但是对于t+1数据,同样的问题也会出现在t。

ojsjcaue

ojsjcaue1#

一个原因是 withIdleness() 对你的案子没有帮助的是你打电话来 assignTimestampsAndWatermarks 在数据流上,而不是在 FlinkKafkaConsumer 它自己。如果你要做后者,那么 FlinkKafkaConsumer 能够在每个分区的基础上分配时间戳和水印,并在每个kafka分区的粒度上考虑空闲。更多信息请参见水印策略和Kafka连接器。
但是,要使其工作,您需要使用反序列化程序而不是 SimpleStringSchema (例如 KafkaDeserializationSchema )它能够创建带有时间戳的单个流记录。看到了吗https://stackoverflow.com/a/62072265/2000823 例如,如何实现 KafkaDeserializationSchema .
但是,请记住 withIdleness() 如果所有分区都空闲,则不会推进水印。它将做的是防止空闲分区保留水印,如果有来自其他分区的事件,水印可能会提前。

e3bfsja2

e3bfsja22#

有关解决问题的方法,请参阅idle partitions文档。

相关问题