我正在尝试使用SpringKafka(1.1.0.release)找出在kafka使用者中手动提交偏移量的方法。我理解,为了健壮的客户端实现,最好提交这些补偿,这样其他使用者就不会处理重复的事件,这些事件可能最初是由一个现已死亡的使用者处理的,或者是因为触发了重新平衡。
我知道有两种方法可以解决这个问题-
将ack\u mode设置为manual\u immediate,并在侦听器实现中调用ack.acknowledge()api
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
@KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception {
logger.info("===***batch size : " + batch.size() + "***===");
batch.forEach(System.out::println);
acknowledgment.acknowledge();
}
实现ConsumerBalanceListener。
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> collection) {
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> collection) {
}
});
但是,使用这种方法,我不知道如何获得对消费者的引用,以调用 consumer.commitSync()
或者 consumer.commitASync()
API。由于技术上的限制,我不能使用支持 ConsumerAwareRebalanceListener
并参考了 Consumer
.
那么如何利用呢 ConsumerRebalanceListener
能够向Kafka承诺补偿?
另外,我正在使用 ConcurrentKafkaListenerContainerFactory.setConcurrency()
api,所以如果特定的使用者线程死亡,它是否有自己的ConsumerBalanceListener示例?
1条答案
按热度按时间ecfdbz9o1#
使用者重新平衡侦听器不能用于提交偏移量;与1.x;唯一的办法就是通过
Acknowledgment
参数。1.1.0非常古老;建议所有1.x用户至少升级到1.3.5版本;由于kip-62,它有一个更简单的线程模型-参见项目页面。
目前的版本2.1.6(2.1.7将于今天发布)有更多的选择,包括
ConsumerAwareMessageListener
您可以完全访问消费者。您不能从版本<1.3的侦听器直接与使用者交互,因为使用者不是线程安全的,必须在不同的线程上调用侦听器。