有没有办法通过api重置Kafka消费群体的偏移量?

bkhjykvo  于 2021-07-14  发布在  Java
关注(0)|答案(2)|浏览(344)

我有一个用例,其中有一个消费群体正在消费消息。我想构建一个api来修改它的偏移量。因此,当使用偏移量调用端点时,我必须更改使用者组的偏移量。我使用的是springboot,而消费者使用的是springkafka。提前谢谢。

62o28rlo

62o28rlo1#

springforapachekafka提供了一些方便的机制,用于在应用程序初始化期间或之后的任何时间执行寻道。
最简单的方法是让你的听众扩展 AbstractConsumerSeekAware 或实施 ConsumerSeekAware .

esyap4oy

esyap4oy2#

这是一个通过cli的解决方案:
列出该组订阅的主题:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

注意“当前偏移”和“日志结束偏移”下的值当前偏移量”是每个分区中此使用者组当前所在的偏移量。
重置主题的使用者偏移量(预览):

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest

这将打印重置的预期结果,但不会实际运行它。
重置主题的使用者偏移量(执行):

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

这将执行重置,并将指定主题的使用者组偏移量重置回0。
重复1以检查复位是否成功

相关问题