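After using Storm to store all the downloaded tweets in a MongoDB database, I am trying to count each user's original tweets. However, whenever I count an author's original tweets with the code below, it keeps reading (and counting) the same tweet over and over.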
Bolt:
public class CalculateTheMetrics extends BaseBasicBolt {
    // Running count of original tweets per author
    Map<String, Double> OT1 = new HashMap<String, Double>();

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("USERNAME", "OT1"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String author = input.getString(0);
        String tweet = input.getString(2);
        Double OT1 = this.OT1.get(author);
        if (OT1 == null) {
            OT1 = 0.0;
        }
        if (author != null && tweet != null) {
            // Count only tweets that are neither replies ("@") nor retweets ("RT");
            // joined with ||, the two checks would be true for every tweet
            if (!tweet.startsWith("@") && !tweet.startsWith("RT")) {
                OT1 += 1;
            }
            this.OT1.put(author, OT1);
            System.out.println(author + " " + OT1);
            collector.emit(new Values(author, OT1));
        }
    }
}
Topology:
public class TheAuthorsAndTheirTweetData {
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("READ_TWEET_DATA_FROM_MONGODB", new ReadLinesFromTextFile("tweets.txt"));
        topologyBuilder.setBolt("TWEET_DATA_FROM_MONGODB_TO_FURTHER_PROCESSING", new FromMongoDBToProcessing())
                .shuffleGrouping("READ_TWEET_DATA_FROM_MONGODB");
        topologyBuilder.setSpout("READ_THE_AUTHORS_FROM_TEXT_FILE", new ReadLastLineFromTextFile("authors.txt"));
        topologyBuilder.setBolt("FROM_THE_AUTHORS_TEXT_FILE_TO_FURTHER_PROCESSING", new FromTheAuthorsTextFileToFurtherProcessing())
                .shuffleGrouping("READ_THE_AUTHORS_FROM_TEXT_FILE");
        topologyBuilder.setBolt("SEARCH_FOR_THE_AUTHORS_TWEET_DATA", new SearchForTheAuthorsTweetData(), 16)
                .fieldsGrouping("TWEET_DATA_FROM_MONGODB_TO_FURTHER_PROCESSING", new Fields("USERNAME", "ID"))
                .fieldsGrouping("FROM_THE_AUTHORS_TEXT_FILE_TO_FURTHER_PROCESSING", new Fields("USERNAME", "ID"));
        topologyBuilder.setBolt("CALCULATE_THE_METRICS", new CalculateTheMetrics(), 64)
                .fieldsGrouping("SEARCH_FOR_THE_AUTHORS_TWEET_DATA", new Fields("USERNAME"));
        Config config = new Config();
        if (args != null && args.length > 0) {
            config.setNumWorkers(10);
            config.setNumAckers(5);
            config.setMaxSpoutPending(100);
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("Test", config, topologyBuilder.createTopology());
            Utils.sleep(1 * 60 * 60 * 1000);
            localCluster.killTopology("Test");
            localCluster.shutdown();
        }
    }
}
What I want is for it to stop re-reading and re-counting the same tweet. Please help.
1 Answer
hc2pp10m1#
Something like this?
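One possible approach, shown only as a sketch in plain Java without any Storm classes: remember which tweet IDs have already been counted for each author, and skip any tweet whose ID shows up again. The class `OriginalTweetCounter` and its methods are hypothetical names introduced for illustration. The sketch also assumes that every tweet carries a unique ID (the topology groups on an "ID" field, but whether that field identifies the tweet is an assumption).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Storm: counts each author's original
// tweets and ignores tweet IDs it has already seen for that author.
class OriginalTweetCounter {
    private final Map<String, Integer> originals = new HashMap<>();
    private final Map<String, Set<String>> seenIds = new HashMap<>();

    // A tweet is treated as original only if it is neither a reply ("@...")
    // nor a retweet ("RT..."); both checks must hold, hence && and not ||.
    static boolean isOriginal(String tweet) {
        return !tweet.startsWith("@") && !tweet.startsWith("RT");
    }

    // Records one tweet and returns the author's running count of originals.
    // A tweet ID already recorded for this author is skipped.
    int record(String author, String tweetId, String tweet) {
        if (author == null || tweetId == null || tweet == null) {
            return 0; // nothing to attribute the tweet to
        }
        Set<String> seen = seenIds.computeIfAbsent(author, k -> new HashSet<>());
        boolean firstTime = seen.add(tweetId); // false if this ID was seen before
        if (firstTime && isOriginal(tweet)) {
            originals.merge(author, 1, Integer::sum);
        }
        return originals.getOrDefault(author, 0);
    }

    public static void main(String[] args) {
        OriginalTweetCounter counter = new OriginalTweetCounter();
        counter.record("alice", "1", "hello world");
        counter.record("alice", "1", "hello world");  // same ID again, ignored
        counter.record("alice", "2", "RT something"); // retweet, not counted
        counter.record("alice", "3", "@bob hi");      // reply, not counted
        System.out.println(counter.record("alice", "4", "another one")); // prints 2
    }
}
```

Inside the bolt, the same bookkeeping could sit next to the existing `OT1` map. Because `CALCULATE_THE_METRICS` uses `fieldsGrouping` on `USERNAME`, all tuples for one author reach the same task, so per-task in-memory state like this sees every tweet of that author. The state is lost when a worker restarts, and it only hides duplicates; if the spout keeps re-emitting the same lines, that would still need to be checked separately.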