Spring Kafka: testing a failed message send

tv6aics1  posted on 2023-01-12  in Apache

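When I send a Kafka message with a producer, I am trying to test the onFailure case, but the onFailure callback is never triggered.
Here is the code I use to send the message: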

@Component
public class MessageSending {

    @Autowired
    Map<String, KafkaTemplate<String, String>> producerByCountry;

    String topicName = "countryTopic";

    public void sendMessage(String data) {
        producerByCountry.get("countryName").send(topicName, data).addCallback(
                onSuccess -> {},
                onFailure -> log.error("failed")
        );
    }
}

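Here is the test class, but it still exercises the success case, and I don't know how to test the failure case (I want to add some handling in the onFailure block, but first I want to know how to trigger onFailure from a test).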

@EmbeddedKafka
@SpringBootTest
public class MessageSendingTest {

    @MockBean
    Map<Country, KafkaTemplate<String, String>> producerByCountry;

    @Autowired
    EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    MessageSending messageSending;

    @Test
    void failTest(CapturedOutput capturedOutput) {
        var props = KafkaTestUtils.producerProps(embeddedKafka);
        var producerTemplate = new DefaultKafkaProducerFactory<String, String>(props);
        var template = new KafkaTemplate<>(producerTemplate);

        given(producerByCountry.get("USA")).willReturn(template);

        messageSending.sendMessage("data");

        assertThat(capturedOutput).contains("failed");
        
    }
}

I also tried the idea from the question "How to test Kafka OnFailure callback with Junit?" by doing:

doAnswer(invocationOnMock -> {
    ListenableFutureCallback<SendResult<String, String>> listenableFutureCallback = invocationOnMock.getArgument(0);
    KafkaProducerException value = new KafkaProducerException(new ProducerRecord<String, String>("myTopic", "myMessage"), "error", ex);
    listenableFutureCallback.onFailure(value);
    return null;
}).when(mock(ListenableFuture.class)).addCallback(any(ListenableFutureCallback.class));

But then I got the Mockito exception org.mockito.exceptions.misusing.UnnecessaryStubbingException, caused by the when().addCallback stubbing.
Can anyone help?
Thanks.

eanckbw9 (answer #1)

You can use a mock template; see this answer:
How to mock result from KafkaTemplate
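
The idea behind mocking the template can be sketched without Kafka or Mockito: the code under test only needs an object whose send call returns a future, so a test double that hands back an already-failed future drives the failure callback. The sketch below is only an illustration under stated assumptions, not the linked answer's code. `Sender`, `MessageSendingSketch` and the in-memory `errors` list are hypothetical stand-ins for KafkaTemplate, the MessageSending component and the logger, and the JDK's CompletableFuture is used in place of Spring's ListenableFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the single KafkaTemplate method the component uses.
interface Sender {
    CompletableFuture<String> send(String topic, String data);
}

// Hypothetical stand-in for the MessageSending component.
class MessageSendingSketch {
    final List<String> errors = new ArrayList<>(); // stands in for log.error output
    private final Map<String, Sender> producerByCountry;

    MessageSendingSketch(Map<String, Sender> producerByCountry) {
        this.producerByCountry = producerByCountry;
    }

    void sendMessage(String country, String data) {
        producerByCountry.get(country).send("countryTopic", data)
                .whenComplete((res, ex) -> {
                    if (ex != null) {
                        errors.add("failed: " + ex.getMessage());
                    }
                });
    }
}

class FailingSenderDemo {
    static List<String> run() {
        // Test double: every send returns a future that has already failed.
        Sender failing = (topic, data) ->
                CompletableFuture.failedFuture(new RuntimeException("test"));
        MessageSendingSketch sending =
                new MessageSendingSketch(Map.of("france", failing));
        sending.sendMessage("france", "foo");
        return sending.errors;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [failed: test]
    }
}
```

Because the future is already completed when whenComplete is attached, the callback runs in the calling thread and the recorded error can be asserted immediately after sendMessage returns.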

EDIT

You can also mock the underlying Producer object. Here is an example that is closer to your use case:

@SpringBootApplication
public class So75074961Application {

    public static void main(String[] args) {
        SpringApplication.run(So75074961Application.class, args);
    }

    @Bean
    KafkaTemplate<String, String> france(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf, Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "france:9092"));
    }

    @Bean
    KafkaTemplate<String, String> germany(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf, Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "germany:9092"));
    }

}

@Component
class MessageSending {

    private static final Logger log = LoggerFactory.getLogger(MessageSending.class);

    @Autowired
    Map<String, KafkaTemplate<String, String>> producerByCountry;

    String topicName = "countryTopic";

    public void sendMessage(String country, String data) {
        producerByCountry.get(country).send(topicName, data).addCallback(
                onSuccess -> log.info(onSuccess.getRecordMetadata().toString()),
                onFailure -> log.error("failed: " + onFailure.getMessage()));
    }

}

@SpringBootTest
@ExtendWith(OutputCaptureExtension.class)
class So75074961ApplicationTests {

    @Test
    void test(@Autowired MessageSending sending, CapturedOutput capture) {
        // Mock the producer factory so the template is handed a mock Producer
        ProducerFactory<String, String> pf = mock(ProducerFactory.class);
        Producer<String, String> prod = mock(Producer.class);
        given(pf.createProducer()).willReturn(prod);
        // Complete every send by invoking the producer callback with an exception
        willAnswer(inv -> {
            Callback callback = inv.getArgument(1);
            callback.onCompletion(null, new RuntimeException("test"));
            return mock(Future.class);
        }).given(prod).send(any(), any());

        // inject the mock pf into "france" template
        Map<?, ?> producers = KafkaTestUtils.getPropertyValue(sending, "producerByCountry", Map.class);
        new DirectFieldAccessor(producers.get("france")).setPropertyValue("producerFactory", pf);

        sending.sendMessage("france", "foo");
        assertThat(capture)
                .contains("failed: Failed to send; nested exception is java.lang.RuntimeException: test");
    }

}

For spring-kafka 3.0 or later, where send() returns a CompletableFuture instead of a ListenableFuture, use whenComplete:

public void sendMessage(String country, String data) {
    producerByCountry.get(country).send(topicName, data).whenComplete(
            (res, ex) -> {
                if (ex == null) {
                    log.info(res.getRecordMetadata().toString());
                }
                else {
                    log.error("failed: " + ex.getMessage());
                }
            });
}

and

assertThat(capture)
        .contains("failed: Failed to send");

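(The latter is because Spring Framework 6.0+ no longer merges nested exception messages; the top-level exception is a KafkaProducerException, with the actual exception as its cause.)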
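
The shorter asserted text comes down to what getMessage() returns on the top-level exception. The following plain-JDK sketch illustrates that; it assumes a RuntimeException with the message "Failed to send" as a stand-in for KafkaProducerException (the real class is not used here), and `NestedMessageDemo` is a hypothetical name. The callback sees only the top-level message, and the underlying error has to be read from getCause().

```java
import java.util.concurrent.CompletableFuture;

class NestedMessageDemo {
    // Returns {top-level message, cause message} as seen by the callback.
    static String[] observe() {
        // Stand-in for KafkaProducerException wrapping the real send error.
        RuntimeException top =
                new RuntimeException("Failed to send", new RuntimeException("test"));
        String[] seen = new String[2];
        CompletableFuture.<String>failedFuture(top).whenComplete((res, ex) -> {
            seen[0] = ex.getMessage();            // top-level message only
            seen[1] = ex.getCause().getMessage(); // the underlying error
        });
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("failed: " + seen[0]); // prints failed: Failed to send
        System.out.println("cause: " + seen[1]);  // prints cause: test
    }
}
```

If the handling in the failure branch needs the underlying error, reading ex.getCause() there is one option.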
