junit Java Spring测试类无法从@Autowired组件访问变量

kjthegm6  于 2022-11-24  发布在  Java
关注(0)|答案(1)|浏览(159)

所以我有一个@Component的听力课是关于Kafka的

@Component
@Data
@Slf4j
public class KafkaConsumer {
    public List<String> saveReserveStock = new ArrayList<>();

    @KafkaListener(topics = "topic")
    public void listenReserveStock(ConsumerRecord<?, ?> consumerRecord) {
        System.out.println("==================================================================");
        System.out.println("consuming records at: " + DateTime.now().toLocalDateTime());
        System.out.println("consuming topic: " + consumerRecord.topic());
        saveReserveStock.add(consumerRecord.value().toString());
        saveReserveStock.add("dummy data");
        saveReserveStock.forEach(System.out::println);
        System.out.println("consumed at: " + DateTime.now().toLocalDateTime());
        System.out.println("==================================================================");
        System.out.println("end at: " + DateTime.now().toLocalDateTime());
    }

    public void emptyConsumer(){
        saveReserveStock = new ArrayList<>();
    }
}

这是一个嵌入Kafka结构

@Slf4j
@EnableKafka
public abstract class EmbeddedKafkaIntegrationTest {
    @Autowired
    protected static EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, false);

    @Autowired
    protected KafkaConsumer kafkaConsumer;
    @Autowired
    private ReactorKafkaProducer reactorKafkaProducer;

    protected abstract void setUp();

    private static boolean started;

    @BeforeClass
    public static void createBroker(){
        log.info("start test class");
        Map<String, String> propertiesMap = new HashMap<>();
        propertiesMap.put("listeners", "PLAINTEXT://localhost:9092");
        embeddedKafkaBroker.brokerProperties(propertiesMap);
        if (!started) {
            try {
                embeddedKafkaBroker.afterPropertiesSet();
                log.info("before class - kafka connected to: "+embeddedKafkaBroker.getBrokersAsString());
            }
            catch (Exception e) {
                log.error("Embedded broker failed to start", e);
            }
            started = true;
        }
    }
    @Before
    public void doSetUp() {
        log.info("before - kafka connected to: "+embeddedKafkaBroker.getBrokersAsString());
        kafkaConsumer.emptyConsumer();
        this.setUp();
    }

    @After
    public void tearDown() {
        kafkaConsumer.emptyConsumer();
        embeddedKafkaBroker.getZookeeper().getLogDir().deleteOnExit();
    }

    @AfterClass
    public static void destroy(){
        log.info("end test class");
    }

}

然后在我的测试类中,使用@Autowired作为KafkaConsumer
在测试类中,我使用这个函数从已使用的侦听器中获取消息

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ImsStockApplication.class},
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class IntegrationTest extends EmbeddedKafkaIntegrationTest {
    @Value("${local.server.port}")
    private int port;
    @Autowired
    private KafkaConsumer kafkaConsumer;
    @Autowired
    private ReactorKafkaProducer reactorKafkaProducer;

    @Before
    public void setUp() {
        RestAssured.port = port;
    }

    @Test
    public void success_SubDetail() {
        reactorKafkaProducer.send("topic", event).block();

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            log.info("AWAITILITY AT: " + DateTime.now().toLocalDateTime());
            Assert.assertTrue(kafkaConsumer.getFailDecreaseGoodsReceipt().size() > 0);
            Assert.assertTrue(kafkaConsumer.getSaveReserveStock().size() > 0);
            Assert.assertTrue(kafkaConsumer.getSaveBindStock().size() > 0);
        });
    }
}

但结果有时会失败(list empty)...就像列表变量为空,而它不应该为空
下面是侦听器接收消息并将其存储到列表的日志

==================================================================
consuming records at: 2022-07-10T14:16:46.748
consuming topic: topic
{"id":9721,"eventId":"eventId","organizationCode":"ORG","createdDate":1657437282742,"lastModifiedDate":1657437282742,"routingId":"routingId"}
dummy data
consumed at: 2022-07-10T14:16:46.748
==================================================================
end at: 2022-07-10T14:16:46.748

在我的测试类中,当我试图访问变量时,它是空的。它一直在等待列表被填充

AWAITILITY AT: 2022-07-10T14:16:46.829
AWAITILITY AT: 2022-07-10T14:16:46.945
AWAITILITY AT: 2022-07-10T14:16:47.056
AWAITILITY AT: 2022-07-10T14:16:47.164
AWAITILITY AT: 2022-07-10T14:16:47.273
AWAITILITY AT: 2022-07-10T14:16:47.384
AWAITILITY AT: 2022-07-10T14:16:47.490
AWAITILITY AT: 2022-07-10T14:16:47.598

如果我们查看时间戳,列表不应该是空的,对吗?但是为什么我的测试失败了?我哪里出错了?
谢谢

zc0qhyus

zc0qhyus1#

恕我直言,这个问题缺乏信息...所以提供的不是一个真实的的答案,而是一系列可能的答案,可以帮助找到解决方案。
测试失败的可能原因有很多,不一定是因为您的测试代码。
一个可能的原因是,当你连接到Kafka时,你开始收听“最新”的消息(offset = latest)在这种情况下,你将无法使用已经在主题中的消息。虽然这可能是问题的答案,但也许你可以发布实际上将消息发送到主题的代码。这才是真正的问题。
另一个可能的原因是分区的数量。
也有可能是代码本身的原因,但同样,您不会显示所有配置,至少不会显示此处测试的配置。
一个可能的问题是@KafkaListener没有得到正确的处理,所以spring从组件中创建了一个spring bean(毕竟它可以在测试中自动连接),但是没有在幕后插入整个Kafka基础结构。

相关问题