为storm使用者检查kafka主题的偏移量

7dl7o3gd  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(449)

我正在使用StormKafka客户端1.2.1,并为kafkatridentspoutopaque创建我的喷口配置,如下所示

kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
                .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                .setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())

我在Kafka和zookeeper中都找不到我的组id和偏移量。通过zookeeper,我尝试了zkcli.sh ls /consumers 但没有,因为我认为Kafka自己现在是保持抵消,而不是Zookeeper。
我也试过用Kafka下面的命令

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand  --list  --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-20130
console-consumer-82696
console-consumer-6106
console-consumer-67393
console-consumer-14333
console-consumer-21174
console-consumer-64550

有人能帮我找到我的偏移量吗?如果我重新启动拓扑,它还会重放我在Kafka的事件吗?

nom7f22z

nom7f22z1#

三叉戟不存储在Kafka抵消,但在风暴的Zookeeper。如果您使用storm的zookeeper配置的默认设置运行,则storm的zookeeper中的路径类似于 /coordinator/<your-topology-id>/meta .
该路径下的对象将包含第一个和最后一个偏移量,以及每个批的主题分区。例如。 /coordinator/<your-topology-id>/meta/15 将包含批号15中发出的第一个和最后一个偏移量。
重新启动后喷口是否重放偏移量由 FirstPollOffsetStrategy 你把一切都安排好了 KafkaSpoutConfig . 默认值为 UNCOMMITTED_EARLIEST ,重新启动时不会重新启动。查看javadochttps://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/kafkaspoutconfig.java#l126.

相关问题