我试图通过使用下面的嵌套循环(见下文)简单地查找匹配项来执行连接。该作业在较小的数据集上运行良好,但在较大的数据集上会卡住。它通过减速器,直到达到99%或98%,然后就挂起了。我不知道这是否是内存问题,或者当它超过一定数量的记录时,reducer无法处理for循环计算。我想指出的是,如果省略代码中的for循环部分,那么在更大的数据集上工作就完成了。
public void reduce (Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String kjoin=key.toString();
String result="";
List<String[]> tGrad = new ArrayList<String[]>();
List<String[]> tUni = new ArrayList<String[]>();
List<String[]> tDep = new ArrayList<String[]>();
List<String[]> memOf = new ArrayList<String[]>();
List<String[]> subOrg = new ArrayList<String[]>();
List<String[]> ungrad = new ArrayList<String[]>();
String line="";
String source="";
int i=0;
for (Text value : values){
line=value.toString();
String[] parts=new String[2];
source=line.substring(0,line.indexOf(","));
// everything before the first comma.
parts[0]=line.substring(line.indexOf(",")+1,line.lastIndexOf(","));
// between first comma and last.
parts[1]=line.substring(line.lastIndexOf(",")+1);
// after last comma
//separate components
if (source.equals("tGrad")){
tGrad.add(parts);
} else if (source.equals("tUni")) {
tUni.add(parts);
} else if (source.equals("tDep")){
tDep.add(parts);
}else if (source.equals("memOf")){
memOf.add(parts);
} else if (source.equals("subOrg")) {
subOrg.add(parts);
} else if (source.equals("ungrad")){
ungrad.add(parts);
}//end if/else
source=null;
line=null;
} // end for loop for iteration over values.
//join tuples
for (String[] so: subOrg){
for (String[] mo: memOf){
if (so[0].equals(mo[1])){
for (String[] ug: ungrad){
if (so[1].equals(ug[1]) && mo[0].equals(ug[0])){
for (String[] tu: tUni){
if (ug[1].equals(tu[0]) ){
for (String[] td: tDep){
if (mo[1].equals(td[0])){
for (String[] tg: tGrad){
if (mo[0].equals(tg[0])){
result="f("+td[0]+","+mo[0]+","+mo[1]+","+so[0]+","+so[1]+","+tu[0]+","+ug[0]+ug[1]+tg[0]+","+")";
context.write(NullWritable.get(),new Text(result)); /
} //end 1st if
} // end 2nd if
} // end for
} // end for
} //end for
}
}
}
}}
}//end method
}//end class
1条答案
按热度按时间mrzz3bfm1#
这不是由于reducer的内存或限制,而是由于您执行计算的方式。如果您使用map reduce定义联接计算,而不是在单个reducer中执行所有计算,这将是很好的。如果您在独立代码中运行相同的计算,您将看到相同的问题。请通过此链接了解如何在mapreduce中执行联接