Storm HDFS bolt not working

Asked by djp7away on 2021-05-30, in Hadoop

So I've just started working with Storm and am trying to understand it. I am trying to connect to a Kafka topic, read the data, and write it to HDFS. At first I created the topology without shuffleGrouping("stormspout"), and my Storm UI showed the spout was consuming data from the topic, but nothing was being written by the bolt (other than the empty files it created on HDFS). I then added shuffleGrouping("stormspout"); and now the bolt appears to be throwing an error. If anyone can help, I would really appreciate it.
Thanks, Colman
Error:
2015-04-13 00:02:58 s.k.PartitionManager [INFO] Read partition information from: /storm/partition_0  --> null
2015-04-13 00:02:58 s.k.PartitionManager [INFO] No partition information found, using configuration to determine offset
2015-04-13 00:02:58 s.k.PartitionManager [INFO] Last commit offset from zookeeper: 0
2015-04-13 00:02:58 s.k.PartitionManager [INFO] Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
2015-04-13 00:02:58 s.k.PartitionManager [INFO] Starting Kafka 192.168.134.137:0 from offset 0
2015-04-13 00:02:58 s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
2015-04-13 00:02:58 b.s.d.task [INFO] Emitting: stormspout default [colmanblah]
2015-04-13 00:02:58 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: stormspout:3, stream: default, id: {462820364856350458=5573117062061876630}, [colmanblah]
2015-04-13 00:02:58 b.s.d.task [INFO] Emitting: stormspout __ack_init [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] TRANSFERING tuple TASK: 1 TUPLE: source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] Processing received message FOR 1 TUPLE: source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] BOLT ack TASK: 1 TIME: TUPLE: source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] Execute done TUPLE source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] TASK: 1 DELTA:
2015-04-13 00:02:59 b.s.d.executor [INFO] Prepared bolt stormbolt:(2)
2015-04-13 00:02:59 b.s.d.executor [INFO] Processing received message FOR 2 TUPLE: source: stormspout:3, stream: default, id: {462820364856350458=5573117062061876630}, [colmanblah]
2015-04-13 00:02:59 b.s.util [ERROR] Async loop died!

java.lang.RuntimeException: java.lang.NullPointerException
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
                    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
            Caused by: java.lang.NullPointerException: null
                    at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    ... 6 common frames omitted
            2015-04-08 04:26:39 b.s.d.executor [ERROR]
            java.lang.RuntimeException: java.lang.NullPointerException
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
                    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
            Caused by: java.lang.NullPointerException: null
                    at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]

Code:

    TopologyBuilder builder = new TopologyBuilder();
    Config config = new Config();
    //config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 7000);
    config.setNumWorkers(1);
    config.setDebug(true);  
    //LocalCluster cluster = new LocalCluster();

    //zookeeper
    BrokerHosts brokerHosts = new ZkHosts("192.168.134.137:2181", "/brokers");      

    //spout
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "myTopic", "/kafkastorm", "KafkaSpout");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.forceFromStart = true;
    builder.setSpout("stormspout", new KafkaSpout(spoutConfig),4);

    //bolt
    SyncPolicy syncPolicy = new CountSyncPolicy(10); //Synchronize data buffer with the filesystem every 10 tuples
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); // Rotate data files when they reach five MB
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/stormstuff"); // Use default, Storm-generated file names
    builder.setBolt("stormbolt", new HdfsBolt()
                                 .withFsUrl("hdfs://192.168.134.137:8020")//54310
                                 .withSyncPolicy(syncPolicy)
                                 .withRotationPolicy(rotationPolicy)
                                 .withFileNameFormat(fileNameFormat),2
                    ).shuffleGrouping("stormspout");        

    //cluster.submitTopology("ColmansStormTopology", config, builder.createTopology());     

    try {
        StormSubmitter.submitTopologyWithProgressBar("ColmansStormTopology", config, builder.createTopology());

    } catch (AlreadyAliveException e) {
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        e.printStackTrace();
    }

pom.xml dependencies:

<dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency> 
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-hdfs</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.1.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-simple</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
              </dependencies>
Answer 1 (from h43kikqp):

First, try emitting the values from the execute method. If you are emitting from a different worker thread, then have all the worker threads feed their data into a LinkedBlockingQueue, and let only a single thread emit the values from that queue, as in the sketch below.
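A minimal sketch of that pattern (the class, field, and stream names here are illustrative assumptions, not taken from the question's code):

    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Worker threads offer finished results onto the queue; only the executor
    // thread (inside execute) calls collector.emit(), which keeps all access
    // to the collector single-threaded.
    public class QueueDrainingBolt extends BaseRichBolt {
        private OutputCollector collector;
        private LinkedBlockingQueue<String> results;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.results = new LinkedBlockingQueue<String>();
            // background worker threads (not shown) would be started here and
            // would call results.offer(...) as they finish pieces of work
        }

        @Override
        public void execute(Tuple tuple) {
            // hand the input off to the worker threads here (not shown), then
            // drain whatever they have already finished and emit it from this thread
            String ready;
            while ((ready = results.poll()) != null) {
                collector.emit(new Values(ready));
            }
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }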
Second, try setting Config.setMaxSpoutPending to some value and run the code again; if the problem persists, try reducing that value.
Reference: Config.TOPOLOGY_MAX_SPOUT_PENDING sets the maximum number of tuples that can be pending on a single spout task at once (pending means the tuple has not yet been acked or failed). It is highly recommended you set this config to prevent queue explosion.
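For example (a minimal sketch; the value 1000 is an arbitrary illustration, not a tuned recommendation):

    Config config = new Config();
    // Cap the number of unacked tuples outstanding per spout task so the
    // topology applies backpressure instead of flooding the downstream bolt.
    config.setMaxSpoutPending(1000);
    // equivalent to: config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1000);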

Answer 2 (from hi3rlvi2):

I eventually figured this out by going through the Storm source code.
I was not setting the RecordFormat:

RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

and then including it in the bolt definition:

builder.setBolt("stormbolt", new HdfsBolt()
                                 .withFsUrl("hdfs://192.168.134.137:8020")//54310
                                 .withSyncPolicy(syncPolicy)
                                 .withRecordFormat(format)
                                 .withRotationPolicy(rotationPolicy)
                                 .withFileNameFormat(fileNameFormat),1
                    ).shuffleGrouping("stormspout");

In the HdfsBolt.java class, the bolt tries to use the RecordFormat and, if one was never set, it simply fails; that is where the NPE was coming from (see the sketch below).
Hope this helps someone else; make sure you have set all the pieces this class requires. A more useful error message, such as "RecordFormat not set", would have been nice....
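As an illustration of the failure mode, here is a hypothetical reduction (not the actual storm-hdfs source) of why the unset format turns into an NPE:

    import org.apache.storm.hdfs.bolt.format.RecordFormat;
    import backtype.storm.tuple.Tuple;

    // Hypothetical reduction: HdfsBolt serializes each tuple through its
    // configured RecordFormat before writing the bytes to HDFS. If
    // withRecordFormat(...) was never called, the format field stays null
    // and the first call below throws the NullPointerException seen above.
    public class FormatSketch {
        private RecordFormat format; // null unless withRecordFormat(...) set it

        byte[] toBytes(Tuple tuple) {
            return format.format(tuple); // NPE when format == null
        }
    }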
