我的目标是在每个分区的开始处寻找一个消费者组中的所有消费者。我使用SpringKafka,并试图使用 ConsumerSeekAware
接口方式:
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {
logger.info { "on partitions assigned" }
assignments?.forEach { topic, _ -> callback?.seekToBeginning(topic.topic(), topic.partition()) }
}
现在,对于一个组中只有一个消费者来说,它是有效的,但是如果我想开始多个消费者呢?我试过了,结果是无限的重新平衡分区。我误解了什么?
1条答案
按热度按时间mwg9r5ms1#
我最终还是用了其他的方法
ConsumerSeekAware
,即registerSeekCallback()
. 打电话后onPartitionsAssigned
我将当前分区保存为侦听器中的成员。打电话后也一样registerSeekCallback
我将回调保存在示例中。然后我可以按需打电话
consumeFromBeginning
对于每个消费者(通过休息或启动后):