我正在使用一个带有共享回复主题的ReplyingKafkaTemplate
(例如group.id是null
并且组管理被禁用)。初始偏移量被设置为“latest”。
我面临以下问题:
1.我手动启动侦听器容器,这将导致一个异步操作,该操作将获取消费者的元数据。
1.我调用sendAndReceive()
方法。
1.应答消息在第一步中的异步操作返回之前创建。
1.异步操作返回并将偏移量设置为1(由于“latest”),因此应答消息将被忽略。
为了解决这个问题,我需要Assert偏移量在调用sendAndReceive()
方法之前被初始化。我已经寻找了一个合适的应用程序事件,但似乎没有一个合适。
是否有任何方法可以Assert使用者已经完全初始化?
这个问题与Wait for kafka consumer to poll before writing messages to kafka spring boot类似,但建议的解决方案也不能解决它(除非我误解了这个建议,否则它只能降低风险,而不能消除风险)。
2条答案
按热度按时间bihw5rsg1#
目前还不可能。
我为此https://github.com/spring-projects/spring-kafka/issues/2318打开了一个问题
编辑
作为一种变通方法,您可以设置
idleEventInterval
并等待第一个ListenerContainerIdleEvent
。k4emjkb12#
我研究了加里提出的问题,现在已经解决了。您现在可以执行类似
replyingKafkaTemplate.waitforAssignment(Duration.ofSeconds(5))
的操作https://github.com/spring-projects/spring-integration/commit/e19e6d4726a6acd52b81820565a22e8020814bda