Spring Boot :Kafka提交偏移量

um6iljoc  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(119)

我有一个非常简单的应用程序:

@KafkaListener(topics = "journal-topic")
    public void onMessage(ConsumerRecord<String,String> consumerRecord, Acknowledgment acknowledgment) {
        acknowledgment.acknowledge();
        var message = extractMessage(consumerRecord);
        messageService.saveOrUpdateMessage(message);
    }

extractMessage(consumerRecord)抛出一个ExceptionIllegalArgumentException),但在我的单元测试中,有6次重试。我不明白为什么。我一进入这个方法,在抛出异常之前,我就acknowledge()了。所以应该被认为是好的,我认为不应该发生重试。
下面是我的配置:

kafka:
    topic: "journal-topic"
    properties:
      auto-create-topics-enable: true

    consumer:
      bootstrap-servers: localhost:9092
      group-id: "journal-group"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      max-poll-records: 10
      enable-auto-commit: false
    listener:
        ack-mode: manual_immediate
        concurrency: 3
bxjv4tth

bxjv4tth1#

无论您是否确认记录,如果您抛出异常,默认的错误处理程序将导致它被重播。
如果你想放弃它,你需要在监听器中捕获异常,而不是将其抛出到容器中。
或使用自定义错误处理程序。

相关问题