mockito 在Spring Boot中使用EmbeddedKafka测试Kafka consumer @KafkaListener

rvpgvaaj  于 2022-11-08  发布在  Spring
关注(0)|答案(1)|浏览(207)

我想测试我的Kafka消费者,但有问题的@EmbddedKafka。

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaEventConsumer {

    private final CustomInterface customInterface;

    @KafkaListener(topics = "test-topic")
    public void consumeEvents(Event event) {
           customInterface.apply(event);
        }
    }
}

我的测试类如下

@EmbeddedKafka
@ExtendWith(MockitoExtension.class)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaConsumerTest {

    private Producer<String, String> producer;

    private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
                                                                "event-file.json").toFile();

    private KafkaEventConsumer kafkaEventConsumer;

    @Mock
    private CustomInterface CustomInterface;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ObjectMapper objectMapper;

    @BeforeAll
    void setUp() {
        kafkaEventConsumer = new KafkaEventConsumer(customInterface);
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(),
                                                     new StringSerializer()).createProducer();
    }

    @Test
    void consumeEvents() throws IOException, BadCurrencyException {
        var event = objectMapper.readValue(EVENT_JSON,Event.class);
        String message = objectMapper.writeValueAsString(event);
        producer.send(new ProducerRecord<>("test-topic", 0, "1", message));
        producer.flush();

        // Read the message and assert its properties
        verify(customeInterface, timeout(10000).times(1)).apply(any());
    }

    @AfterAll
    void shutdown() {
        producer.close();
    }

}

测试未通过,消费者未拦截消息

Wanted but not invoked:
customInterface.apply(
     <any> );
Actually, there were zero interactions with this mock.

PS:我关注了这个有趣的article

9gm1akwq

9gm1akwq1#

我使用了KafkaTemplate

@SpringBootTest
@EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
               partitions = 1,
               controlledShutdown = true)
class KafkaConsumerTest {

    private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
                                                                "event-file.json").toFile();
    @Autowired
    KafkaTemplate<String, Event> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @SpyBean
    private KafkaEvenConsumer kafkaEvenConsumer;

    @SpyBean
    private CustomInterface customInterface;

    @Captor
    ArgumentCaptor<Event> eventCaptor;

    @Test
    @SneakyThrows
    @DirtiesContext
    void consumeEvents() {

        Event event = objectMapper.readValue(EVENT_JSON,                                                Event.class);
        kafkaTemplate.send("test-topic, "1", event);

        verify(kafkaEventConsumer,timeout(10000).times(1)).consumeEvents(eventCaptor.capture());
        Event argument = eventCaptor.getValue();
        // .. assert the message properties
        verify(customInterface, timeout(10000).times(1)).apply(any());

    }

}

相关问题