我正在使用Spring Kafka,并且有一个要求,我必须从一个DLQ主题监听消息,并在几分钟后将消息放到另一个主题。这里我只在消息被放到另一个主题时才确认消息,否则我不会提交它并调用kafkaListenerEndpointRegistry。stop(),这将停止我的Kafka使用者。然后,每3分钟运行一次计划的cron作业,并通过运行kafkaListenerEndpointRegistry启动使用者。start(),且由于auto.offset.reset被设置为最早,因此消费者从先前未提交偏移量获得所有消息,并检查它们是否有资格被放在其他主题上。
这种方法对于小容量的磁盘运行良好,但是对于非常大的磁盘,我在两个主题中都没有看到预期的重试。所以我怀疑这可能发生了,因为我使用了kafkaListenerEndpointRegistry.stop()来停止使用者。如果我能够查找到每个分区的偏移量的开始,并从未提交的偏移量中获得所有消息,那么我就不必停止并启动我的使用者。
为此,我尝试了ConsumerSeekAware.onPartitionAssigned并调用callback.seekToBeginning()来重置偏移量。但看起来它也消耗了所有提交的偏移量,这增加了我的服务上的巨大负载。那么,我是否遗漏了什么,或者seekToBeginning总是读取所有消息(提交的和未提交的)。
是否有办法在运行Kafka consumer时手动触发分区分配,以便它转到onPartitionAssigned方法?
1条答案
按热度按时间0g0grzrc1#
auto.offset.reset
设置为最早,则使用者将从以前未提交的消息中获取所有消息如果存在提交的偏移量,则
auto.offset.reset
是无意义的;它仅确定在没有提交的偏移量的情况下的行为。seekToBeginning始终读取所有消息(已提交和未提交)。
Kafka保持2个指针-当前位置和承诺的偏移量; seek与提交的偏移量无关,
seekToBeginning
只是将位置更改为最早的记录,因此下一次轮询将返回所有记录。这种方法对于小容量的情况很好,但是对于非常大的容量,我在两个主题中都没有看到预期的重试。所以我怀疑这可能是因为我使用
kafkaListenerEndpointRegistry.stop()
来阻止消费者。这应该不是问题;您可能需要考虑使用容器停止错误处理程序;然后抛出一个异常,容器将自行停止(您还应该设置
stopImmediate
容器属性)。https://docs.spring.io/spring-kafka/docs/current/reference/html/#container-stopping-error-handlers