BufferUnderflowException with the Kafka spout in Storm

Posted by u5rb5r59 on 2021-06-07 in Kafka

I have configured a Kafka spout for a Storm topology.
The code that creates the spout:

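    BrokerHosts hosts = new ZkHosts(PropertiesUtils.getString("zookeeper.host"));
    String topic = "Test1_TOPIC";
    // Arguments: ZooKeeper hosts, topic name, ZooKeeper root path for offsets, consumer id
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, "/kafka" + topic, "discovery");
    // Deserialize each Kafka message as a single string field
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    // Register the spout with a parallelism hint of 6
    topologyBuilder.setSpout("kafka", kafkaSpout, 6);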

I cannot receive any messages. The following exception is thrown and the program terminates.

java.lang.RuntimeException: java.nio.BufferUnderflowException
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:201) ~[storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_31]
   Caused by: java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151) ~[?:1.8.0_31]
at java.nio.ByteBuffer.get(ByteBuffer.java:715) ~[?:1.8.0_31]
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[kafka_2.11-0.10.0.0.jar:?]
at kafka.api.TopicData$.readFrom(FetchResponse.scala:96) ~[kafka_2.11-0.10.0.0.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170) ~[kafka_2.11-0.10.0.0.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.11-0.10.0.0.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.8.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.8.jar:?]
at scala.collection.immutable.Range.foreach(Range.scala:160) ~[scala-library-2.11.8.jar:?]
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) ~[scala-library-2.11.8.jar:?]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.8.jar:?]
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169) ~[kafka_2.11-0.10.0.0.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) ~[kafka_2.11-0.10.0.0.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.11-0.10.0.0.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.1.jar:1.0.1]
... 7 more
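
The innermost frames of the trace are in `kafka.api.ApiUtils$.readShortString`, which reads a 2-byte length and then that many bytes from the response buffer. As a rough illustration of how a read like that can underflow, here is a minimal sketch that uses only `java.nio`. It is not Kafka's code: `ShortStringDemo`, `writeShortString`, and `underflows` are hypothetical names, and the "extra field" is an assumption standing in for any case where reader and writer disagree on the byte layout. Whether that is what happened here cannot be confirmed from the trace alone.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: ShortStringDemo and its methods are hypothetical names,
// not part of Kafka or Storm.
public class ShortStringDemo {

    // Reads a 2-byte length followed by that many UTF-8 bytes.
    static String readShortString(ByteBuffer buffer) {
        short size = buffer.getShort();
        if (size < 0) {
            return null;
        }
        byte[] bytes = new byte[size];
        // Throws BufferUnderflowException if fewer than `size` bytes remain.
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Encodes a string in the layout that readShortString expects.
    static ByteBuffer writeShortString(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + bytes.length);
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
        buffer.flip();
        return buffer;
    }

    // Returns true if parsing the buffer underflows.
    static boolean underflows(ByteBuffer buffer) {
        try {
            readShortString(buffer);
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Matching layout: the topic name round-trips.
        System.out.println(readShortString(writeShortString("Test1_TOPIC")));

        // Mismatched layout (assumption for illustration): an extra 4-byte field
        // precedes the string, so the parser takes the wrong bytes as the length.
        ByteBuffer shifted = ByteBuffer.allocate(4 + 2 + 5);
        shifted.putInt(0x7FFF0000);
        shifted.putShort((short) 5);
        shifted.put("Test1".getBytes(StandardCharsets.UTF_8));
        shifted.flip();
        System.out.println("underflow: " + underflows(shifted));
    }
}
```

In the second case the parser reads `0x7FFF` as the length and asks for far more bytes than the buffer holds, which is the same kind of failure that `HeapByteBuffer.get` reports in the trace.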

I am using storm-core 1.0.1, storm-kafka 1.0.1, and Kafka kafka_2.11-0.10.0.0. Is there a version compatibility problem?
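
When checking for a version mismatch, it can help to confirm which jars the classes in the stack trace are actually loaded from at runtime, since a worker's classpath may differ from the build's. The following is a minimal sketch using only the JDK; `JarLocator` is a hypothetical helper and not part of Storm or Kafka. It reports the client-side jars only and says nothing about the broker version, which has to be checked separately.

```java
import java.security.CodeSource;

// Illustrative only: JarLocator is a hypothetical helper, not part of Storm or Kafka.
public class JarLocator {

    // Returns the location a class was loaded from, or "(bootstrap/unknown)"
    // when the JVM does not expose one (as for core JDK classes).
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap/unknown)";
        }
        return source.getLocation().toString();
    }

    // Looks the class up by name, without initializing it, so this helper
    // compiles and runs even when Kafka or Storm is not on the classpath.
    static String locationOf(String className) {
        try {
            ClassLoader loader = JarLocator.class.getClassLoader();
            return locationOf(Class.forName(className, false, loader));
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question.
        System.out.println(locationOf("kafka.javaapi.consumer.SimpleConsumer"));
        System.out.println(locationOf("org.apache.storm.kafka.KafkaSpout"));
    }
}
```

Run inside the topology's classpath, this prints the jar paths for the Kafka client and the Storm Kafka spout, which can then be compared with the versions the cluster is expected to use.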
