我正在尝试将Storm word count程序的输出写入MongoDB。下面是我在执行该程序时遇到的错误。我能够成功地打印结果(result),但是一旦尝试把输出写入MongoDB,问题就出现了。
生成的错误:
Jun 01, 2015 3:24:23 PM com.mongodb.DBTCPConnector fetchMaxBsonObjectSize
WARNING: Exception determining maxBSON size using0
java.io.IOException: couldn't connect to [Ritesh-RM/127.0.1.1:27017] bc:java.net.ConnectException: Connection refused
at com.mongodb.DBPort._open(DBPort.java:206)
at com.mongodb.DBPort.go(DBPort.java:94)
at com.mongodb.DBPort.go(DBPort.java:75)
at com.mongodb.DBPort.findOne(DBPort.java:129)
at com.mongodb.DBPort.runCommand(DBPort.java:138)
at com.mongodb.DBTCPConnector.fetchMaxBsonObjectSize(DBTCPConnector.java:419)
at com.mongodb.Mongo.getMaxBsonObjectSize(Mongo.java:541)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:237)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210)
at com.mongodb.DBCollection.insert(DBCollection.java:80)
at bolts.WordCounter.execute(WordCounter.java:88)
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
Jun 01, 2015 3:24:23 PM com.mongodb.DBTCPConnector fetchMaxBsonObjectSize
WARNING: Exception determining maxBSON size using0
java.io.IOException: couldn't connect to [Ritesh-RM/127.0.1.1:27017] bc:java.net.ConnectException: Connection refused
at com.mongodb.DBPort._open(DBPort.java:206)
at com.mongodb.DBPort.go(DBPort.java:94)
at com.mongodb.DBPort.go(DBPort.java:75)
at com.mongodb.DBPort.findOne(DBPort.java:129)
at com.mongodb.DBPort.runCommand(DBPort.java:138)
at com.mongodb.DBTCPConnector.fetchMaxBsonObjectSize(DBTCPConnector.java:419)
at com.mongodb.DBTCPConnector.checkMaster(DBTCPConnector.java:406)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:144)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210)
at com.mongodb.DBCollection.insert(DBCollection.java:80)
at bolts.WordCounter.execute(WordCounter.java:88)
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
Jun 01, 2015 3:24:23 PM com.mongodb.DBPortPool gotError
WARNING: emptying DBPortPool to localhost:27017 b/c of error
java.io.IOException: couldn't connect to [Ritesh-RM/127.0.1.1:27017] bc:java.net.ConnectException: Connection refused
at com.mongodb.DBPort._open(DBPort.java:206)
at com.mongodb.DBPort.go(DBPort.java:94)
at com.mongodb.DBPort.go(DBPort.java:75)
at com.mongodb.DBPort.say(DBPort.java:70)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:151)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210)
at com.mongodb.DBCollection.insert(DBCollection.java:80)
at bolts.WordCounter.execute(WordCounter.java:88)
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
8037 [Thread-9-word-counter] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.NullPointerException: null
at com.mongodb.DBTCPConnector._error(DBTCPConnector.java:287) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:161) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210) ~[com.mongodb.jar:na]
at com.mongodb.DBCollection.insert(DBCollection.java:80) ~[com.mongodb.jar:na]
at bolts.WordCounter.execute(WordCounter.java:88) ~[classes/:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
... 6 common frames omitted
8038 [Thread-9-word-counter] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.NullPointerException: null
at com.mongodb.DBTCPConnector._error(DBTCPConnector.java:287) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:161) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210) ~[com.mongodb.jar:na]
at com.mongodb.DBCollection.insert(DBCollection.java:80) ~[com.mongodb.jar:na]
at bolts.WordCounter.execute(WordCounter.java:88) ~[classes/:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
... 6 common frames omitted
8137 [Thread-9-word-counter] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
WordCounter bolt(单词计数器bolt)的代码:
/**
 * Storm bolt that counts how often each word (field 0 of the incoming tuple)
 * has been seen by this task and mirrors the running count into the MongoDB
 * collection {@code query} of database {@code query} on localhost:27017.
 *
 * <p>The MongoDB client is opened once in {@link #prepare} and reused for every
 * tuple. Creating a new client per tuple, as an earlier revision did, leaks a
 * client (and its connection pool) for every processed word.
 */
public class WordCounter extends BaseBasicBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;

    // Created in prepare() on the worker; transient so the client is never part
    // of the serialized bolt instance.
    private transient Mongo mongo;
    private transient DBCollection table;

    /**
     * Releases the MongoDB client opened in {@link #prepare}.
     * NOTE(review): Storm documents cleanup() as best-effort; it may not be
     * invoked when a worker is killed, so do not rely on it for correctness.
     */
    @Override
    public void cleanup() {
        if (mongo != null) {
            mongo.close();
            mongo = null;
            table = null;
        }
    }

    /**
     * Initializes the in-memory counters, records this task's component id and
     * task id, and opens the single MongoDB client used by {@link #execute}.
     *
     * @param stormConf topology configuration (unused)
     * @param context   supplies the component id and task id of this bolt
     * @throws IllegalStateException if the MongoDB host name cannot be resolved
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        try {
            this.mongo = new Mongo("localhost", 27017);
        } catch (UnknownHostException e) {
            // Fail fast with the cause attached instead of printing the stack
            // trace and carrying on without a usable client.
            throw new IllegalStateException("Cannot resolve MongoDB host localhost:27017", e);
        }
        this.table = mongo.getDB("query").getCollection("query");
    }

    /** This bolt emits nothing, so no output fields are declared. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    /**
     * Increments the count for the word in field 0 of {@code input}. The first
     * occurrence inserts a {@code {searchItem, count: 1}} document; every later
     * occurrence {@code $set}s the new count on the document matching
     * {@code searchItem}.
     *
     * @param input     tuple whose first field is the word to count
     * @param collector unused; this bolt does not emit tuples
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        Integer previous = counters.get(str);
        if (previous == null) {
            // First time this task sees the word: create its document.
            counters.put(str, 1);
            BasicDBObject dbObject = new BasicDBObject();
            dbObject.put("searchItem", str);
            dbObject.put("count", 1);
            table.insert(dbObject);
        } else {
            // Known word: bump the in-memory count and overwrite the stored one.
            Integer c = previous + 1;
            counters.put(str, c);
            BasicDBObject query = new BasicDBObject();
            query.put("searchItem", str);
            BasicDBObject documentDetail = new BasicDBObject();
            documentDetail.put("count", c);
            BasicDBObject updateObj = new BasicDBObject();
            updateObj.put("$set", documentDetail);
            table.update(query, updateObj);
        }
        System.out.println("In Counter : ");
    }
}
1条答案
按热度按时间a9wyjsp71#
我是因为mongodb版本的问题才遇到这个错误的。我在pom.xml中修改了mongodb的版本之后,一切正常。