我有一个SpringBootKafka消费者,它使用一个主题中的数据并将其存储在数据库中,一旦存储就确认它。它工作正常,但如果应用程序在使用记录后未能获得db连接,则会出现问题,在这种情况下,我们不会发送确认,但在更改组id并重新启动使用程序之前,或除非更改组id并重新启动使用器,否则消息仍然不会被使用
我的消费者如下所示
@KafkaListener(id = "${group.id}", topics = {"${kafka.edi.topic}"})
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
boolean shouldAcknowledge = false;
try {
String tNo = getTrackingNumber((String) record.key());
log.info("Check Duplicate By Comparing With DB records");
if (!ediRecordService.isDuplicate(tNo)) {---this checks the record in my DB
shouldAcknowledge = insertEDIRecord(record, tNo); --this return true
} else {
log.warn("Duplicate record found.");
shouldAcknowledge = true;
}
if (shouldAcknowledge) {
acknowledgment.acknowledge();
}```
So if you see the above snippet we did not sent acknowledgment.
1条答案
按热度按时间fbcarpbf1#
这不是Kafka在这里的工作原理
分区中的每个记录都被分配了一个称为偏移量的顺序id号,该偏移量唯一地标识分区中的每个记录。
例如,从上面的语句中,从第一个poll consumer获取偏移量处的消息
300
如果由于某些问题而无法持久化到数据库中,则不会提交偏移量。因此,在下一次投票中,它将得到偏移量为的下一个记录
301
如果它成功地将数据持久化到数据库中,那么它将提交偏移量301
(这意味着该分区中的所有记录都将被处理到该偏移量,在上例中为301)解决方法:使用重试机制,直到它通过有限的重试成功地将数据存储到数据库中,或者只将失败的数据保存到数据库中
error topic
然后重新处理它,或者将失败记录的偏移量保存在某个位置,以便以后可以重新处理它们。