我开发了一个 @KafkaListener
这也被标记为 ConsumerAwareRebalanceListener
接口,使用SpringBoot2.0.6。我实现了 onPartitionsAssigned
方法,其中我倒带固定时间的偏移量,比如说60秒。
到现在为止,一直都还不错。
如何使用SpringKafka提供的工具测试上述用例?我想我需要开始一个Kafka经纪人(即 EmbeddedKafka
),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取在最后60秒内到达的消息。
有人能帮我吗?我搜索了一下,但什么也没找到。谢谢。
2条答案
按热度按时间gojuced71#
和
7ajki6be2#
这个
@KafkaListener
具有:属性,以便可以访问其
MessageListenerContainer
提到viaKafkaListenerEndpointRegistry
,你可以简单地@Autowired
进入基于spring测试框架的测试类。那么,你真的可以stop()
以及start()
那个MessageListenerContainer
在你的测试方法中。也要注意怎么做
@KafkaListener
有一个autoStartup()
属性也是。