有没有一种简单的方法——使用kafkarestapi——将使用者组中所有分区上的使用者偏移量提前到分区的末尾?实际上,有时我希望跳过消耗所有剩余的消息——例如,如果我希望重新生成所有消息。我知道我可以检索消费者组、检索分区、循环并查找每个分区——有没有更简单的方法?
2vuwiymt1#
根据post-consumers示例的文档,您可以在 ConsumerGroup 请求如下:
ConsumerGroup
POST /consumers/testgroup/instances/my_consumer/positions/end HTTP/1.1 Host: proxy-instance.kafkaproxy.example.com Content-Type: application/vnd.kafka.v2+json { "partitions": [ { "topic": "test", "partition": 0 }, { "topic": "test", "partition": 1 } ] }
不过,您仍然需要提前知道订阅的主题和consumergroup的分区。
我想有时跳过消费所有剩余的消息我在这里看到了多个选项,但在我看来,所有这些选项都相当粗糙,而且也没有使用kafkarestapi。
更改保留时间( retention.ms )主题的一个小值(如 1 ),请稍等,让logcleaner删除所有消息并将保留时间更改回正常。然后生成新的替代数据。
retention.ms
1
将所有使用者的consumergroup名称更改为新的consumergroup(配置) group.id )并让消费者通过设置 auto.offset.reset=latest . 然后生成新的替代数据。
group.id
auto.offset.reset=latest
类似于我最初的回答,使用Kafka工具 kafka-consumer-groups 要手动将消费者组(例如“myconsumer”)的偏移量更改为结束偏移量,请执行以下操作:
kafka-consumer-groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group myConsumer --topic myTopic --to-latest
1条答案
按热度按时间2vuwiymt1#
根据post-consumers示例的文档,您可以在
ConsumerGroup
请求如下:不过,您仍然需要提前知道订阅的主题和consumergroup的分区。
编辑:
我想有时跳过消费所有剩余的消息
我在这里看到了多个选项,但在我看来,所有这些选项都相当粗糙,而且也没有使用kafkarestapi。
方案1
更改保留时间(
retention.ms
)主题的一个小值(如1
),请稍等,让logcleaner删除所有消息并将保留时间更改回正常。然后生成新的替代数据。方案2
将所有使用者的consumergroup名称更改为新的consumergroup(配置)
group.id
)并让消费者通过设置auto.offset.reset=latest
. 然后生成新的替代数据。方案3
类似于我最初的回答,使用Kafka工具
kafka-consumer-groups
要手动将消费者组(例如“myconsumer”)的偏移量更改为结束偏移量,请执行以下操作: