Kafka 使用seek侦听仅获取未提交的距开头的偏移量

umuewwlo  于 2022-12-03  发布在  Apache
关注(0)|答案(1)|浏览(302)

我正在使用Spring Kafka,并且有一个要求,我必须从一个DLQ主题监听消息,并在几分钟后将消息放到另一个主题。这里我只在消息被放到另一个主题时才确认消息,否则我不会提交它并调用kafkaListenerEndpointRegistry。stop(),这将停止我的Kafka使用者。然后,每3分钟运行一次计划的cron作业,并通过运行kafkaListenerEndpointRegistry启动使用者。start(),且由于auto.offset.reset被设置为最早,因此消费者从先前未提交偏移量获得所有消息,并检查它们是否有资格被放在其他主题上。
这种方法对于小容量的磁盘运行良好,但是对于非常大的磁盘,我在两个主题中都没有看到预期的重试。所以我怀疑这可能发生了,因为我使用了kafkaListenerEndpointRegistry.stop()来停止使用者。如果我能够查找到每个分区的偏移量的开始,并从未提交的偏移量中获得所有消息,那么我就不必停止并启动我的使用者。
为此,我尝试了ConsumerSeekAware.onPartitionAssigned并调用callback.seekToBeginning()来重置偏移量。但看起来它也消耗了所有提交的偏移量,这增加了我的服务上的巨大负载。那么,我是否遗漏了什么,或者seekToBeginning总是读取所有消息(提交的和未提交的)。
是否有办法在运行Kafka consumer时手动触发分区分配,以便它转到onPartitionAssigned方法?

0g0grzrc

0g0grzrc1#

auto.offset.reset设置为最早,则使用者将从以前未提交的消息中获取所有消息
如果存在提交的偏移量,则auto.offset.reset是无意义的;它仅确定在没有提交的偏移量的情况下的行为。
seekToBeginning始终读取所有消息(已提交和未提交)。
Kafka保持2个指针-当前位置和承诺的偏移量; seek与提交的偏移量无关,seekToBeginning只是将位置更改为最早的记录,因此下一次轮询将返回所有记录。
这种方法对于小容量的情况很好,但是对于非常大的容量,我在两个主题中都没有看到预期的重试。所以我怀疑这可能是因为我使用kafkaListenerEndpointRegistry.stop()来阻止消费者。
这应该不是问题;您可能需要考虑使用容器停止错误处理程序;然后抛出一个异常,容器将自行停止(您还应该设置stopImmediate容器属性)。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#container-stopping-error-handlers

相关问题