处理kafka超时异常java

5n0oy7gb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(551)

我在我的应用程序中使用Kafka,我想处理Kafka发送消息期间的超时异常。我试着做下一步:在回调时抛出运行时异常并在方法调用中捕获它。
这是Kafka的电话:

public final class ManualKafkaBetsProducer implements IKafkaBetsProducer
{
    @Autowired
    private KafkaTemplate<ReportsBetKey, IReportsBet> template;

    public void send(IReportsBet bet)
    {
        template.send(MessageBuilder.withPayload(bet).setHeader(KafkaHeaders.TOPIC, kafkaBetsTopicName)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new ReportsBetKey(bet)).build())
                .addCallback(res -> logger.debug("Message {} send OK"), ex -> {
                    throw new RuntimeException(ex);
                });
    }
}

这是方法调用:

try
{
    manualKafkaBetsProducer.send(bet);
}
catch (Exception ex)
{
     return TransactionResultType.GENERAL_ERROR.name();
}

问题是:当超时发生时,它不会进入catch块。当发生其他错误时,它工作正常。你知道怎么处理吗?谢谢您。

zxlwwiss

zxlwwiss1#

你的 send 方法返回 ListenableFuture<SendResult> .
如果您想在发送失败时在发送线程上获得异常,您可以等待将来完成(通过调用 get() 并检查结果。
这个 callback 如果您不希望发送线程等待,则此选项很有用。它给你一个机会让另一个线程在稍后获取结果。

相关问题