我是 Apache Storm 的新手。这个程序将输入的句子拆分成单词，然后统计每个单词出现的次数。我的问题是：运行时看不到 console.log('in spout') 和 console.log(word) 的输出。代码如下：
var storm = require('node-storm')
/**
 * Spout that emits one randomly chosen sentence roughly every 100 ms.
 *
 * Each emitted tuple has a single field holding the whole sentence; the
 * splitsentence bolt reads it positionally (data.tuple[0]), so the field
 * name is purely descriptive.
 */
var randomsentence = (function() {
    var sentences = [
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    ]
    // Diagnostics go to stderr: Storm's multilang protocol exchanges JSON
    // messages over the component process's stdout, so arbitrary text written
    // there can corrupt that channel. Storm's shell-process wrapper forwards
    // stderr into the worker log (verify for your Storm version).
    console.error('before spout')
    return storm.spout(function(sync) {
        var self = this
        setTimeout(function() {
            // Runs inside a Storm worker, so this shows up in that worker's
            // log, not on the console that submitted the topology.
            console.error('in spout')
            var index = Math.floor(Math.random() * sentences.length)
            var sentence = sentences[index]
            self.emit([sentence]) /* {id:'unique'} //for reliable emit */
            sync()
        }, 100)
    }).declareOutputFields(["sentence"])
})()
/**
 * Bolt that splits each incoming sentence into words and emits one
 * single-field tuple per non-empty word.
 *
 * Reads the sentence positionally from data.tuple[0]; assumes it is a string
 * (the randomsentence spout only emits strings from its fixed list).
 */
var splitsentence = storm.basicbolt(function(data) {
    // Split on any run of whitespace, not just single spaces, so tabs or
    // repeated spaces never yield empty or whitespace-containing "words".
    var words = data.tuple[0].split(/\s+/)
    for (var i = 0; i < words.length; ++i) {
        var word = words[i]
        // Leading/trailing whitespace in the sentence produces empty strings.
        if (word) {
            this.emit([word])
            // stderr, not stdout: stdout carries Storm's multilang protocol.
            // This runs in a worker, so look for it in the worker's log.
            console.error(word)
        }
    }
}).declareOutputFields(["word"])
/**
 * Bolt that keeps a running count per word and emits [word, count] every
 * time a word arrives.
 *
 * Counts are held in this process's memory only. The topology routes words
 * with fieldsGrouping on "word", so every occurrence of a given word reaches
 * the same task and that task's count for it is complete.
 */
var wordcount = (function() {
    // Prototype-less dictionary: with a plain {}, words such as "constructor",
    // "toString" or "__proto__" resolve to inherited Object.prototype members,
    // the null check fails, and incrementing them produces NaN.
    var counts = Object.create(null)
    return storm.basicbolt(function(data) {
        var word = data.tuple[0]
        // Stored counts are always >= 1, so a falsy lookup means "unseen".
        var count = (counts[word] || 0) + 1
        counts[word] = count
        this.emit([word, count])
    }).declareOutputFields(["word", "count"])
})()
// Wire the topology: sentences are shuffled across splitsentence tasks, and
// words are field-grouped on "word" so every occurrence of a word reaches the
// same wordcount task.
var builder = storm.topologybuilder()
builder.setSpout('randomsentence', randomsentence)
builder.setBolt('splitsentence', splitsentence, 8).shuffleGrouping('randomsentence')
builder.setBolt('wordcount', wordcount, 12).fieldsGrouping('splitsentence', ['word'])
//builder.setBolt('word',word,3).shuffleGrouping('wordcount')
var topology = builder.createTopology()

var options = {
    // name: 'optional... the default name is the name of the topology script',
    //nimbus: 'localhost:2181',
    //nimbus: '127.0.0.1:2181',
    // NOTE(review): 2181 is ZooKeeper's default client port, while Nimbus's
    // Thrift port defaults to 6627. Confirm which endpoint node-storm expects.
    nimbus: '172.26.4.227:2181',
    config: { 'topology.debug': true }
}

// Submit to the cluster. Console output from the spout and bolts will not
// appear here: those components execute in worker processes on the Storm
// nodes, so check the workers' logs on those machines instead.
storm.submit(topology, options, function(err, topologyName) {
    if (err) {
        console.error(err)
        // Don't report a topology name after a failed submission.
        return
    }
    console.log(topologyName)
})
//process.on('uncaughtException', function (err) {
// console.log(err);
//});
1条答案
按热度按时间flmtquvp1#
我认为你看不到 console.log('in spout') 以及 console.log(word) 的输出，是由 Storm 的运行方式决定的。Spout 和 Bolt 中的代码并不在提交拓扑的那台机器上运行，而是在集群中各个 Storm 节点的工作进程（worker）中执行。因此，这些日志会出现在对应 Storm 节点的 worker 日志里，而不是你提交拓扑时所用的控制台上。