SpringKafka听所有的主题和调整分区偏移

nbnkbykc  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(379)

基于springkafka的文档,我使用基于注解的@kafkalistener来配置我的消费者。
我看到的是-
除非我将偏移量指定为零,否则kafka consumer会在开始时拾取未来的消息,而不是现有的消息(我理解这是一个预期的结果,因为我没有指定我想要的偏移量)
我在文档中看到了一个选项,可以指定一个主题+分区的组合,以及一个偏移量为零的组合,但是如果我这样做了,我必须显式地指定我希望我的消费者听哪个主题。
使用上面的方法2,这就是我的消费者现在的样子-

@KafkaListener(id = "{group.id}",
        topicPartitions = {
                @TopicPartition(topic = "${kafka.topic.name}",
                        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
        },
        containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
                   Acknowledgment ack) throws InterruptedException, IOException {

    logger.debug("This is what we received in the Kafka Consumer = " + payload);

    idService.process(payload);

    ack.acknowledge();
}

虽然我知道有一个选项可以指定“topicpattern”通配符或“topics”列表作为注解配置的一部分,但我没有看到一个地方可以为列出的主题/主题模式提供从零开始的偏移值。有没有办法把两者结合起来呢?请告知。

v64noz0r

v64noz0r1#

当使用主题和主题模式(而不是显式声明分区)时,kafka决定哪个使用者示例将获得哪个分区。
kafka将分配分区,并且初始偏移量将是该组id的最后一次提交。您当前无法更改该偏移量,但我们正在考虑添加一个seek函数。
如果总是希望从第一个可用的偏移量开始,请使用唯一的组id(例如。 UUID.randomUUID().toString() )并设置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

由于kafka没有该组id的现有偏移量,因此它将使用该属性来确定从何处开始。
您也可以使用手动确认模式和从不确认,这将有效地做同样的事情。

相关问题