spring boot手动确认kafka消息不起作用

0md85ypi  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(711)

我有一个SpringBootKafka消费者,它使用一个主题中的数据并将其存储在数据库中,一旦存储就确认它。它工作正常,但如果应用程序在使用记录后未能获得db连接,则会出现问题,在这种情况下,我们不会发送确认,但在更改组id并重新启动使用程序之前,或除非更改组id并重新启动使用器,否则消息仍然不会被使用
我的消费者如下所示

@KafkaListener(id = "${group.id}", topics = {"${kafka.edi.topic}"})
  public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
      boolean shouldAcknowledge = false;
      try {
          String tNo = getTrackingNumber((String) record.key());

          log.info("Check Duplicate By Comparing With DB records");

          if (!ediRecordService.isDuplicate(tNo)) {---this checks the record in my DB
              shouldAcknowledge = insertEDIRecord(record, tNo); --this return true
          } else {
              log.warn("Duplicate record found.");
              shouldAcknowledge = true;
          }
          if (shouldAcknowledge) {
              acknowledgment.acknowledge();
          }```

So if you see the above snippet we did not sent acknowledgment.
fbcarpbf

fbcarpbf1#

这不是Kafka在这里的工作原理
分区中的每个记录都被分配了一个称为偏移量的顺序id号,该偏移量唯一地标识分区中的每个记录。
例如,从上面的语句中,从第一个poll consumer获取偏移量处的消息 300 如果由于某些问题而无法持久化到数据库中,则不会提交偏移量。
因此,在下一次投票中,它将得到偏移量为的下一个记录 301 如果它成功地将数据持久化到数据库中,那么它将提交偏移量 301 (这意味着该分区中的所有记录都将被处理到该偏移量,在上例中为301)
解决方法:使用重试机制,直到它通过有限的重试成功地将数据存储到数据库中,或者只将失败的数据保存到数据库中 error topic 然后重新处理它,或者将失败记录的偏移量保存在某个位置,以便以后可以重新处理它们。

相关问题