无法重置Kafka偏移

798qvoo8  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(672)

我试图重置Kafka偏移,但无法重置它。
当前偏移量为6,尝试将其重置为最早的偏移量,并尝试将其减小1,但它未重置并继续反射6

kafka-consumer-groups --bootstrap-server <server>:9092 --group EDWOFFSETGROUP_24 --describe
Consumer group 'EDWOFFSETGROUP_24' has no active members.

TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
EDW_TOTALS_JSON1 0          6               6               0               -               -               -

接近1

kafka-consumer-groups --bootstrap-server <server>:9092  --group EDWOFFSETGROUP_24 --topic EDW_TOTALS_JSON1:0 --reset-offsets --shift-by -1
WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
[2019-12-04 14:43:14,364] WARN New offset (5) is lower than earliest offset for topic partition EDW_TOTALS_JSON1-0. Value will be set to 6 (kafka.admin.ConsumerGroupCommand$)

TOPIC                      PARTITION  NEW-OFFSET     
EDW_TOTALS_JSON1           0          6

进近#2

kafka-consumer-groups --bootstrap-server <server>:9092 --group EDWOFFSETGROUP_24 --topic EDW_TOTALS_JSON1 --reset-offsets --to-earliest --execute

TOPIC                      PARTITION  NEW-OFFSET     
EDW_TOTALS_JSON1           0          6
gupuwyp2

gupuwyp21#

似乎通过传递保留策略清除了数据。如果消息被截断,它将从主题分区细节上出现的新位置开始。
请参阅保留设置定义
您可以尝试与新消费者进行验证

bin/kafka-console-consumer.sh --zookeeper <zk_host>:2181 --topic test --from-beginning

如果您正在使用新的api,请使用下面的

bin/kafka-console-consumer.sh --bootstrap-server <server>:9092 --topic EDW_TOTALS_JSON1 --from-beginnin

您还可以使用getoffsetshell检查分区的偏移详细信息

bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required argument [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count> number of offsets returned (default: 1)
--partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default)
--time <Long: timestamp in milliseconds / -1(latest) / -2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore  >
--topic <topic> REQUIRED: The topic to get offsets from.

例子

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1

相关问题