我在我的应用程序中使用Kafka,我想处理Kafka发送消息期间的超时异常。我试着做下一步:在回调时抛出运行时异常并在方法调用中捕获它。
这是Kafka的电话:
public final class ManualKafkaBetsProducer implements IKafkaBetsProducer
{
@Autowired
private KafkaTemplate<ReportsBetKey, IReportsBet> template;
public void send(IReportsBet bet)
{
template.send(MessageBuilder.withPayload(bet).setHeader(KafkaHeaders.TOPIC, kafkaBetsTopicName)
.setHeader(KafkaHeaders.MESSAGE_KEY, new ReportsBetKey(bet)).build())
.addCallback(res -> logger.debug("Message {} send OK"), ex -> {
throw new RuntimeException(ex);
});
}
}
这是方法调用:
try
{
manualKafkaBetsProducer.send(bet);
}
catch (Exception ex)
{
return TransactionResultType.GENERAL_ERROR.name();
}
问题是:当超时发生时,它不会进入catch块。当发生其他错误时,它工作正常。你知道怎么处理吗?谢谢您。
1条答案
按热度按时间zxlwwiss1#
你的
send
方法返回ListenableFuture<SendResult>
.如果您想在发送失败时在发送线程上获得异常,您可以等待将来完成(通过调用
get()
并检查结果。这个
callback
如果您不希望发送线程等待,则此选项很有用。它给你一个机会让另一个线程在稍后获取结果。