我在写Kafka。我已经在我的服务器上创建了Kafka生产者。我想从kafkaproducer获取数据到我在kafkaproducer的本地系统。
我在r中尝试了以下代码:
library(rkafka)
consumer1<-rkafka.createConsumer("ipaddress:9092","mytest")
consumer11 <- rkafka.read(consumer1)
它抛出以下错误:
[1] "Java-Object{com.musigma.consumer.MuConsumer@3349e9bb}"
Unable to connect to zookeeper server
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within
timeout: 100000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
at com.musigma.consumer.MuConsumer.CreateConsumer(MuConsumer.java:99)
java.lang.NullPointerException
at com.musigma.consumer.MuConsumer.startConsumer(MuConsumer.java:133)
我的zookeeper正在IP地址上成功运行。
1条答案
按热度按时间bqjvbblv1#
第一个参数是zookeeper,它在端口2181上运行
你给了它Kafka港
来源-https://github.com/cran/rkafkajars/blob/master/java/com/musigma/consumer/muconsumer.java#l87
注意:看起来这个库没有被维护,使用zookeeper来连接一个消费者实际上是不赞成的,所以也许可以尝试寻找另一个库