如何使用emit()和sync()从风暴喷口输出元组流?

ldioqlga  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(360)

(xpost github问题)
我是新来的风暴。我找到了有用的节点风暴库,并且我已经成功地提交了拓扑,但是我无法让我的喷口发出元组流。
节点风暴 wordcount 这个例子很好用。
我想要一个订阅websocket并以元组形式输出任何消息的喷口。
这是我目前为止的尝试。我想我有一些配置错误,因为我知道我的 wsEmitter 正在发射 future 事件,但我的风暴用户界面显示零喷口发射。
我怀疑也许我不应该在spout函数中绑定侦听器?
这个函数是否被多次调用(看起来像是。。。看到了吗https://github.com/rallysoftware/node-storm/blob/master/lib/spout.js#l4 )
是什么 sync 我应该什么时候用?

var storm = require('node-storm');
var wsEmitter = require('./wsEmitter.js')();
wsEmitter.init();  // subscribe to websocket

var futuresSpout = storm.spout(function(sync) {
  var self = this;
  console.log('subscribing to ws');
  wsEmitter.on('future', function(data){       // websocket data arrived
    self.emit([data]);
    sync();
  });
})
.declareOutputFields(["a"]);
nfs0ujit

nfs0ujit1#

原来我有两个问题。首先,我的拓扑没有执行,因为我的一个螺栓(未显示)未能设置 .declareOutputFields() .
第二,我需要推迟从喷口排放,直到主管要求一个排放与 nextTick() . 我通过缓冲任何传入的消息,直到主管调用喷口:

module.exports = (function(){
  var storm = require('node-storm');

  var wsEmitter = require('./wsEmitter.js')();
  wsEmitter.init();

  var queue = [];
  var queueEmpty = true;

  wsEmitter.on('thing', function(data){
    var trade = JSON.parse(data);
    trade.timeReported = new Date().valueOf();
    queue.push(trade);
    queueEmpty = false;
  });

  return storm.spout(function(sync) {
    var self = this;
    setTimeout(function(){
      if(!queueEmpty){
        self.emit([queue.shift()]);
        queueEmpty =
        ( queue.length === 0
        ? true
        : false )
      }
      sync();
    }, 100);

  })
  .declareOutputFields(['trade'])
})()

相关问题