如何使用 Node.js 在 Apache Storm 中获取输出

0kjbasz6  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(377)

我是 Apache Storm 的新手。这个程序把输入的句子拆分成单词，然后进行单词计数。我遇到的问题是：看不到 console.log('in spout') 和 console.log(word) 的输出。代码如下：

var storm = require('node-storm')

/**
 * Spout that emits one randomly chosen sentence per invocation.
 *
 * Each time the spout callback runs it waits 100 ms, emits a single-field
 * tuple containing a sentence picked at random from a fixed list, and then
 * calls `sync()`.
 *
 * Diagnostics are written to stderr (console.error) rather than stdout:
 * Storm's multi-lang protocol exchanges JSON messages with shell components
 * over stdin/stdout, so stray console.log output from a component can
 * corrupt that stream. The messages end up in the log of the worker that
 * runs the component, not in the terminal that submitted the topology.
 *
 * NOTE(review): the declared output field is named "word" although the tuple
 * carries a whole sentence. It is kept as-is so that anything grouping on
 * this field name keeps working; consider renaming it to "sentence".
 */
var randomsentence = (function() {
    var sentences = [
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    ]

    // Runs once when this script is loaded, before the spout is created.
    console.error('before spout')

    return storm.spout(function(sync) {
        // Capture the spout context; `this` is different inside the timer callback.
        var self = this
        setTimeout(function() {
            console.error('in spout')
            var i = Math.floor(Math.random()*sentences.length)
            var sentence = sentences[i]
            self.emit([sentence]) /* {id:'unique'} //for reliable emit */
            // presumably tells node-storm this invocation is finished — confirm in its docs
            sync()
        }, 100)
    }).declareOutputFields(["word"])
})()

/**
 * Bolt that splits an incoming sentence into words.
 *
 * Reads the sentence from the first field of the incoming tuple, splits it on
 * single spaces, trims each piece, and emits one single-field tuple per
 * non-empty word (empty pieces, e.g. from consecutive spaces, are skipped).
 *
 * Each emitted word is also written to stderr for debugging. stderr is used
 * instead of stdout because Storm's multi-lang protocol talks to shell
 * components over stdin/stdout, so console.log output from a component can
 * corrupt the message stream. Look for these lines in the log of the worker
 * that runs the bolt.
 */
var splitsentence = storm.basicbolt(function(data) {
    // Capture the bolt context for use inside the forEach callback.
    var self = this
    data.tuple[0].split(" ").forEach(function(piece) {
        var word = piece.trim()
        if (!word) {
            return
        }
        self.emit([word])
        console.error(word)
    })
}).declareOutputFields(["word"])

/**
 * Bolt that keeps a running count per word and emits [word, count].
 *
 * The counts live in a closure, so they are local to this process and last
 * only as long as the process does.
 */
var wordcount = (function() {
    // A prototype-less object is used as the dictionary so that words such as
    // "constructor" or "toString" are not mistaken for already-present keys
    // inherited from Object.prototype (which would make the count NaN).
    var counts = Object.create(null)

    return storm.basicbolt(function(data) {
        var word = data.tuple[0]
        // First sighting of a word starts at 0, then every sighting adds one.
        var count = (counts[word] || 0) + 1
        counts[word] = count
        this.emit([word, count])
    }).declareOutputFields(["word", "count"])
})()

// Wire the topology: randomsentence -> splitsentence -> wordcount.
var builder = storm.topologybuilder()

// Source of sentences.
builder.setSpout('randomsentence', randomsentence)

// Sentence splitter (third argument 8), fed from the spout via shuffle grouping.
var splitDeclarer = builder.setBolt('splitsentence', splitsentence, 8)
splitDeclarer.shuffleGrouping('randomsentence')

// Word counter (third argument 12), fed from the splitter and grouped on the
// 'word' field.
var countDeclarer = builder.setBolt('wordcount', wordcount, 12)
countDeclarer.fieldsGrouping('splitsentence', ['word'])
//builder.setBolt('word',word,3).shuffleGrouping('wordcount')

var topology = builder.createTopology()

// Submission settings passed to storm.submit.
var options = {
    // name: 'optional... the default name is the name of the topology script',
    //nimbus: 'localhost:2181',
    //nimbus: '127.0.0.1:2181',
    // NOTE(review): 2181 is ZooKeeper's default client port; Nimbus listens on
    // its Thrift port (6627 by default). Confirm this address really points at
    // Nimbus for this cluster.
    nimbus: '172.26.4.227:2181',
    config: { 'topology.debug': true }
}

// Submit the topology; report either the failure or the submitted name.
storm.submit(topology, options, function(err, topologyName) {
    if (err) {
        // Stop here: on failure there is no meaningful topology name to print.
        console.error(err)
        return
    }
    console.log(topologyName)
})

//process.on('uncaughtException', function (err) {
  //  console.log(err);
//});
flmtquvp

flmtquvp1#

我认为你之所以看不到 console.log('in spout') 和 console.log(word) 的输出，是由 Storm 的执行方式决定的。spout 和 bolt 的代码是在集群中各个 Storm 节点上的 worker 进程里执行的，而不是在你提交拓扑的终端里执行，因此日志会出现在这些 Storm 节点上（worker 的日志文件中）。

相关问题