java—使用SpringKafka在kafka使用者中手动提交偏移量的方法

pw136qt2  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(741)

我正在尝试使用SpringKafka(1.1.0.release)找出在kafka使用者中手动提交偏移量的方法。我理解,为了健壮的客户端实现,最好提交这些补偿,这样其他使用者就不会处理重复的事件,这些事件可能最初是由一个现已死亡的使用者处理的,或者是因为触发了重新平衡。
我知道有两种方法可以解决这个问题-
将ack\u mode设置为manual\u immediate,并在侦听器实现中调用ack.acknowledge()api

ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

@KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception {
    logger.info("===***batch size : " + batch.size() + "***===");
    batch.forEach(System.out::println);
    acknowledgment.acknowledge();
}

实现ConsumerBalanceListener。

ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {

@Override
public void onPartitionsRevoked(final Collection<TopicPartition> collection) {

}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> collection) {

}
});

但是,使用这种方法,我不知道如何获得对消费者的引用,以调用 consumer.commitSync() 或者 consumer.commitASync() API。由于技术上的限制,我不能使用支持 ConsumerAwareRebalanceListener 并参考了 Consumer .
那么如何利用呢 ConsumerRebalanceListener 能够向Kafka承诺补偿?
另外,我正在使用 ConcurrentKafkaListenerContainerFactory.setConcurrency() api,所以如果特定的使用者线程死亡,它是否有自己的ConsumerBalanceListener示例?

ecfdbz9o

ecfdbz9o1#

使用者重新平衡侦听器不能用于提交偏移量;与1.x;唯一的办法就是通过 Acknowledgment 参数。
1.1.0非常古老;建议所有1.x用户至少升级到1.3.5版本;由于kip-62,它有一个更简单的线程模型-参见项目页面。
目前的版本2.1.6(2.1.7将于今天发布)有更多的选择,包括 ConsumerAwareMessageListener 您可以完全访问消费者。
您不能从版本<1.3的侦听器直接与使用者交互,因为使用者不是线程安全的,必须在不同的线程上调用侦听器。

相关问题