So I have a @Component listener class for Kafka:
@Component
@Data
@Slf4j
public class KafkaConsumer {

    public List<String> saveReserveStock = new ArrayList<>();

    @KafkaListener(topics = "topic")
    public void listenReserveStock(ConsumerRecord<?, ?> consumerRecord) {
        System.out.println("==================================================================");
        System.out.println("consuming records at: " + DateTime.now().toLocalDateTime());
        System.out.println("consuming topic: " + consumerRecord.topic());
        saveReserveStock.add(consumerRecord.value().toString());
        saveReserveStock.add("dummy data");
        saveReserveStock.forEach(System.out::println);
        System.out.println("consumed at: " + DateTime.now().toLocalDateTime());
        System.out.println("==================================================================");
        System.out.println("end at: " + DateTime.now().toLocalDateTime());
    }

    public void emptyConsumer() {
        saveReserveStock = new ArrayList<>();
    }
}
This is the embedded Kafka setup:
@Slf4j
@EnableKafka
public abstract class EmbeddedKafkaIntegrationTest {

    @Autowired
    protected static EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, false);

    @Autowired
    protected KafkaConsumer kafkaConsumer;

    @Autowired
    private ReactorKafkaProducer reactorKafkaProducer;

    protected abstract void setUp();

    private static boolean started;

    @BeforeClass
    public static void createBroker() {
        log.info("start test class");
        Map<String, String> propertiesMap = new HashMap<>();
        propertiesMap.put("listeners", "PLAINTEXT://localhost:9092");
        embeddedKafkaBroker.brokerProperties(propertiesMap);
        if (!started) {
            try {
                embeddedKafkaBroker.afterPropertiesSet();
                log.info("before class - kafka connected to: " + embeddedKafkaBroker.getBrokersAsString());
            }
            catch (Exception e) {
                log.error("Embedded broker failed to start", e);
            }
            started = true;
        }
    }

    @Before
    public void doSetUp() {
        log.info("before - kafka connected to: " + embeddedKafkaBroker.getBrokersAsString());
        kafkaConsumer.emptyConsumer();
        this.setUp();
    }

    @After
    public void tearDown() {
        kafkaConsumer.emptyConsumer();
        embeddedKafkaBroker.getZookeeper().getLogDir().deleteOnExit();
    }

    @AfterClass
    public static void destroy() {
        log.info("end test class");
    }
}
Then in my test class, I @Autowired the KafkaConsumer class and use it to read the messages that the listener has consumed:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ImsStockApplication.class},
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class IntegrationTest extends EmbeddedKafkaIntegrationTest {

    @Value("${local.server.port}")
    private int port;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Autowired
    private ReactorKafkaProducer reactorKafkaProducer;

    @Before
    public void setUp() {
        RestAssured.port = port;
    }

    @Test
    public void success_SubDetail() {
        reactorKafkaProducer.send("topic", event).block();
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            log.info("AWAITILITY AT: " + DateTime.now().toLocalDateTime());
            Assert.assertTrue(kafkaConsumer.getFailDecreaseGoodsReceipt().size() > 0);
            Assert.assertTrue(kafkaConsumer.getSaveReserveStock().size() > 0);
            Assert.assertTrue(kafkaConsumer.getSaveBindStock().size() > 0);
        });
    }
}
But the test sometimes fails ("list empty")... as if the list variable were empty when it should not be.
Below is the log of the listener receiving the message and storing it in the list:
==================================================================
consuming records at: 2022-07-10T14:16:46.748
consuming topic: topic
{"id":9721,"eventId":"eventId","organizationCode":"ORG","createdDate":1657437282742,"lastModifiedDate":1657437282742,"routingId":"routingId"}
dummy data
consumed at: 2022-07-10T14:16:46.748
==================================================================
end at: 2022-07-10T14:16:46.748
But in my test class, when I try to access the variable, it is empty. It just keeps waiting for the list to be filled:
AWAITILITY AT: 2022-07-10T14:16:46.829
AWAITILITY AT: 2022-07-10T14:16:46.945
AWAITILITY AT: 2022-07-10T14:16:47.056
AWAITILITY AT: 2022-07-10T14:16:47.164
AWAITILITY AT: 2022-07-10T14:16:47.273
AWAITILITY AT: 2022-07-10T14:16:47.384
AWAITILITY AT: 2022-07-10T14:16:47.490
AWAITILITY AT: 2022-07-10T14:16:47.598
If we look at the timestamps, the list should not be empty, right? So why does my test fail? Where did I go wrong?
Thanks
1 Answer
IMHO, the question lacks information... so what follows is not a definitive answer but a list of possible causes that may help you find the solution.
There are many possible reasons for the test to fail, and not necessarily in your test code.
One possible reason is that when you connect to Kafka you start listening from the "latest" offset (auto.offset.reset = latest). In that case you will not consume messages that were already in the topic when the consumer joined the group. While this might be the answer, perhaps you could also post the code that actually sends the message to the topic, because that may be where the real problem is.
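For example (a sketch only, nothing below is taken from your code): if the listener container is built by Spring Boot's auto-configuration, you can force the consumer to start from the earliest offset directly from the test, so records produced before the consumer joins the group are still read:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ImsStockApplication.class},
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        properties = {
                // Assumption: the @KafkaListener container is created from Spring Boot's
                // auto-configured ConsumerFactory, so this Boot property reaches it. If you
                // build your own ConsumerFactory, set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG there instead.
                "spring.kafka.consumer.auto-offset-reset=earliest"
        })
@Slf4j
public class IntegrationTest extends EmbeddedKafkaIntegrationTest {
    // ... rest of the test class unchanged
}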
Another possible reason is the number of partitions on the topic (see the sketch below).
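For instance, the embedded broker can be told to create the topic up front with a known partition count; the constructor below is part of spring-kafka-test, and "topic" is simply the name your listener subscribes to (again, something to check, not a confirmed fix):

// EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, String... topics):
// create "topic" with exactly one partition when the broker starts, so the producer
// and the listener agree on the topic layout.
protected static EmbeddedKafkaBroker embeddedKafkaBroker =
        new EmbeddedKafkaBroker(1, false, 1, "topic");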
It could also be the code itself, but again, you are not showing all of the configuration, at least not the configuration used by this test.
One possible problem is that @KafkaListener is not being processed correctly, so Spring creates a bean from the component (after all, it can be autowired in the test) but never wires up the whole Kafka listener infrastructure behind the scenes.
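One way to verify this, which also removes the race where the producer sends before the listener's consumer has been assigned its partitions, is to look the containers up in the KafkaListenerEndpointRegistry and wait for their assignment before producing. A sketch, assuming spring-kafka's registry and spring-kafka-test's ContainerTestUtils are available on the test classpath:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before
public void waitForListenerAssignment() {
    // If @KafkaListener was never processed, this collection is empty,
    // which on its own would explain why nothing ever lands in the list.
    for (MessageListenerContainer container : kafkaListenerEndpointRegistry.getListenerContainers()) {
        // Block until the container's consumer has been assigned its partitions,
        // so a record sent right after this point is actually received.
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }
}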