我正在使用StormKafka客户端1.2.1,并为kafkatridentspoutopaque创建我的喷口配置,如下所示
kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
.setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
.setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
.setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())
我在Kafka和zookeeper中都找不到我的组id和偏移量。通过zookeeper,我尝试了zkcli.sh ls /consumers
但没有,因为我认为Kafka自己现在是保持抵消,而不是Zookeeper。
我也试过用Kafka下面的命令
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-20130
console-consumer-82696
console-consumer-6106
console-consumer-67393
console-consumer-14333
console-consumer-21174
console-consumer-64550
有人能帮我找到我的偏移量吗?如果我重新启动拓扑,它还会重放我在Kafka的事件吗?
1条答案
按热度按时间nom7f22z1#
三叉戟不存储在Kafka抵消,但在风暴的Zookeeper。如果您使用storm的zookeeper配置的默认设置运行,则storm的zookeeper中的路径类似于
/coordinator/<your-topology-id>/meta
.该路径下的对象将包含第一个和最后一个偏移量,以及每个批的主题分区。例如。
/coordinator/<your-topology-id>/meta/15
将包含批号15中发出的第一个和最后一个偏移量。重新启动后喷口是否重放偏移量由
FirstPollOffsetStrategy
你把一切都安排好了KafkaSpoutConfig
. 默认值为UNCOMMITTED_EARLIEST
,重新启动时不会重新启动。查看javadochttps://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/kafkaspoutconfig.java#l126.