我正在尝试创建一个拓扑has:1 spout 它发出tweet和两个螺栓:一个收集tweet的tweetparserbolt和一个收集tweeter用户名的userparserbolt。
假设我已经创建了第三个锚定tweetparserbolt和userparserbolt的bolt,这样就可以将tweeter的用户名Map到她/他已经发布的tweets列表中。我遇到的问题是bolt返回一个空的tweets列表。
有人能帮我理解代码有什么问题吗
下面是拓扑和三个螺栓的代码:
public class TwitterTopology {
private static String consumerKey = "*********************";
private static String consumerSecret = "*****************";
private static String accessToken = "********************";
private static String accessTokenSecret = "****************";
public static void main(String [] args) throws Exception{
/***SETUP***/
String remoteClusterTopologyName = null;
if (args!=null) {
if (args.length==1) {
remoteClusterTopologyName = args[0];
}
// If credentials are provided as commandline arguments
else if (args.length==4) {
accessToken =args[0];
accessTokenSecret =args[1];
consumerKey =args[2];
consumerSecret =args[3];
}
}
/********************************/
TopologyBuilder builder = new TopologyBuilder();
FilterQuery filterQuery = new FilterQuery();
filterQuery.track(new String[]{"#cloudcomputing"});
filterQuery.language(new String[]{"en"});
TwitterSpout spout = new TwitterSpout( accessToken, accessTokenSecret,consumerKey, consumerSecret, filterQuery);
builder.setSpout("TwitterSpout",spout,1);
builder.setBolt("TweetParserBolt",new TweetParserBolt(),4).shuffleGrouping("TwitterSpout");
builder.setBolt("UserMapperBolt",new UserParserBolt()).shuffleGrouping("TwitterSpout");
UserAndTweetsMapperBolt()).fieldsGrouping(("TweetParserBolt"), new Fields("username","tweet","bolt"))
.fieldsGrouping(("UserMapperBolt"),new Fields("username","tweet","bolt"));
Config conf = new Config();
conf.setDebug(true);
if (remoteClusterTopologyName!=null) {
conf.setNumWorkers(4);
StormSubmitter.submitTopology(remoteClusterTopologyName, conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Thread.sleep(460000);
cluster.shutdown();
}
}
public class TweetParserBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("username","tweet","bolt"));
}
@Override
public void prepare(Map map,TopologyContext context,OutputCollector collector){
this.collector=collector;
}
@Override
public void execute(Tuple tuple){
Status tweet=(Status)tuple.getValue(0);
String username=tweet.getUser().getScreenName();
collector.emit(tuple,new Values(username,tweet,"tweet_parser_bolt"));
}
}
public class UserParserBolt extends BaseRichBolt{
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("username","tweet"));
}
@Override
public void prepare(Map map,TopologyContext context,OutputCollector collector){
this.collector=collector;
}
@Override
public void execute(Tuple tuple){
Status tweet=(Status)tuple.getValue(0);
String username=tweet.getUser().getScreenName();
collector.emit(tuple,new Values(username,tweet,"user_parser_bolt"));
}
}
public class UserAndTweetsMapperBolt extends BaseRichBolt {
private OutputCollector collector;
List<Tuple>listOfTuples;
Map<String,Status>tempTweetsMap;
Map<String,List<Status>>UserAndTweetsMap;
List<Status>tweets;
List<String>tempUsers;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("username","tweets"));
}
@Override
public void prepare(Map map,TopologyContext context,OutputCollector collector){
this.collector=collector;
this.listOfTuples=new ArrayList<Tuple>();
this.tempTweetsMap=new HashMap<String, Status>();
this.UserAndTweetsMap=new HashMap<String, List<Status>>();
this.tempUsers=new ArrayList<String>();
this.tweets=new ArrayList<Status>();
}
@Override
public void execute(Tuple tuple){
//String username=tuple.getStringByField("username");
//Status status=(Status)tuple.getValueByField("tweet");
String username=tuple.getValue(0).toString();
String sourceComponent=tuple.getSourceComponent();
if(sourceComponent.equals("TwitterParserBolt")){
String tempUser1=tuple.getValue(0).toString();
Status tempStatus1=(Status)tuple.getValue(1);
tempTweetsMap.put(tempUser1,tempStatus1);
}else if(sourceComponent.equals("UserParserBolt")){
String tempUser2=tuple.getValue(0).toString();
Status tempStatus2=(Status)tuple.getValue(1);
tempUsers.add(tempUser2);
}
for(int i=0;i<tempUsers.size();i++){
for(int j=0;j<tempTweetsMap.size();j++){
if(tempUsers.get(i).equals(tempTweetsMap.get(j).getUser().getScreenName())){
tweets.add(tempTweetsMap.get(j));
}
}
}
collector.emit(new Values(username,tweets));
}
}
1条答案
按热度按时间v2g6jxz61#
您需要对组合它们的螺栓中的用户名进行字段分组。如果你像现在这样按所有字段分组,那么在同一个任务中,你可能会也可能不会得到同一个用户的所有tweet。此外,您的Map将只捕获任何给定用户的最后状态。如果您想要它们全部,您需要使值成为一个状态数组。