试着弄清楚我是否可以使用springkafka和springkafka测试为@kafkalistener编写单元测试。
我的听众课。
public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;
@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
myMessageProcessor.process(message);
log.info("MyMessage processed");
}}
我的测试类:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {
@Autowired
private MyMessageProcessor myMessageProcessor;
@Value("${kafka.topic.01}")
private String TOPIC_01;
@Autowired
private KafkaTemplate<String, MyMessage> messageProducer;
@Test
public void testSalesforceMessageListner() {
MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}
我的测试配置类:
@Configuration
@EnableKafka
public class TestKafkaConfig {
@Bean
public MyMessageProcessor myMessageProcessor() {
return mock(MyMessageProcessor.class);
}
@Bean
public KafkaEmbedded kafkaEmbedded() {
return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
}
//Consumer
@Bean
public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(myMessageConsumerFactory());
return factory;
}
//Producer
@Bean
public ProducerFactory<String, MyMessage> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, MyMessage> messageProducer() {
return new KafkaTemplate<>(producerFactory());
}
}
有什么简单的方法可以让这一切顺利进行吗?
或者我应该用其他方法测试@kafkalistener?在单元测试中,当新消息到达kafka时,如何确保@kafkalistener被调用。
3条答案
按热度按时间kpbwa7wx1#
如何确保在新消息到达kafka时调用@kafkalistener。
好吧,这本质上是一个测试这种功能的框架责任。在您的例子中,您只需要专注于业务逻辑和单元测试,而不是在框架中编译的代码。此外,也没有什么好的点来测试
@KafkaListener
只记录传入消息的方法。要找到测试用例验证的钩子肯定太难了。另一方面,我真的相信你的商业逻辑
@KafkaListener
方法比你展示的要复杂得多。因此,最好验证从该方法调用的自定义代码(例如db insert、其他服务调用等),而不是尝试精确地找出myMessageListener()
.你用这个做什么
mock(MyMessageProcessor.class)
是业务逻辑验证的一种好方法。你的代码中唯一错误的地方就是对EmbeddedKafka
:使用注解,还声明@Bean
在配置文件中。你应该考虑去掉其中一个。虽然还不清楚你的产品代码在哪里,但它实际上是免费的嵌入Kafka。否则,如果一切都在测试范围内,我看不出您的消费者和生产者工厂配置有任何问题。你肯定有一个最小可能的配置@KafkaListener
以及KafkaTemplate
. 你只需要移除一个@EmbeddedKafka
不要启动代理两次。ljo96ir52#
这是我为消费者提供的工作方案,基于您的代码。谢谢:-)
配置如下:
将测试中需要包含的所有bean放在不同的类中:
我创建了一个basekafkaconsumertest类来重用它:
扩展基类以支持您的使用者:
就像我在其他帖子里读到的,唐´不要这样测试业务逻辑。只是电话已经打好了。
ar5n3qh53#
您可以在测试用例中 Package 侦听器。
鉴于
然后