我用storm作为消费者来阅读Kafka的一个主题。当制作人开始制作消息时,我能够完整地阅读主题。但是,如果我尝试从一个特定的主题分区读取日志,它会出错退出。
我试过的是:
Broker p1 = new Broker("localhost:9092");
GlobalPartitionInformation info = new GlobalPartitionInformation() ;
info.addPartition(0,p1) ; // 0th partition is mapped to broker p1
StaticHosts hosts = new StaticHosts(info) ;
SpoutConfig cfg = new SpoutConfig(hosts,topic,zkroot, spouteID) ;
KafkaSpout spout = new KafkaSpout(cfg);
//topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(KAFKA_SPOUT_ID, kspout, numSpoutExecutors);
...
//whereNumber of workers
int numSpoutExecutors = 1;
是否可以“从主题分区读取”?使用kafka消费api从主题分区读取并使用喷口发射有解决方法,但我不想使用它?
编辑1
异常后中止的运行时错误
41226 [Thread-8-kafka-solar-spout] ERROR backtype.storm.util - Async loop died!
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:128) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:34) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:92) ~[storm-kafka-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$fn__3284$fn__3299.invoke(executor.clj:520) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
41228 [Thread-8-kafka-solar-spout] ERROR backtype.storm.daemon.executor -
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:128) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:34) ~[storm-kafka-0.10.0.jar:0.10.0]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:92) ~[storm-kafka-0.10.0.jar:0.10.0]
at backtype.storm.daemon.executor$fn__3284$fn__3299.invoke(executor.clj:520) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
41263 [Thread-8-kafka-solar-spout] INFO backtype.storm.util - Halting process: ("Worker died")
谢谢您。
暂无答案!
目前还没有任何答案,快来回答吧!