等待请求/应答语义中的Kafka消费者初始化偏移量

lvmkulzt  于 2023-03-11  发布在  Apache
关注(0)|答案(2)|浏览(123)

我正在使用一个带有共享回复主题的ReplyingKafkaTemplate(例如group.id是null并且组管理被禁用)。初始偏移量被设置为“latest”。
我面临以下问题:
1.我手动启动侦听器容器,这将导致一个异步操作,该操作将获取消费者的元数据。
1.我调用sendAndReceive()方法。
1.应答消息在第一步中的异步操作返回之前创建。
1.异步操作返回并将偏移量设置为1(由于“latest”),因此应答消息将被忽略。
为了解决这个问题,我需要Assert偏移量在调用sendAndReceive()方法之前被初始化。我已经寻找了一个合适的应用程序事件,但似乎没有一个合适。
是否有任何方法可以Assert使用者已经完全初始化?
这个问题与Wait for kafka consumer to poll before writing messages to kafka spring boot类似,但建议的解决方案也不能解决它(除非我误解了这个建议,否则它只能降低风险,而不能消除风险)。

bihw5rsg

bihw5rsg1#

目前还不可能。
我为此https://github.com/spring-projects/spring-kafka/issues/2318打开了一个问题

编辑

作为一种变通方法,您可以设置idleEventInterval并等待第一个ListenerContainerIdleEvent

k4emjkb1

k4emjkb12#

我研究了加里提出的问题,现在已经解决了。您现在可以执行类似replyingKafkaTemplate.waitforAssignment(Duration.ofSeconds(5))的操作
https://github.com/spring-projects/spring-integration/commit/e19e6d4726a6acd52b81820565a22e8020814bda

相关问题