我们是新来的风暴。我们不知道如何创建拓扑请帮助我们与风暴。我们尝试了“在windows上运行storm”一文中给出的示例wordcount c=topology。但是我们无法理解如何给出一个输入,以及在StormUI中输入和输出的位置。
0md85ypi1#
输入和输出在storm ui中不存在。在storm ui中,您可以看到没有发出元组、处理时间、群集配置和群集运行状况。若要查看输出和输入,请使用记录器机制,然后检查storm包的日志文件夹中存在的每个工作日志文件。要在storm中创建拓扑,您需要两件东西,一个喷口和一个螺栓。请查看下面的示例代码:-samplespout.java文件
import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; public class SampleSpout implements IRichSpout{ SpoutOutputCollector collector; int i=0; List<Object> tupleList; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public void activate() { // TODO Auto-generated method stub } @Override public void deactivate() { // TODO Auto-generated method stub } @Override public void nextTuple() { tupleList=new ArrayList<Object>(); tupleList.add("storm"+i); tupleList.add(i); collector.emit(tupleList,i); i++; } @Override public void ack(Object msgId) { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","count")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
samplebolt.java文件
import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class SampleBolt implements IBasicBolt { private static Logger log = LoggerFactory.getLogger(SampleBolt.class); @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector collector) { log.info(input.getValues().toString()+"output values"); } @Override public void cleanup() { // TODO Auto-generated method stub } }
sampletopology.java文件
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class SampleTopology { /** * @param args */ public static void main(String[] args) { TopologyBuilder topology=new TopologyBuilder(); topology.setSpout("sampleSpout",new SampleSpout()); topology.setBolt("sampleBolt",new SampleBolt()).shuffleGrouping("sampleSpout"); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster=new LocalCluster(); cluster.submitTopology("test", conf, topology.createTopology()); } }
1条答案
按热度按时间0md85ypi1#
输入和输出在storm ui中不存在。在storm ui中,您可以看到没有发出元组、处理时间、群集配置和群集运行状况。若要查看输出和输入,请使用记录器机制,然后检查storm包的日志文件夹中存在的每个工作日志文件。要在storm中创建拓扑,您需要两件东西,一个喷口和一个螺栓。请查看下面的示例代码:-
samplespout.java文件
samplebolt.java文件
sampletopology.java文件