我有以下方法来使用来自Kafka主题的数据。
@KafkaListener(
groupId = "<group-id>",
topics = "<topic>",
concurrency = "<concurrency>",
)
public void listen(
Acknowledgment ack,
@Payload final ConsumerRecord<String, GenericRecord> consumerRecord,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) String offset,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) String key,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
) throws IOException {
// Consumes the data with consumerRecord and doing the rest of the stuff
ack.acknowledge();
}
我需要为此编写一个单元测试。我尝试做的是将一些数据发布到测试主题,并测试该侦听器是否使用了这些数据。我该怎么做?
想办法解决这个问题。
1条答案
按热度按时间hivapdat1#
你要做的不是单元测试。
单元测试是指模拟整个环境并直接调用正在测试的方法:
如果你想用一个真实的Kafka broker来测试监听器,这些都是集成测试,你需要为此准备一个测试环境。
要做到这一点,您可以使用一个testcontainers或一个单独的docker-compose文件,它将在您的测试之前运行,以便真实的环境不会与测试环境混合。
testcontainers
docker-compose
此外,还有一个spring-kafka-test库,允许您使用驻留在应用程序内存中的kafka编写集成测试。
https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test
你可以在这里看到更多细节:https://www.baeldung.com/spring-boot-kafka-testing