使用spring嵌入的kafka测试@kafkalistener

ccgok5k5  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(470)

我正在尝试为我正在使用SpringBoot2.x开发的kafka侦听器编写一个单元测试。作为一个单元测试,我不想启动一个完整的kafka服务器一个zookeeper的示例。所以,我决定使用Spring嵌入Kafka。
我的听众的定义是非常基本的。

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

同样是验证 latch 计数器在收到消息后等于零,非常容易。

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

不幸的是,考试失败了,我不明白为什么。是否可以使用 KafkaEmbedded 测试用注解标记的方法 @KafkaListener ?
所有代码都在我的github存储库kafka listener中共享。
谢谢大家。

nkhmeac6

nkhmeac61#

也许有人会觉得这很有用。我也有类似的问题。本地测试正在运行(一些检查是在awaitibility.waitatmost中执行的),但是在jenkins管道中,测试失败了。
正如在投票最多的答案中已经提到的,解决方案是设置 auto-offset-reset=earliest . 当测试运行时,您可以通过查看测试日志来检查是否正确设置了配置。spring输出生产者和消费者的配置

mv1qrgav

mv1qrgav2#

您可能在为使用者分配主题/分区之前发送消息。设置属性。。。

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

…默认为 latest .
这就像使用 --from-beginning 与控制台使用者。
编辑
哦;你没有使用boot的属性。
添加

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

编辑2
顺便说一句,你也应该做一个 get(10L, TimeUnit.SECONDS) 关于 template.send() (一) Future<> )Assert发送成功。
编辑3
要仅为测试覆盖偏移量重置,可以执行与对代理地址所做的相同的操作:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

但是,请记住,此属性仅适用于组第一次消费时。每次应用程序启动时,要始终从末尾开始,您必须在启动过程中查找到底。
另外,我建议设置 enable.auto.commitfalse 因此,容器负责提交偏移量,而不是仅仅依赖于使用者客户机按照时间表来完成。

相关问题