我想测试我的Kafka消费者,但有问题的@EmbddedKafka。
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaEventConsumer {
private final CustomInterface customInterface;
@KafkaListener(topics = "test-topic")
public void consumeEvents(Event event) {
customInterface.apply(event);
}
}
}
我的测试类如下
@EmbeddedKafka
@ExtendWith(MockitoExtension.class)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaConsumerTest {
private Producer<String, String> producer;
private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
"event-file.json").toFile();
private KafkaEventConsumer kafkaEventConsumer;
@Mock
private CustomInterface CustomInterface;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private ObjectMapper objectMapper;
@BeforeAll
void setUp() {
kafkaEventConsumer = new KafkaEventConsumer(customInterface);
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(),
new StringSerializer()).createProducer();
}
@Test
void consumeEvents() throws IOException, BadCurrencyException {
var event = objectMapper.readValue(EVENT_JSON,Event.class);
String message = objectMapper.writeValueAsString(event);
producer.send(new ProducerRecord<>("test-topic", 0, "1", message));
producer.flush();
// Read the message and assert its properties
verify(customeInterface, timeout(10000).times(1)).apply(any());
}
@AfterAll
void shutdown() {
producer.close();
}
}
测试未通过,消费者未拦截消息
Wanted but not invoked:
customInterface.apply(
<any> );
Actually, there were zero interactions with this mock.
PS:我关注了这个有趣的article
1条答案
按热度按时间9gm1akwq1#
我使用了
KafkaTemplate