How to read a specific partition of a Kafka topic with KafkaSpout

lokaqttq · asked on 2021-06-07 · Kafka

I am using Storm as a consumer to read from a Kafka topic. When the producer starts producing messages, I can read the whole topic without any problem. However, when I try to read from one specific partition of the topic, the spout errors out and the worker exits.
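For reference, reading the whole topic works with the standard storm-kafka setup, roughly like the sketch below (the ZooKeeper address, topic, zkRoot and spoutId are placeholders for my actual values):

    // whole-topic setup (works): all partitions are discovered via ZooKeeper
    BrokerHosts zkHosts = new ZkHosts("localhost:2181");
    SpoutConfig wholeCfg = new SpoutConfig(zkHosts, topic, zkRoot, spoutId);
    KafkaSpout wholeTopicSpout = new KafkaSpout(wholeCfg);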
Here is what I tried for reading a specific partition:

    Broker p1 = new Broker("localhost:9092");
    GlobalPartitionInformation info = new GlobalPartitionInformation();
    info.addPartition(0, p1); // map partition 0 to broker p1
    StaticHosts hosts = new StaticHosts(info);
    SpoutConfig cfg = new SpoutConfig(hosts, topic, zkRoot, spoutId);
    KafkaSpout kspout = new KafkaSpout(cfg);

    // topology
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(KAFKA_SPOUT_ID, kspout, numSpoutExecutors);
    ...

    // where the spout parallelism hint is
    int numSpoutExecutors = 1;

Is it possible at all to read from a single topic partition with KafkaSpout? There is a workaround: use the Kafka consumer API directly to read from the partition and emit the messages from a custom spout, but I would prefer not to do that.
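For completeness, the workaround I am referring to would look roughly like the sketch below: a custom spout that uses the plain Kafka consumer API pinned to one partition and emits the messages itself. This assumes a Kafka client version that provides KafkaConsumer.assign() (0.9 or later); the class, topic and field names are just placeholders.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    // Sketch of the workaround: a spout that reads only partition 0 of "mytopic"
    // via the plain Kafka consumer API instead of storm-kafka's KafkaSpout.
    public class SinglePartitionSpout extends BaseRichSpout {
        private transient KafkaConsumer<String, String> consumer;
        private transient SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);
            // assign() pins the consumer to exactly one partition; no consumer-group rebalancing.
            consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));
        }

        @Override
        public void nextTuple() {
            // Poll the assigned partition and emit each message value as a tuple.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                collector.emit(new Values(record.value()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }

        @Override
        public void close() {
            consumer.close();
        }
    }

I would rather keep using KafkaSpout than maintain a spout like this, which is why I am asking whether the StaticHosts approach above can be made to work.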

Edit 1

The topology aborts at runtime with the following exception:

41226 [Thread-8-kafka-solar-spout] ERROR backtype.storm.util - Async loop died!
java.nio.channels.ClosedChannelException: null
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:128) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:34) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:92) ~[storm-kafka-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$fn__3284$fn__3299.invoke(executor.clj:520) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
41228 [Thread-8-kafka-solar-spout] ERROR backtype.storm.daemon.executor -
java.nio.channels.ClosedChannelException: null
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:128) ~[kafka_2.10-0.8.2-beta.jar:na]
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:34) ~[storm-kafka-0.10.0.jar:0.10.0]
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:92) ~[storm-kafka-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$fn__3284$fn__3299.invoke(executor.clj:520) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
41263 [Thread-8-kafka-solar-spout] INFO  backtype.storm.util - Halting process: ("Worker died")

Thank you.
