用storm将用户名Map到tweets

uxh89sit  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(296)

我正在尝试创建一个拓扑has:1 spout 它发出tweet和两个螺栓:一个收集tweet的tweetparserbolt和一个收集tweeter用户名的userparserbolt。
假设我已经创建了第三个锚定tweetparserbolt和userparserbolt的bolt,这样就可以将tweeter的用户名Map到她/他已经发布的tweets列表中。我遇到的问题是bolt返回一个空的tweets列表。
有人能帮我理解代码有什么问题吗
下面是拓扑和三个螺栓的代码:

public class TwitterTopology {
private static String consumerKey = "*********************";
private static String consumerSecret = "*****************";
private static String accessToken = "********************";
private static String accessTokenSecret = "****************";

public static void main(String [] args) throws Exception{

    /***SETUP***/

    String remoteClusterTopologyName = null;
    if (args!=null) {
        if (args.length==1) {
            remoteClusterTopologyName = args[0];
        }
        // If credentials are provided as commandline arguments
        else if (args.length==4) {
            accessToken =args[0];
            accessTokenSecret =args[1];
            consumerKey =args[2];
            consumerSecret =args[3];
        }

    }
    /********************************/

    TopologyBuilder builder = new TopologyBuilder();

    FilterQuery filterQuery = new FilterQuery();
    filterQuery.track(new String[]{"#cloudcomputing"});
    filterQuery.language(new String[]{"en"});

    TwitterSpout spout = new TwitterSpout( accessToken, accessTokenSecret,consumerKey, consumerSecret, filterQuery);
    builder.setSpout("TwitterSpout",spout,1);

    builder.setBolt("TweetParserBolt",new TweetParserBolt(),4).shuffleGrouping("TwitterSpout");
    builder.setBolt("UserMapperBolt",new UserParserBolt()).shuffleGrouping("TwitterSpout");

    UserAndTweetsMapperBolt()).fieldsGrouping(("TweetParserBolt"), new Fields("username","tweet","bolt"))
                                                                            .fieldsGrouping(("UserMapperBolt"),new Fields("username","tweet","bolt"));

    Config conf = new Config();
    conf.setDebug(true);

    if (remoteClusterTopologyName!=null) {
        conf.setNumWorkers(4);

        StormSubmitter.submitTopology(remoteClusterTopologyName, conf, builder.createTopology());
    }
    else {
        conf.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());

        Thread.sleep(460000);

        cluster.shutdown();
    }

}

public class TweetParserBolt extends BaseRichBolt {
   private OutputCollector collector;

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer){
      declarer.declare(new Fields("username","tweet","bolt"));
   }

   @Override
   public void prepare(Map map,TopologyContext context,OutputCollector collector){
      this.collector=collector;
   }

   @Override
   public void execute(Tuple tuple){
      Status tweet=(Status)tuple.getValue(0);
      String username=tweet.getUser().getScreenName();

      collector.emit(tuple,new Values(username,tweet,"tweet_parser_bolt"));

    }
  }

   public class UserParserBolt extends BaseRichBolt{
     private OutputCollector collector;

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("username","tweet"));
     }

     @Override
     public void prepare(Map map,TopologyContext context,OutputCollector collector){
        this.collector=collector;
     }

     @Override
     public void execute(Tuple tuple){
       Status tweet=(Status)tuple.getValue(0);
       String username=tweet.getUser().getScreenName();

       collector.emit(tuple,new Values(username,tweet,"user_parser_bolt"));

    }
 }

 public class UserAndTweetsMapperBolt extends  BaseRichBolt {
    private OutputCollector collector;

    List<Tuple>listOfTuples;

    Map<String,Status>tempTweetsMap;
    Map<String,List<Status>>UserAndTweetsMap;
    List<Status>tweets;
    List<String>tempUsers;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){
       declarer.declare(new Fields("username","tweets"));
    }

    @Override
    public void prepare(Map map,TopologyContext context,OutputCollector collector){
      this.collector=collector;
      this.listOfTuples=new ArrayList<Tuple>();

      this.tempTweetsMap=new HashMap<String, Status>();
      this.UserAndTweetsMap=new HashMap<String, List<Status>>();

      this.tempUsers=new ArrayList<String>();
      this.tweets=new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple){
       //String username=tuple.getStringByField("username");
       //Status status=(Status)tuple.getValueByField("tweet");

       String username=tuple.getValue(0).toString();

       String sourceComponent=tuple.getSourceComponent();

       if(sourceComponent.equals("TwitterParserBolt")){
          String tempUser1=tuple.getValue(0).toString();
          Status tempStatus1=(Status)tuple.getValue(1);

          tempTweetsMap.put(tempUser1,tempStatus1);

       }else if(sourceComponent.equals("UserParserBolt")){
          String tempUser2=tuple.getValue(0).toString();
          Status tempStatus2=(Status)tuple.getValue(1);

          tempUsers.add(tempUser2);
      }

      for(int i=0;i<tempUsers.size();i++){
         for(int j=0;j<tempTweetsMap.size();j++){
             if(tempUsers.get(i).equals(tempTweetsMap.get(j).getUser().getScreenName())){
                 tweets.add(tempTweetsMap.get(j));
             }
         }
     }

        collector.emit(new Values(username,tweets));
   }
 }
v2g6jxz6

v2g6jxz61#

您需要对组合它们的螺栓中的用户名进行字段分组。如果你像现在这样按所有字段分组,那么在同一个任务中,你可能会也可能不会得到同一个用户的所有tweet。此外,您的Map将只捕获任何给定用户的最后状态。如果您想要它们全部,您需要使值成为一个状态数组。

UserAndTweetsMapperBolt().
    fieldsGrouping(("TweetParserBolt"), new Fields("username")).
    fieldsGrouping(("UserMapperBolt"),new Fields("username"));

相关问题