Kafka,SpringKafka和重新传递旧信息

kx7yvsdv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(335)

我用Kafka和Spring Boot与SpringKafka。在异常终止应用程序然后重新启动之后,我的应用程序开始从kafka队列接收旧的、已经处理过的消息。
原因是什么?如何发现和解决问题?
我的Kafka财产:

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false

我的SpringKafka工厂和听众:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

    return factory;
}

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    Post post = consumerRecord.value();

    // do some logic here

    ack.acknowledge();
}
oxalkeyp

oxalkeyp1#

使用kafka时,客户机需要自己提交偏移量。这与其他消息代理(如amqp代理)不同,在amqp代理中,代理跟踪客户端已经接收到的消息。
在您的情况下,您不会自动提交偏移量,因此kafka希望您手动提交它们(由于此设置: spring.kafka.consumer.enable-auto-commit=false ). 如果不在程序中手动提交偏移量,那么所描述的行为与预期的行为基本相同。Kafka根本不知道程序成功处理了哪些消息。每次你重新启动你的程序,Kafka会看到你的程序还没有提交任何补偿,并将应用你在中提供的策略 spring.kafka.consumer.auto-offset-reset=earliest ,表示队列中的第一条消息。
如果这对您来说都是新的,我建议您阅读关于kafka的文档和这个spring文档,因为kafka与其他消息代理非常不同。

相关问题