正确查找tobeginning()组中的所有使用者

inkz8wg9  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(462)

我的目标是在每个分区的开始处寻找一个消费者组中的所有消费者。我使用SpringKafka,并试图使用 ConsumerSeekAware 接口方式:

override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {
  logger.info { "on partitions assigned" }
  assignments?.forEach { topic, _ -> callback?.seekToBeginning(topic.topic(), topic.partition()) }
}

现在,对于一个组中只有一个消费者来说,它是有效的,但是如果我想开始多个消费者呢?我试过了,结果是无限的重新平衡分区。我误解了什么?

mwg9r5ms

mwg9r5ms1#

我最终还是用了其他的方法 ConsumerSeekAware ,即 registerSeekCallback() . 打电话后 onPartitionsAssigned 我将当前分区保存为侦听器中的成员。打电话后也一样 registerSeekCallback 我将回调保存在示例中。
然后我可以按需打电话 consumeFromBeginning 对于每个消费者(通过休息或启动后):

class EventConsumer : ConsumerSeekAware {

    private lateinit var assignedTopicPartitions: List<TopicPartition>
    private lateinit var consumerSeekCallback: ConsumerSeekAware.ConsumerSeekCallback

    fun consumeFromBeginning() {
        assignedTopicPartitions.forEach { topicPartition ->
            consumerSeekCallback.seekToBeginning(topicPartition.topic(), topicPartition.partition())}
    }

    override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {
        logger.debug { "on partitions assigned" }
        this.assignedTopicPartitions = assignments?.map { it.key } ?: error("no assignments found")
    }

    override fun registerSeekCallback(callback: ConsumerSeekAware.ConsumerSeekCallback?) {
        logger.debug { "registerSeekCallback" }
        this.consumerSeekCallback = callback ?: error("seek callback not found")
    }

    override fun onIdleContainer(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {}
}

相关问题