我试着用3个代理和zookeeper测试一个kafka节点。我希望使用控制台工具进行测试。我是这样管理制片人的:
kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9094 --topic testTopic
然后我以这样的方式运行消费者:
kafka-console-consumer --zookeeper localhost:2181 --topic testTopic --from-beginning
我可以在producer中输入消息,并在consumer中看到它们,正如预期的那样。然而,当我使用引导服务器运行消费者的更新版本时,我什么也得不到。例如
kafka-console-consumer --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic testTopic --from-beginning
当我有一个代理在端口9092上运行时,它运行得很好,所以我完全搞不清楚。有没有一种方法可以让我看到zookeeper作为引导服务器提供了什么?引导服务器与代理列表不同吗?用scala2.11编译的kafka。
3条答案
按热度按时间mklgxw1f1#
我不知道怎么了。很可能我把Kafka或Zookeeper放在一个奇怪的状态。在删除
log.dir
中的每个代理和zookeeper主题/brokers/topics
接着,Kafka的消费者重现了这个主题,表现得和预期的一样。cunj1qz12#
我在使用不匹配的版本时遇到了相同的问题:
Kafka客户端库
Kafka剧本
Kafka经纪人
在我的具体场景中,我使用的是带有confluent platform 3.3.0 w/kafka broker 0.11.0.0的confluent kafka客户端库版本0.10.2.1。当我将我的confluent平台降级到3.3.2版本时,它与我的客户机库相匹配,用户的工作如预期的那样。
我的理论是,使用新的consumerapi的最新kafka控制台使用者只检索使用最新格式的消息。在kafka 0.11.0.0中引入了许多消息格式更改。
91zkwejq3#
引导服务器与kafka代理相同。如果您想查看zookeeper提供的引导服务器列表,可以通过任何zk客户机查询znode信息。所有活跃的代理都在/brokers/ids/[brokerid]下注册。你只需要一个地址。下面的命令将为您提供活动引导服务器的列表:
./zookeeper-shell.sh文件localhost:2181 <<“ls/brokers/ids”