编辑仅供参考:github示例
我在互联网上搜索,找不到一个嵌入式Kafka测试的简单实用的例子。
我的设置是:
Spring Boot
多个@kafkalistener,在一个类中有不同的主题
嵌入式Kafka的测试,这是开始罚款
使用kafkatemplate进行测试,kafkatemplate正在发送到topic,但是@kafkalistener方法即使在长时间睡眠后也没有收到任何消息
没有警告或错误显示,只有信息垃圾邮件从Kafka在日志中
请帮帮我。大多数都是过度配置或过度设计的例子。我相信这件事做起来很简单。谢谢,伙计们!
@Controller
public class KafkaController {
private static final Logger LOG = getLogger(KafkaController.class);
@KafkaListener(topics = "test.kafka.topic")
public void receiveDunningHead(final String payload) {
LOG.debug("Receiving event with payload [{}]", payload);
//I will do database stuff here which i could check in db for testing
}
}
private static string sender\u topic=“test.kafka.topic”;
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Thread.sleep(10000);
}
3条答案
按热度按时间eagi6jfj1#
嵌入式Kafka测试在以下配置中对我有效,
测试类注解
设置方法的注解前
注意:我没有使用
@ClassRule
创建嵌入式Kafka而不是自动布线@Autowired embeddedKafka
```@Test
public void testReceive() throws Exception {
kafkaTemplate.send(topic, data);
}
现在
@Test
方法将自动连接kafkatemplate并使用is发送消息更新了带上一行的应答代码块
1tuwyuhd2#
我现在解决了这个问题
在调试时,我看到嵌入的kaka服务器使用了一个随机端口。
我找不到它的配置,所以我将kafka配置设置为与服务器相同。对我来说还是有点难看。
我很想听到@mayur提到的那句话
但在互联网上找不到合适的依赖关系。
0tdrvxhp3#
因为被接受的答案不适合我。我找到了另一个基于https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/ 我想和你分享的。
依赖项是“spring kafka测试”版本:“2.2.7.release”