hadoop reducer无法处理嵌套循环计算?

7jmck4yq  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(367)

我试图通过使用下面的嵌套循环(见下文)简单地查找匹配项来执行连接。该作业在较小的数据集上运行良好,但在较大的数据集上会卡住。它通过减速器,直到达到99%或98%,然后就挂起了。我不知道这是否是内存问题,或者当它超过一定数量的记录时,reducer无法处理for循环计算。我想指出的是,如果省略代码中的for循环部分,那么在更大的数据集上工作就完成了。

public void reduce (Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String kjoin=key.toString();
String result="";
List<String[]> tGrad = new ArrayList<String[]>();
List<String[]> tUni = new ArrayList<String[]>();
List<String[]> tDep = new ArrayList<String[]>();
List<String[]> memOf = new ArrayList<String[]>();
List<String[]> subOrg = new ArrayList<String[]>();
List<String[]> ungrad = new ArrayList<String[]>();
String line="";
String source="";
int i=0;

for (Text value : values){
line=value.toString();
String[] parts=new String[2];

source=line.substring(0,line.indexOf(","));
// everything before the first comma. 
parts[0]=line.substring(line.indexOf(",")+1,line.lastIndexOf(","));
// between first comma and last.
parts[1]=line.substring(line.lastIndexOf(",")+1);
// after last comma 

//separate components 

    if (source.equals("tGrad")){
        tGrad.add(parts);

        }  else if (source.equals("tUni")) {
            tUni.add(parts);

            } else if (source.equals("tDep")){
                tDep.add(parts);

                    }else if (source.equals("memOf")){
                        memOf.add(parts);

                        }  else if (source.equals("subOrg")) {
                                subOrg.add(parts);

                                } else if (source.equals("ungrad")){
                                    ungrad.add(parts);

                    }//end if/else  

source=null;
line=null;
} // end for loop for iteration over values.

//join tuples
 for (String[] so: subOrg){ 

    for (String[] mo: memOf){ 
    if (so[0].equals(mo[1])){ 
        for (String[] ug: ungrad){
        if (so[1].equals(ug[1]) && mo[0].equals(ug[0])){  
            for (String[] tu: tUni){
                if (ug[1].equals(tu[0]) ){
                for (String[] td: tDep){
                    if (mo[1].equals(td[0])){
                    for (String[] tg: tGrad){
                    if (mo[0].equals(tg[0])){

    result="f("+td[0]+","+mo[0]+","+mo[1]+","+so[0]+","+so[1]+","+tu[0]+","+ug[0]+ug[1]+tg[0]+","+")";
    context.write(NullWritable.get(),new Text(result)); /
                    } //end 1st if
                } // end 2nd if

            } // end for
        } // end for 
    } //end for  

    }
}
}
  }}    
    }//end method   
 }//end class
mrzz3bfm

mrzz3bfm1#

这不是由于reducer的内存或限制,而是由于您执行计算的方式。如果您使用map reduce定义联接计算,而不是在单个reducer中执行所有计算,这将是很好的。如果您在独立代码中运行相同的计算,您将看到相同的问题。请通过此链接了解如何在mapreduce中执行联接

相关问题