我正在使用opaquetridentkafkaspout来消费来自kafka的消息。下面是代码。我忽略了 max spout pending
这样的配置会导致相同的kafka消息以多个批到达。
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);
我得到以下错误一次Kafka喷口开始,但运行平稳之后。
2018-05-29 09:47:21.703 o.a.s.util thread-9-spout-myspout-spout-executor[33][error]异步循环终止!java.lang.runtimeexception:org.apache.storm.utils.disruptorqueue.consumebatchtocursor(disruptorqueue)上的java.lang.nullpointerexception。java:522)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.utils.disruptorqueue.consumebatchwhenavailable(disruptorqueue。java:487)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.disruptor$consume\u batch\u when\u available.invoke(disruptor。clj:74)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.daemon.executor$fn\uuu5043$fn\uuu5056$fn\uuu5109.invoke(executor。clj:861)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.util$async\u loop$fn\u 557.invoke(util。clj:484)[风暴核心-1.2.1。jar:1.2.1]在clojure.lang.afn.run(afn。java:22)[clojure-1.7.0.jar:?]位于java.lang.thread.run(thread。java:748)[?:1.8.0_]原因:org.apache.storm.kafka.spout.trident.kafkatridentspoutemitter.seek(kafkatridentspoutemitter)上的java.lang.nullpointerexception。java:193)~[stormjar.jar:?]在org.apache.storm.kafka.spout.trident.kafkatridentspoutemitter.emitpartitionbatch(kafkatridentspoutemitter。java:127)~[stormjar.jar:?]在org.apache.storm.kafka.spout.trident.kafkatridentspoutemitter.emitpartitionbatch(kafkatridentspoutemitter。java:51)~[stormjar.jar:?]在org.apache.storm.trident.spout.opaquepartitionedtridentspoutexecutor$emitter.emitbatch(opaquepartitionedtridentspoutexecutor。java:141)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.trident.spout.tridentspoutexecutor.execute(tridentspoutexecutor。java:82)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.trident.topology.tridentboltexecutor.execute(tridentboltexecutor。java:383)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.daemon.executor$fn\u 5043$tuple\u action\u fn\u 5045.invoke(executor。clj:739)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.daemon.executor$mk\u task\u receiver$fn\uu 4964.invoke(executor。clj:468) ~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.disruptor$clojure\u handler$reify\u 4475.onevent(disruptor。clj:41)~[风暴核心-1.2.1。jar:1.2.1]在org.apache.storm.utils.disruptorqueue.consumebatchtocursor(disruptorqueue。java:509)~[风暴核心-1.2.1。jar:1.2.1] ... 6个以上
有什么建议吗?
1条答案
按热度按时间4zcjmb1e1#
堆栈跟踪表明您正在命中https://issues.apache.org/jira/browse/storm-3046.