KafkaListner单元测试

nhhxz33t  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(95)

我有以下方法来使用来自Kafka主题的数据。

@KafkaListener(
            groupId = "<group-id>",
            topics = "<topic>",
            concurrency = "<concurrency>",
    )
    public void listen(
            Acknowledgment ack,
            @Payload final ConsumerRecord<String, GenericRecord> consumerRecord,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) String offset,
            @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) String key,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic
    ) throws IOException {

        // Consumes the data with consumerRecord and doing the rest of the stuff

        ack.acknowledge();
    }

我需要为此编写一个单元测试。我尝试做的是将一些数据发布到测试主题,并测试该侦听器是否使用了这些数据。我该怎么做?
想办法解决这个问题。

hivapdat

hivapdat1#

你要做的不是单元测试。
单元测试是指模拟整个环境并直接调用正在测试的方法:

@InjectMocks
Listener listener;

@Mock
Handler handler;

@Test
void givenSomething() {
    var id = UUID.randomUUID();
    listener.listen(id);
    verify(handler).handle(id);
}

如果你想用一个真实的Kafka broker来测试监听器,这些都是集成测试,你需要为此准备一个测试环境。
要做到这一点,您可以使用一个testcontainers或一个单独的docker-compose文件,它将在您的测试之前运行,以便真实的环境不会与测试环境混合。
testcontainers
docker-compose
此外,还有一个spring-kafka-test库,允许您使用驻留在应用程序内存中的kafka编写集成测试。
https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: groupName
test:
  topic: test-topic
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() 
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";
        
        producer.send(topic, data);
        
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}

你可以在这里看到更多细节:https://www.baeldung.com/spring-boot-kafka-testing

相关问题