我正在使用Kafka监听消费者的消息。我已启用手动确认。我使用了acknowledgement.acknowledge()方法来提交偏移量,以防成功。如果由于某个api调用失败而出现异常,我还没有提交它。
我有点困惑,因为除非重新启动应用程序,否则未提交的消息不会被再次使用。我是Kafka的新成员,所以我确信我错过了一些非常基本的东西,但是寻找这些东西是徒劳的。有人能给我解释一下,或者给我指出正确的来源,让我知道这里出了什么问题吗?
@KafkaListener(topics = "temp_topic", groupId = "temp-consumer", containerFactory = "tempListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> consumerRecord,
Acknowledgment acknowledgment) {
try {
log.info("Consuming message : {} ", consumerRecord.value().toString());
String message = consumerRecord.value().toString();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Payload payload = objectMapper.readValue(message,Payload.class);
tempService.processEvent(payload);
log.info("Message: {} successfully consumed",message);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Error while listening to kafka event: ", e);
}
}
先谢谢你。
2条答案
按热度按时间zf2sa74q1#
确认不会改变消费者实际阅读的时间点。
它只将偏移提交到某些存储(zookeeper或带有偏移的kafka主题),而不对使用者跟踪的偏移进行任何更改—例如,您可能不会提交当前偏移并通过在下一个偏移上进行确认来覆盖它。
有一些方法可以尝试处理失败的消息:
您可以为特定类型的失败做好准备(例如超时、反序列化错误)-在侦听器中捕获它们并采取正确的操作(例如等待/重试或使应用程序失败-取决于问题的性质)。
您可以跳过失败的消息—捕获侦听器中的错误,将它们放入失败消息的存储区(请参阅死信队列),然后对它们进行确认。
yc0p9oo02#
我读了一些关于它的文章,找到了处理失败消息的方法。我们有三个主题
主要议题
副题
死主题
其思想是使用来自主主题的消息,如果它由于任何原因而失败,则将其推送到副主题,它将在t个时间间隔内重试n次。
如果在侧队列中所有的重试之后失败了,那么将它推到死主题,在那里它将简单地通知(使用电子邮件或任何其他服务)失败。
我不确定这是否是标准,但这是最有意义的所有我读到目前为止。
如果可以,请随时添加或更正。