使用accumulo运行storm连接时出错

vawmfj5a  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(343)

我有如下的 Storm bolt:

package storm.bolt;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class AccumuloBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private ZooKeeperInstance instance;
    private Connector connector;
    private BatchWriter bw;
    private Text colf;
    private MultiTableBatchWriter mtbw;
    private final String instanceName;
    private final String zooServers;
    private final String userName;
    private final String password;
    Map<String, Integer> counters;
    /**
     * @param zooServers The host on which Zookeeper is running.
     * @param userName for which Accumula username.
     * @param password The Acumula passowrd
     * written to.
     * String instanceName = "myistance";
         * String zooServers = "192.168.1.81:2181";
         * String userName = "root";
         * String password = "aryadevi";
     */
    public AccumuloBolt(String instanceName, String zooServers, String userName,
            String password) {
        this.instanceName = instanceName;
        this.zooServers = zooServers;
        this.userName = userName;
        this.password = password;
    }
    public void prepare( Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;
        try {
            //this.instance = new ZooKeeperInstance(instanceName, zooServers);
            this.instance = new ZooKeeperInstance("myistance", "192.168.1.81:2181");
            //this.connector= instance.getConnector(userName, password);
            this.connector= instance.getConnector("root", "aryadevi");
            this.mtbw=connector.createMultiTableBatchWriter(200000l, 300, 4);
            this.bw=null;

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    public void execute(Tuple input)  {
        if (shouldActOnInput(input)) {
            try{
            if (!this.connector.tableOperations().exists("new2"))
                      this.connector.tableOperations().create("new2");
            this.bw = this.mtbw.getBatchWriter("new2");
            this.colf=new Text("colfam");
            System.out.println("writing ...");
            String str = input.getString(0);
            if(!counters.containsKey(str)){
            counters.put(str, 1);
            }else{
                Integer c = counters.get(str) + 1;
                counters.put(str, c);
            }

                }catch (Exception e) {
                throw new RuntimeException(e);
            }

        //DBObject updateObj = getDBObjectForInput(input);
        //this.bw.addMutation(m);

        } else {
            collector.ack(input);
        }
    }
    public void cleanup() {
        try{

        for(Map.Entry<String, Integer> entry : counters.entrySet()){
             Mutation m = new Mutation(new Text(String.format("row_%d",entry.getKey() )));
            m.put(this.colf, new Text(String.format("colqual_%d", entry.getKey())), new Value((String.format("value_%d", entry.getValue())).getBytes()));
            System.out.println(entry.getKey()+": "+entry.getValue());
             bw.addMutation(m);         
        }

        this.mtbw.close();
        }catch (Exception e) {
                throw new RuntimeException(e);
            }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
    public boolean shouldActOnInput(Tuple input) {
        return true;
    }
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

}

我使用“mvn compile”编译这个 Storm 项目,并使用“mvn package”打包,然后用以下命令运行,运行后出现了下面的错误:storm jar target/storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.twitterstorm

java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
    at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46217 [Thread-8-count] ERROR backtype.storm.util - Async loop died!
java.lang.ExceptionInInitializerError: null
    at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
    at org.apache.log4j.Logger.getLogger(Logger.java:43) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
    at org.apache.accumulo.core.client.ZooKeeperInstance.<clinit>(ZooKeeperInstance.java:63) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
    ... 8 common frames omitted
46218 [Thread-10-count] ERROR backtype.storm.daemon.executor - 
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
    at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46218 [Thread-8-count] ERROR backtype.storm.daemon.executor - 
java.lang.ExceptionInInitializerError: null
    at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
    at org.apache.log4j.Logger.getLogger(Logger.java:43) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
    at org.apache.accumulo.core.client.ZooKeeperInstance.<clinit>(ZooKeeperInstance.java:63) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
    ... 8 common frames omitted
46218 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor count:[4 4]
46219 [Thread-6] INFO  backtype.storm.daemon.task - Emitting: count __system ["startup"]
46220 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks count:[4 4]
46224 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor count:[4 4]
46224 [Thread-12-count] INFO  backtype.storm.daemon.executor - Preparing bolt count:(4)
46225 [Thread-12-count] ERROR backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
    at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46226 [Thread-12-count] ERROR backtype.storm.daemon.executor - 
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
    at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
    at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46321 [Thread-10-count] INFO  backtype.storm.util - Halting process: ("Worker died")
46321 [Thread-8-count] INFO  backtype.storm.util - Halting process: ("Worker died")
1tuwyuhd

1tuwyuhd1#

看来这个 Storm 的 JIRA 问题单中有相关的讨论:https://issues.apache.org/jira/browse/STORM-122
我认为 Accumulo 带有一个 slf4j-log4j12 依赖,而 Storm 使用 log4j-over-slf4j,两者不兼容。讨论中似乎建议在引入 Accumulo 依赖时,把 slf4j-log4j12 和 log4j 之类的日志依赖项排除掉。我不确定这是否有效,但值得一试。

相关问题