这里可能发生了同样的事情:error backtype.storm.util-异步循环死了!java.nio.bufferunderflowexception:null,但我将添加一个完整的堆栈跟踪和一些其他上下文。
风暴版-9.3
风暴Kafka9.3版
Kafka版本-0.8.2-beta
我也使用三叉戟,虽然我认为这个错误是发生在风暴级别。
堆栈跟踪:
java.lang.RuntimeException: java.nio.BufferUnderflowException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core- 0.9.3.jar:0.9.3]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.nio.BufferUnderflowException: null
at java.nio.Buffer.nextGetIndex(Buffer.java:498) ~[na:1.7.0_71]
at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:355) ~[na:1.7.0_71]
at kafka.api.OffsetResponse$.readFrom(OffsetResponse.scala:28) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:128) ~[kafka_2.10-0.8.2-beta.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127) ~[storm-core-0.9.3.jar:0.9.3]
at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.3.jar:0.9.3]
at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
喷口代码(请注意,我使用的是静态定义的分区Map,只有一个代理用于调试目的):
Broker broker = new Broker("localhost", 9094);
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, broker);
StaticHosts hosts = new StaticHosts(partitionInfo);
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(hosts, kafkaTopic);
spoutConfig.startOffsetTime = -1L;
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new OpaqueTridentKafkaSpout(spoutConfig);
1条答案
按热度按时间6mzjoqzu1#
当中断器队列特别是喷口的发送队列发生拥塞时,可能会产生此异常。我建议您增加executor发送缓冲区的大小。它可以解决这个问题。