我有一个用例,我希望消费者总是从最新的偏移量开始。我不需要为这个消费者承诺补偿。这在springkafka中是不可能实现的,因为新的使用者组总是提交新分配的分区。然后,在随后的程序启动时,使用者从存储的偏移量中读取,而不是从最新的偏移量中读取。换言之,只有从新的消费群体开始的第一个消费群体的行为才是正确的,即从最新的消费群体开始消费。问题出在 KafkaMessageListenerContainer$ListenerConsumer.onPartitionsAssigned()
为了便于参考,我在springboot中设置了以下内容
spring.kafka.listener.ack-mode=manual
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
1条答案
按热度按时间2mbi3lxu1#
添加该代码是为了解决一些恶劣的竞争条件,当一个新的消费群体开始消费时发生了重新划分;它可能导致记录丢失或重复,具体取决于配置。
为了避免这些情况,最好提交初始偏移量。
不过,我同意,如果用户完全负责偏移量(使用手动确认模式),那么我们可能不应该这样做;这取决于用户代码来处理比赛(在您的情况下,您不关心丢失的记录)。
请随意打开github问题(欢迎投稿)。
同时,您可以通过让侦听器实现来避免这种情况
ConsumerSeekAware
并在作业过程中寻找主题/分区结尾。另一种选择是每次对group.id使用一个uuid;你总是从主题的结尾开始。