I'm a Hadoop beginner. To learn, I started doing an outer join on two tables: one holds movie details and the other holds ratings.
Sample data from the movies table:
```
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
```
Sample data from the ratings table:
```
userId,movieId,rating,timestamp
1,122,2.0,945544824
1,172,1.0,945544871
1,1221,5.0,945544788
1,1441,4.0,945544871
1,1609,3.0,945544824
1,1961,3.0,945544871
1,1972,1.0,945544871
2,441,2.0,1008942733
2,494,2.0,1008942733
2,1193,4.0,1008942667
2,1597,3.0,1008942773
2,1608,3.0,1008942733
2,1641,4.0,1008942733
```
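movieId is the primary key of the movies table and a foreign key in the ratings table, so I use movieId as the key in the mappers. I use two mappers: one for the movies table and one for the ratings table.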
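The setup described above is a reduce-side join: both mappers emit movieId as the key, so the shuffle should bring a movie row and all of its rating rows into the same reduce() call. Below is a minimal plain-Java model of that flow (no Hadoop; the shuffle is modeled as a LinkedHashMap grouping by key), written under two assumptions. First, values are tagged with hypothetical "M|" / "R|" prefixes to mark their source instead of counting comma-separated fields, since a title containing a comma would change the field count. Second, the sample rating rows above reference movieIds (122, 172, ...) that are not among movies 1 to 10, so two hypothetical matching rating rows are added.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a reduce-side join on movieId (no Hadoop involved).
// The "M|" / "R|" source tags are a hypothetical convention for this sketch.
public class ReduceSideJoinSketch {

    // Movies "mapper": key = movieId (column 0), value tagged as a movie row.
    public static String[] mapMovie(String line) {
        return new String[] {line.split(",")[0], "M|" + line};
    }

    // Ratings "mapper": key = movieId (column 1), value tagged as a rating row.
    public static String[] mapRating(String line) {
        return new String[] {line.split(",")[1], "R|" + line};
    }

    // "Reducer": receives every value for one movieId and emits
    // "rating<TAB>movie" for each rating, but only if the movie row is present.
    public static List<String> reduce(String key, List<String> values) {
        String movie = null;
        List<String> ratings = new ArrayList<>();
        for (String v : values) {
            if (v.startsWith("M|")) {
                movie = v.substring(2);
            } else {
                ratings.add(v.substring(2));
            }
        }
        List<String> out = new ArrayList<>();
        if (movie != null) {
            for (String r : ratings) {
                out.add(r + "\t" + movie);
            }
        }
        return out;
    }

    // Map both inputs, group by key (the "shuffle"), then reduce each group.
    public static List<String> join(List<String> movies, List<String> ratings) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String m : movies) {
            String[] kv = mapMovie(m);
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        for (String r : ratings) {
            String[] kv = mapRating(r);
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            out.addAll(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> movies = Arrays.asList(
            "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy",
            "6,Heat (1995),Action|Crime|Thriller");
        List<String> ratings = Arrays.asList(
            "1,6,4.0,945544824",     // hypothetical row: user 1 rates movie 6
            "2,1,3.0,1008942733",    // hypothetical row: user 2 rates movie 1
            "2,494,2.0,1008942733"); // from the sample: movie 494 is not in the list above
        for (String line : join(movies, ratings)) {
            System.out.println(line);
        }
    }
}
```

Running main should print two joined lines (for movies 1 and 6); the rating for movie 494 has no movie row in the same group and is dropped, just as the `mov != null` check in the reducer below would drop it.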
The code I wrote:
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Join {

    // Movies file: emit (movieId, whole line); movieId is column 0.
    public static class MovMapper extends Mapper<Object, Text, Text, Text> {
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split(",");
            word.set(arr[0]);
            //System.out.println(word.toString()+ " mov");
            context.write(word, value);
        }
    }

    // Ratings file: emit (movieId, whole line); movieId is column 1.
    public static class RatMapper extends Mapper<Object, Text, Text, Text> {
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split(",");
            word.set(arr[1]);
            //System.out.println(word.toString() + " rat");
            context.write(word, value);
        }
    }

    // A value with 3 comma-separated fields is treated as the movie row, the rest
    // as rating rows; each rating row is written out together with the movie row.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<Text> rat = new ArrayList<Text>();
            Text mov = null;
            System.out.println("#######################################################################################");
            for (Text item : values) {
                if (item.toString().split(",").length == 3) {
                    mov = new Text(item);
                } else {
                    rat.add(new Text(item));
                }
                System.out.println("---->" + item);
            }
            System.out.println("item cnt: " + rat.size() + " mov" + mov + " key" + key + " byte: " + key.getBytes().toString());
            for (Text item : rat) {
                if (mov != null) {
                    context.write(item, mov);
                }
            }
            System.out.println("#######################################################################################");
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "join");
        job.setJarByClass(Join.class);
        job.setCombinerClass(JoinReducer.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // args[0] = movies input, args[1] = ratings input, args[2] = output directory
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MovMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, RatMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
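Problem
During the map phase, records from the movies table and the ratings table are mapped to different tasks, even though their movieId is the same. Surprisingly, when I changed movieId to an IntWritable, records with matching keys from both tables were mapped to the same task.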
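One detail of the code above that may be relevant: JoinReducer is also registered with `job.setCombinerClass`. Hadoop may run a combiner zero or more times, including inside each map task on that task's own output, so a map-side combiner call only sees values produced by one mapper, and its `System.out.println` output ends up in the map-task logs, with movie rows and rating rows in separate blocks. The sketch below models that situation in plain Java (no Hadoop), assuming each map task is combined on its own; the rating rows are hypothetical rows chosen to match movies 1 and 6, and `kv`, `joinLogic`, and `combine` are illustrative names, not Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Hadoop) of JoinReducer's logic running as a combiner.
// Assumption: a map-side combiner call only sees the output of its own map task.
public class CombinerEffectSketch {

    // Builds one (key, value) pair of map output.
    public static String[] kv(String key, String value) {
        return new String[] {key, value};
    }

    // Same decision rule as JoinReducer.reduce: a value with 3 comma-separated
    // fields is the movie row, anything else is a rating row.
    public static List<String[]> joinLogic(List<String> values) {
        String mov = null;
        List<String> rat = new ArrayList<>();
        for (String item : values) {
            if (item.split(",").length == 3) {
                mov = item;
            } else {
                rat.add(item);
            }
        }
        List<String[]> out = new ArrayList<>();
        if (mov != null) {
            for (String r : rat) {
                out.add(kv(r, mov)); // key is the whole rating line, as in context.write(item, mov)
            }
        }
        return out;
    }

    // Groups one map task's output by key and applies joinLogic to each group.
    public static List<String[]> combine(List<String[]> taskOutput) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] pair : taskOutput) {
            groups.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        List<String[]> out = new ArrayList<>();
        for (List<String> values : groups.values()) {
            out.addAll(joinLogic(values));
        }
        return out;
    }

    public static void main(String[] args) {
        // Map task over the movies file (rows from the sample above).
        List<String[]> movieTask = Arrays.asList(
            kv("1", "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy"),
            kv("6", "6,Heat (1995),Action|Crime|Thriller"));
        // Map task over the ratings file (hypothetical rows matching movies 1 and 6).
        List<String[]> ratingTask = Arrays.asList(
            kv("1", "2,1,3.0,1008942733"),
            kv("6", "1,6,4.0,945544824"));

        System.out.println(combine(movieTask).size());  // 0: this task has no rating rows
        System.out.println(combine(ratingTask).size()); // 0: this task has no movie row
        // Both rows for key "1" in a single call, which is what the join needs.
        System.out.println(joinLogic(Arrays.asList(movieTask.get(0)[1], ratingTask.get(0)[1])).size()); // 1
    }
}
```

In this model, neither per-task combiner call emits anything, and any pair it did emit would be keyed by the whole rating line rather than by movieId; only the last call, where a movie row and a rating row reach the same invocation, produces a joined pair.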