如何为storm提供计算值

8fsztsew  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(309)

我很难理解如何为storm提供价值,因为我是storm的新手。
我从初学者工具包开始。我通过了 TestWordSpout 下面的代码提供了新的值

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

所以我觉得一次只说一个字 _collector.emit(new Values(word)); 我怎样才能直接提供一组单词。这可能吗?
testwordspout.java语言
我的意思是,当调用nexttuple时,从列表中随机选择一个新单词并发出。在一定的时间间隔后,随机列表可能是这样的

@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels

如果我已经有了这个列表的集合,然后把它提供给storm呢。

nnsrf1az

nnsrf1az1#

storm的设计和构建是为了处理连续的数据流。请看风暴的原因。输入数据不太可能被送入风暴群。一般来说,storm的输入数据来自jms队列、apachekafka或twitter提要等。我认为,您应该传递一些配置。在这种情况下,将适用以下规定。
考虑到storm的设计目的,可以向storm传递非常有限的配置细节,例如rdmbs连接细节(oracle/db2/mysql等)、jms提供者细节(ibm mq/rabbitmq等)或apache kafka细节/hbase等。
对于您提出的特定问题或提供上述产品的配置详细信息,我有三种想法
1.设置喷口或螺栓示例的配置细节
例如:声明示例变量并将值作为spoutt/bolt构造函数的一部分进行赋值,如下所示

public class TestWordSpout extends BaseRichSpout {
         List<String> listOfValues;

   public TestWordSpout(List<String> listOfValues) {
       this.listOfValues=listOfValues;
   }

}

在topology submission类中,使用值列表创建一个spout示例

List<String> listOfValues=new ArrayList<String>();
        listOfValues.add("nathan");
        listOfValues.add("golda");
        listOfValues.add("mike");

        builder.setSpout("word", new TestWordSpout(listOfValues), 3);

这些值在中作为示例变量提供 nextTuple() 方法
请查看storm contrib上针对rdbms/kafka等的配置集的storm集成
2.在中设置配置 getComponentConfiguration() . 此方法用于覆盖拓扑配置,但是,您可以传递一些细节,如下所示

@Override
    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> ret = new HashMap<String, Object>();
        if(!_isDistributed) {
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
             List<String> listOfValues=new ArrayList<String>();
             listOfValues.add("nathan");
             listOfValues.add("golda");
             listOfValues.add("mike");
             ret.put("listOfValues", listOfValues);
        }
        return ret;
    }

有关配置的详细信息,请参见 open() or prepare() 分别采用喷口/螺栓的方法。

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        this.listOfValues=(List<String>)conf.get("listOfValues");        
       }

3.在属性文件中声明配置,并将其作为jar文件的一部分提交给storm集群。nimbus节点将jar文件复制到worker节点,并使其可供executor线程使用。open()/prepare()方法可以读取属性文件并分配给示例变量。

yyhrrdl8

yyhrrdl82#

“值”类型接受任何类型的对象和任何数字。
因此,您可以简单地从bolt的execute方法或spout的nexttuple方法发送一个列表:

List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));

您也可以添加一个新字段,只需确保在declareoutputfields方法中声明它

_collector.emit(new Values(words, "a new field value!");

在你的declareoutputfields方法中

@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}

可以从execute方法给出的tuple对象中获取拓扑中下一个bolt中的字段:

List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");

相关问题