bolt

7rfyedvj  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(298)

我使用 Storm 把下载的所有 tweet 存入 MongoDB 数据库之后,想统计每个用户的原创 tweet 数量。但是,每当我用下面的代码统计作者的原创 tweet 数量时,它总是反复读取(并重复统计)同一条 tweet。
Bolt:

public class CalculateTheMetrics  extends BaseBasicBolt {

Map<String,Double>OT1=new HashMap<String, Double>();

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("USERNAME","OT1"));
}

@Override
public void execute(Tuple input,BasicOutputCollector collector) {

    String author=input.getString(0);
    String tweet=input.getString(2);

    Double OT1=this.OT1.get(author);
    if(OT1==null){
        OT1=0.0;
    }
    if(author!=null && tweet!=null ){

        if(!tweet.startsWith("@") || !tweet.startsWith("RT")){
            OT1+=1;
        }
        this.OT1.put(author,OT1);

        System.out.println(author+" +OT1); 
        collector.emit(new Values(author,OT1))
       }
}

拓扑结构:

/**
 * Entry point that wires the tweet and author spouts to the search and
 * metrics bolts, then submits the topology.
 *
 * <p>With at least one command-line argument the topology is submitted via
 * {@code StormSubmitter} under the name {@code args[0]}; otherwise it runs
 * in a {@code LocalCluster} under the name "Test" for one hour.
 */
public class TheAuthorsAndTheirTweetData {

    /**
     * Builds and submits the topology.
     *
     * @param args optional; {@code args[0]} is the topology name used for submission
     * @throws Exception propagated from topology submission
     */
    public static void main(String[] args) throws Exception {
        final String tweetSpoutId = "READ_TWEET_DATA_FROM_MONGODB";
        final String tweetBoltId = "TWEET_DATA_FROM_MONGODB_TO_FURTHER_PROCESSING";
        final String authorSpoutId = "READ_THE_AUTHORS_FROM_TEXT_FILE";
        final String authorBoltId = "FROM_THE_AUTHORS_TEXT_FILE_TO_FURTHER_PROCESSING";
        final String searchBoltId = "SEARCH_FOR_THE_AUTHORS_TWEET_DATA";
        final String metricsBoltId = "CALCULATE_THE_METRICS";

        TopologyBuilder builder = new TopologyBuilder();

        // Tweet branch: spout followed by its pre-processing bolt.
        builder.setSpout(tweetSpoutId, new ReadLinesFromTextFile("tweets.txt"));
        builder.setBolt(tweetBoltId, new FromMongoDBToProcessing())
                .shuffleGrouping(tweetSpoutId);

        // Author branch: spout followed by its pre-processing bolt.
        builder.setSpout(authorSpoutId, new ReadLastLineFromTextFile("authors.txt"));
        builder.setBolt(authorBoltId, new FromTheAuthorsTextFileToFurtherProcessing())
                .shuffleGrouping(authorSpoutId);

        // Both branches feed the search bolt, grouped on (USERNAME, ID).
        builder.setBolt(searchBoltId, new SearchForTheAuthorsTweetData(), 16)
                .fieldsGrouping(tweetBoltId, new Fields("USERNAME", "ID"))
                .fieldsGrouping(authorBoltId, new Fields("USERNAME", "ID"));

        // Metrics bolt receives the search bolt's output grouped on USERNAME.
        builder.setBolt(metricsBoltId, new CalculateTheMetrics(), 64)
                .fieldsGrouping(searchBoltId, new Fields("USERNAME"));

        Config conf = new Config();
        boolean runLocally = args == null || args.length == 0;
        if (runLocally) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Test", conf, builder.createTopology());
            // Let the local run go for one hour before tearing it down.
            Utils.sleep(60 * 60 * 1000);
            cluster.killTopology("Test");
            cluster.shutdown();
        } else {
            conf.setNumWorkers(10);
            conf.setNumAckers(5);
            conf.setMaxSpoutPending(100);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }

}
我希望它不再重复读取、重复统计同一条 tweet。请帮帮忙。

hc2pp10m

hc2pp10m1#

像这样的?

public class Calculate1Metric extends BaseRichBolt {
private OutputCollector collector;
Map<String ,Integer>OT1;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("username","OT1"));

}

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector=collector;
    this.OT1=new HashMap<String, Integer>();

    }

@Override
public void execute(Tuple input) {
    final String sourceComponent = input.getSourceComponent();

        String author = input.getString(0);
        String tweet = input.getString(2);

        if (author != null && tweet != null) {

            Integer OT1 = this.OT1.get(author);
            if (OT1 == null) {
                OT1 = 0;
            }

            if (!tweet.startsWith("@") || !tweet.contains("RT ") || !tweet.startsWith("RT")) {
                OT1 += 1;
            }

            if(!this.OT1.containsKey(author)) {
                this.OT1.put(author, OT1);

            }else{
                collector.emit(new Values(author,OT1,OT2));
                System.out.println(author + " " + OT1+" "+OT2);

                this.OT1.remove(author);

            }
        }else{
            collector.fail(input);

        }
      collector.ack(input);
 }

相关问题