我有一个logstash kafka消费群订阅了近20个主题,但对于某个特定的高优先级kafka主题来说,这些主题的处理效果并不好,因此我决定从消费群中删除一个主题,并为高优先级主题创建一个单独的消费群,但不幸的是,我失去了在旧消费群体中的补偿。我是否可以用上一个用户组的初始偏移量启动新的logstash用户组?谢谢
zf9nrax11#
可以使用kafka脚本为新组设置偏移量。示例场景:停止应用程序。检查组的当前偏移量。您可以使用以下命令。输出将包含每个主题的当前偏移量、日志结束偏移量、延迟等信息。 ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupId --describe 为新应用程序将使用的新组id设置偏移量(假设要切换的主题的偏移量为 10001 ) ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group newGroupId --to-offset 10001 --topic topicName --reset-offsets --execute 删除 topicName 从旧应用程序的主题列表。使用设置新的日志存储配置 newGroupId 组id。启动新旧日志存储应用程序。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupId --describe
10001
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group newGroupId --to-offset 10001 --topic topicName --reset-offsets --execute
topicName
newGroupId
1条答案
按热度按时间zf9nrax11#
可以使用kafka脚本为新组设置偏移量。
示例场景:
停止应用程序。
检查组的当前偏移量。您可以使用以下命令。输出将包含每个主题的当前偏移量、日志结束偏移量、延迟等信息。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupId --describe
为新应用程序将使用的新组id设置偏移量(假设要切换的主题的偏移量为10001
)./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group newGroupId --to-offset 10001 --topic topicName --reset-offsets --execute
删除topicName
从旧应用程序的主题列表。使用设置新的日志存储配置
newGroupId
组id。启动新旧日志存储应用程序。