java.nio.channels.closedchannelexception:空远程kafka代理

wwwo4jvm  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(253)

我在三台机器上安装了Kafka。我正试着用Kafka的口诀在Storm中读Kafka的口诀。我用这个代码连接

Broker brokerForPartition0 = new Broker("208.113.164.114:9091");
            Broker brokerForPartition1 = new Broker("208.113.164.115:9092");
            Broker brokerForPartition2 = new Broker("208.113.164.117:9093");
            GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
            partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
    StaticHosts hosts = new StaticHosts(partitionInfo);
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "newImageTest","", "id1");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Kafka配置文件是(我把最重要的行)


############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each  broker.

broker.id=2

############################# Socket Server Settings    #############################

listeners=PLAINTEXT://208.113.164.115:9092
port=9092

# Hostname the broker will bind to. If not set, the server will bind to  all interfaces

host.name=208.113.164.115

# Hostname the broker will advertise to producers and consumers. If not  set, it uses the

# value for "host.name" if configured.  Otherwise, it will use the value  returned from

# java.net.InetAddress.getCanonicalHostName().

advertised.host.name=208.113.164.115

# The port to publish to ZooKeeper for clients to use. If this is not set,

# it will publish the same port that the broker binds to.

advertised.port=9092

我得到一个例外:

2017-01-12T23:07:15.581+0000 b.s.util [ERROR] Async loop died!
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.9.0.0.jar:na]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.9.0.0.jar:na]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.9.0.0.jar:na]
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.9.0.0.jar:na]
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.9.0.0.jar:na]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.6.jar:0.9.6]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.6.jar:0.9.6]
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.6.jar:0.9.6]
    at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:34) ~[storm-kafka-0.9.6.jar:0.9.6]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:92) ~[storm-kafka-0.9.6.jar:0.9.6]
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:529) ~[storm-core-0.9.6.jar:0.9.6]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.6.jar:0.9.6]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

请帮忙!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题