当我用producer发送Kafka消息时,我尝试测试onFailure用例,但onFailure方法从未触发。
下面是我发送消息的代码:
@Component
public class MessageSending {
@Autowired
Map<String, KafkaTemplate<String, String>> producerByCountry;
String topicName = "countryTopic";
public void sendMessage(String data) {
producerByCountry.get("countryName").send(topicName, data).addCallback(
onSuccess -> {},
onFailure -> log.error("failed")
);
}
}
这是测试类,但它仍然是一个成功的案例,我不知道如何测试失败的案例(我想在onFailure块中添加一些处理,但我首先想知道如何通过测试触发onFailure)。
@EmbeddedKafka
@SpringBootTest
public class MessageSendingTest {
@MockBean
Map<Country, KafkaTemplate<String, String>> producerByCountry;
@Autowired
EmbeddedKafkaBroker embeddedKafka;
@Autowired
MessageSending messageSending;
@Test
void failTest(CapturedOutput capturedOutput) {
var props = KafkaTestUtils.producerProps(embeddedKafka);
var producerTemplate = new DefaultKafkaProducerFactory<String, String>(props);
var template = new KafkaTemplate<>(producerTemplate);
given(producerByCountry.get("USA"))).willReturn(template);
messageSending.sendMessage("data");
assertThat(capturedOutput).contains("failed");
}
}
我还尝试了本主题How to test Kafka OnFailure callback with Junit?的思想,方法是执行
doAnswer(invocationOnMock -> {
ListenableFutureCallback<SendResult<String, String>> listenableFutureCallback = invocationOnMock.getArgument(0);
KafkaProducerException value = new KafkaProducerException(new ProducerRecord<String, String>("myTopic", "myMessage"), "error", ex);
listenableFutureCallback.onFailure(value);
return null;
}).when(mock(ListenableFuture.class)).addCallback(any(ListenableFutureCallback.class));
但是我得到了这个mockito异常org.mockito.exceptions.misusing.UnnecessaryStubbingException
,它在when().addCallback之前到期
有人能帮忙吗?
谢谢。
1条答案
按热度按时间eanckbw91#
您可以使用模拟模板;请看下面的回答:
如何从KafkaTemplate模拟结果
您还可以模拟底层的
Producer
对象-下面是一个更接近您的用例的示例...对于3.0或更高版本,请使用
CompletableFuture
而不是ListenableFuture
。以及
(the后者是因为Spring Framework 6.0+不再合并嵌套的异常消息;顶层异常是
KafkaProducerException
,实际异常作为其原因)。