我在三台机器上安装了Kafka。我正试着用Kafka的口诀在Storm中读Kafka的口诀。我用这个代码连接
Broker brokerForPartition0 = new Broker("208.113.164.114:9091");
Broker brokerForPartition1 = new Broker("208.113.164.115:9092");
Broker brokerForPartition2 = new Broker("208.113.164.117:9093");
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
SpoutConfig spoutConfig = new SpoutConfig(hosts, "newImageTest","", "id1");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Kafka配置文件是(我把最重要的行)
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
############################# Socket Server Settings #############################
listeners=PLAINTEXT://208.113.164.115:9092
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=208.113.164.115
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=208.113.164.115
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092
我得到一个例外:
2017-01-12T23:07:15.581+0000 b.s.util [ERROR] Async loop died!
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.9.0.0.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.9.0.0.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.9.0.0.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.9.0.0.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.9.0.0.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.6.jar:0.9.6]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.6.jar:0.9.6]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.6.jar:0.9.6]
at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:34) ~[storm-kafka-0.9.6.jar:0.9.6]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:92) ~[storm-kafka-0.9.6.jar:0.9.6]
at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
请帮忙!
暂无答案!
目前还没有任何答案,快来回答吧!