Kafka0.10.1.0更改偏移时间

oprakyz7  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(292)

在两个logstash示例之间使用kafka集群设置elasticsearch管道。我需要为一个主题重置12小时的偏移量,然后再次启动消费者。

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list  kfkserver:9092 --topic topicname --time 1488153601000

它回来了 topicname:0:3730858 1488153601000<-2017-02-27 00:00:01毫秒
如何设置偏移时间?

szqfcxe2

szqfcxe21#

如果您使用的是0.10.x,并且没有在0.11中添加的很棒的偏移管理工具,那么就有一个使用kafka-console-consumer.sh来更改消费组偏移的黑客程序。不过,这只适用于数字偏移量,而不适用于时间戳。
首先,停止正在运行的使用该使用者的任何进程。完全关闭是最好的。然后,运行如下命令:

bin/kafka-console-consumer.sh --bootstrap-server :9092 \
    --topic my-topic \
    --partition 1 \
    --consumer-property group.id=my-consumer-group \
    --max-messages 0 \
    --offset 12345
``` `--max-messages 0` 在这里很重要;将其设置为任何其他值(包括1)将消耗那么多消息,然后提交该主题/分区中的当前最新偏移量。这一定是控制台使用者中的错误。
接下来,使用kafka-consumer-groups.sh检查您的工作:

./kafka-consumer-groups.sh --bootstrap-server :9092
--group my-consumer-group
--describe

相关问题