我正在尝试为我正在使用SpringBoot2.x开发的kafka侦听器编写一个单元测试。作为一个单元测试,我不想启动一个完整的kafka服务器一个zookeeper的示例。所以,我决定使用Spring嵌入Kafka。
我的听众的定义是非常基本的。
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
同样是验证 latch
计数器在收到消息后等于零,非常容易。
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
不幸的是,考试失败了,我不明白为什么。是否可以使用 KafkaEmbedded
测试用注解标记的方法 @KafkaListener
?
所有代码都在我的github存储库kafka listener中共享。
谢谢大家。
2条答案
按热度按时间nkhmeac61#
也许有人会觉得这很有用。我也有类似的问题。本地测试正在运行(一些检查是在awaitibility.waitatmost中执行的),但是在jenkins管道中,测试失败了。
正如在投票最多的答案中已经提到的,解决方案是设置
auto-offset-reset=earliest
. 当测试运行时,您可以通过查看测试日志来检查是否正确设置了配置。spring输出生产者和消费者的配置mv1qrgav2#
您可能在为使用者分配主题/分区之前发送消息。设置属性。。。
…默认为
latest
.这就像使用
--from-beginning
与控制台使用者。编辑
哦;你没有使用boot的属性。
添加
编辑2
顺便说一句,你也应该做一个
get(10L, TimeUnit.SECONDS)
关于template.send()
(一)Future<>
)Assert发送成功。编辑3
要仅为测试覆盖偏移量重置,可以执行与对代理地址所做的相同的操作:
和
但是,请记住,此属性仅适用于组第一次消费时。每次应用程序启动时,要始终从末尾开始,您必须在启动过程中查找到底。
另外,我建议设置
enable.auto.commit
至false
因此,容器负责提交偏移量,而不是仅仅依赖于使用者客户机按照时间表来完成。