在两个logstash示例之间使用kafka集群设置elasticsearch管道。我需要为一个主题重置12小时的偏移量,然后再次启动消费者。
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kfkserver:9092 --topic topicname --time 1488153601000
它回来了 topicname:0:3730858
1488153601000<-2017-02-27 00:00:01毫秒
如何设置偏移时间?
1条答案
按热度按时间szqfcxe21#
如果您使用的是0.10.x,并且没有在0.11中添加的很棒的偏移管理工具,那么就有一个使用kafka-console-consumer.sh来更改消费组偏移的黑客程序。不过,这只适用于数字偏移量,而不适用于时间戳。
首先,停止正在运行的使用该使用者的任何进程。完全关闭是最好的。然后,运行如下命令:
./kafka-consumer-groups.sh --bootstrap-server :9092
--group my-consumer-group
--describe