我有一个非常简单的应用程序:
@KafkaListener(topics = "journal-topic")
public void onMessage(ConsumerRecord<String,String> consumerRecord, Acknowledgment acknowledgment) {
acknowledgment.acknowledge();
var message = extractMessage(consumerRecord);
messageService.saveOrUpdateMessage(message);
}
extractMessage(consumerRecord)
抛出一个Exception
(IllegalArgumentException
),但在我的单元测试中,有6次重试。我不明白为什么。我一进入这个方法,在抛出异常之前,我就acknowledge()
了。所以应该被认为是好的,我认为不应该发生重试。
下面是我的配置:
kafka:
topic: "journal-topic"
properties:
auto-create-topics-enable: true
consumer:
bootstrap-servers: localhost:9092
group-id: "journal-group"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
max-poll-records: 10
enable-auto-commit: false
listener:
ack-mode: manual_immediate
concurrency: 3
1条答案
按热度按时间bxjv4tth1#
无论您是否确认记录,如果您抛出异常,默认的错误处理程序将导致它被重播。
如果你想放弃它,你需要在监听器中捕获异常,而不是将其抛出到容器中。
或使用自定义错误处理程序。