我有一个kakfa使用者,其enable.auto.commit设置为false。每当我重新启动消费者应用程序时,它总是再次读取上一个提交的偏移量,然后读取下一个偏移量。
例如,上次提交的偏移量为50。当我重新启动consumer时,它再次首先读取偏移量50,然后读取下一个偏移量。
我正在执行commitsync,如下所示。
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("sometopic", partition), new OffsetAndMetadata(offset));
kafkaconsumer.commitSync(offsets);
我试着将auto.offset.reset设置为earlish和latest但它并没有改变行为。
我在消费者配置中遗漏了什么吗?
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "CLIENT_ID");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP_ID");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,CustomDeserializer.class.getName());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
2条答案
按热度按时间oyt4ldly1#
看起来你想用
new OffsetAndMetadta(offset)
. 这不是典型的用法。以下是文档中的一个示例,在手动偏移控制下:
https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html
注意
consumer.commitSync()
调用在没有任何参数的情况下执行。它只是消耗,而且它会承诺到那个时候消耗的任何东西。b4lqfgs42#
如果你想用
commitSync(offset)
你必须仔细阅读它的javadoc:提交的偏移量应该是应用程序将使用的下一条消息,即lastprocessedmessageoffset+1。
如果不将+1添加到偏移量,则在下次重新启动时,使用者将再次使用最后一条消息。如另一个答案中所述,如果您使用
commitSync()
没有任何争论,你不用担心