我有一个用例,其中有一个消费群体正在消费消息。我想构建一个api来修改它的偏移量。因此,当使用偏移量调用端点时,我必须更改使用者组的偏移量。我使用的是springboot,而消费者使用的是springkafka。提前谢谢。
62o28rlo1#
springforapachekafka提供了一些方便的机制,用于在应用程序初始化期间或之后的任何时间执行寻道。最简单的方法是让你的听众扩展 AbstractConsumerSeekAware 或实施 ConsumerSeekAware .
AbstractConsumerSeekAware
ConsumerSeekAware
esyap4oy2#
这是一个通过cli的解决方案:列出该组订阅的主题:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe
注意“当前偏移”和“日志结束偏移”下的值当前偏移量”是每个分区中此使用者组当前所在的偏移量。重置主题的使用者偏移量(预览):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
这将打印重置的预期结果,但不会实际运行它。重置主题的使用者偏移量(执行):
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
这将执行重置,并将指定主题的使用者组偏移量重置回0。重复1以检查复位是否成功
2条答案
按热度按时间62o28rlo1#
springforapachekafka提供了一些方便的机制,用于在应用程序初始化期间或之后的任何时间执行寻道。
最简单的方法是让你的听众扩展
AbstractConsumerSeekAware
或实施ConsumerSeekAware
.esyap4oy2#
这是一个通过cli的解决方案:
列出该组订阅的主题:
注意“当前偏移”和“日志结束偏移”下的值当前偏移量”是每个分区中此使用者组当前所在的偏移量。
重置主题的使用者偏移量(预览):
这将打印重置的预期结果,但不会实际运行它。
重置主题的使用者偏移量(执行):
这将执行重置,并将指定主题的使用者组偏移量重置回0。
重复1以检查复位是否成功