Kafka保留策略无法按预期工作

n7taea2i  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(316)

我想为我们有的一些用例实现一个数据重播,为此,我需要使用kafka保留策略(我使用join,我需要准确的窗口时间)。p、 我使用的是Kafka版本0.10.1.1
我将数据发送到如下主题:

kafkaProducer.send(
                    new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r)
            );

我的主题是这样的:
Kafka主题--创建--Zookeeperlocalhost:2181 --replication-factor 1—分区1—主题mytopic
kafka topics--zookeeper localhost--alter--topic mytopic--config retention.ms=17280000 kafka topics--zookeeper localhost--alter--topic mytopic--config segment.ms=17280000
因此,通过上述设置,我应该将主题的保留时间设置为48小时。
我延伸 TimestampExtractor 以便记录每条消息的实际时间。

public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class);
    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord) {
        LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp()));
        return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis();
    }
}

为了测试,我向我的主题发送了4条消息,我得到了这4条日志消息。
2017-02-28 10:23:39信息消费者记录墙时钟时间戳extractor:21 - 时间戳:1488295086292人类可读-周二2月28日10:18:06 est 2017
2017-02-28 10:24:01信息消费者记录墙时钟时间戳extractor:21 - 时间戳:1483272000000人类可读-2017年1月1日周日美国东部时间07:00:00
2017-02-28 10:26:11信息消费者记录墙时钟时间戳extractor:21 - 时间戳:1485820800000人可读-周一2017年1月30日19:00:00东部时间
2017-02-28 10:27:22信息消费者记录墙时钟时间戳extractor:21 - 时间戳:1488295604411人类可读-周二2月28日10:26:44 est 2017
因此,根据Kafka的保留政策,我希望看到我的两封邮件在5分钟后被清除/删除(第二封和第三封邮件,因为它们是1月1日和1月30日发出的)。但我试着花了一个小时来讨论我的主题,每次我讨论完我的主题,我就收到了所有的4条信息。
Kafkaavro控制台消费者——zookeeperlocalhost:2181 --from-beginning --主题mytopic
我的Kafka配置如下:


############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

# segments don't drop below log.retention.bytes.

# log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=300000

我是做错了什么还是错过了什么?

j8yoct9x

j8yoct9x1#

kafka通过删除日志段来实现其保留策略。kafka从不删除活动段,即它将向分区附加新消息的段。Kafka只删除旧片段。当一条新消息被发送到分区时,kafka将活动段滚动到一个旧段,并且
包含新消息的活动段的大小将超过 log.segment.bytes ,或
活动段中第一条消息的时间戳早于 log.roll.ms (默认为7天)
因此,在您的示例中,您必须在2017年2月28日星期二10:18:06 est之后等待7天,发送一条新消息,然后所有4条初始消息都将被删除。

相关问题