如何在storm中创建拓扑

e4eetjau  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(328)

我们是 Storm 的新手,不知道如何创建拓扑,希望能得到帮助。我们尝试了“在 Windows 上运行 Storm”一文中给出的 WordCount 拓扑示例,但是不明白如何提供输入,也不知道在 Storm UI 中哪里可以看到输入和输出。

0md85ypi

0md85ypi1#

Storm UI 中不会显示输入和输出。在 Storm UI 中,您可以看到已发出的元组数量、处理时间、集群配置和集群运行状况。若要查看输入和输出,请使用日志(logger)机制,然后检查 Storm 安装包的 logs 文件夹中的各个 worker 日志文件。要在 Storm 中创建拓扑,您需要两样东西:一个 spout(数据源)和一个 bolt(处理单元)。请查看下面的示例代码:
SampleSpout.java 文件

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

/**
 * Minimal example spout that emits an endless sequence of two-field tuples.
 *
 * <p>Each call to {@link #nextTuple()} emits {@code ("storm" + i, i)} under the
 * declared field names {@code "word"} and {@code "count"}, using the counter
 * value {@code i} as the message id, and then increments the counter.
 */
public class SampleSpout implements IRichSpout{

    /** Collector handed over in {@link #open}; used by {@link #nextTuple()} to emit. */
    SpoutOutputCollector collector;
    /** Running counter; used both as tuple payload and as message id. */
    int i=0;
    /** Values of the most recently emitted tuple. */
    List<Object> tupleList;

    /**
     * Stores the collector so that {@link #nextTuple()} can emit through it.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit tuples from this spout
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        // Without this assignment the field stays null and nextTuple()
        // fails with a NullPointerException on the first emit.
        this.collector = collector;
    }

    /** No resources to release. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    /** No action on activation. */
    @Override
    public void activate() {
        // Intentionally empty.
    }

    /** No action on deactivation. */
    @Override
    public void deactivate() {
        // Intentionally empty.
    }

    /**
     * Emits one tuple {@code ("storm" + i, i)} with message id {@code i},
     * then advances the counter.
     */
    @Override
    public void nextTuple() {
        // A fresh list per emit: the previous list has already been handed
        // to the collector and must not be mutated afterwards.
        tupleList=new ArrayList<Object>();
        tupleList.add("storm"+i);
        tupleList.add(i);
        collector.emit(tupleList,i);
        i++;
    }

    /**
     * Acknowledgement callback; this example does not track pending tuples.
     *
     * @param msgId message id passed to {@code emit}
     */
    @Override
    public void ack(Object msgId) {
        // Intentionally empty.
    }

    /**
     * Failure callback; this example does not replay failed tuples.
     *
     * @param msgId message id passed to {@code emit}
     */
    @Override
    public void fail(Object msgId) {
        // Intentionally empty.
    }

    /**
     * Declares the two output fields, {@code "word"} and {@code "count"}.
     *
     * @param declarer declarer on which the output schema is registered
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }

    /**
     * @return {@code null}; this spout supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

SampleBolt.java 文件

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Minimal example bolt that logs the values of every tuple it receives.
 *
 * <p>It declares no output fields and emits nothing.
 */
public class SampleBolt implements IBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(SampleBolt.class);

    /**
     * No setup is performed.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Nothing to initialise.
    }

    /**
     * Logs the incoming tuple's values at INFO level.
     *
     * @param input     tuple whose values are logged
     * @param collector output collector (unused; this bolt emits nothing)
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        final String rendered = input.getValues().toString();
        log.info(rendered + "output values");
    }

    /** No resources to release. */
    @Override
    public void cleanup() {
        // Nothing to clean up.
    }

    /**
     * Declares nothing.
     *
     * @param declarer output schema declarer (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output fields.
    }

    /**
     * @return {@code null}; no component-specific configuration is supplied
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

SampleTopology.java 文件

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * Entry point that wires {@code SampleSpout} to {@code SampleBolt} and submits
 * the resulting topology to a {@link LocalCluster}.
 */
public class SampleTopology {

    /**
     * Builds the two-component topology, enables debug mode, and submits it
     * to a local cluster under the name {@code "test"}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        Config conf = new Config();
        conf.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sampleSpout", new SampleSpout());
        // The bolt subscribes to the spout's stream via shuffle grouping.
        builder.setBolt("sampleBolt", new SampleBolt())
               .shuffleGrouping("sampleSpout");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("test", conf, builder.createTopology());

    }

}

相关问题