kafka使用者正在重新启动时读取最后提交的偏移量(java)

ezykj2lf  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(332)

我有一个kakfa使用者,其enable.auto.commit设置为false。每当我重新启动消费者应用程序时,它总是再次读取上一个提交的偏移量,然后读取下一个偏移量。
例如,上次提交的偏移量为50。当我重新启动consumer时,它再次首先读取偏移量50,然后读取下一个偏移量。
我正在执行commitsync,如下所示。

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset));
kafkaconsumer.commitSync(offsets);

我试着将auto.offset.reset设置为earlishlatest但它并没有改变行为。
我在消费者配置中遗漏了什么吗?

config.put(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENT_ID");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP_ID");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,CustomDeserializer.class.getName());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
oyt4ldly

oyt4ldly1#

看起来你想用 new OffsetAndMetadta(offset) . 这不是典型的用法。
以下是文档中的一个示例,在手动偏移控制下:

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html
注意 consumer.commitSync() 调用在没有任何参数的情况下执行。它只是消耗,而且它会承诺到那个时候消耗的任何东西。

b4lqfgs4

b4lqfgs42#

如果你想用 commitSync(offset) 你必须仔细阅读它的javadoc:
提交的偏移量应该是应用程序将使用的下一条消息,即lastprocessedmessageoffset+1。
如果不将+1添加到偏移量,则在下次重新启动时,使用者将再次使用最后一条消息。如另一个答案中所述,如果您使用 commitSync() 没有任何争论,你不用担心

相关问题