Reduce fails because the task attempt failed to report status for 600 seconds. Killing! Solution?

hi3rlvi2  posted on 2021-06-03  in Hadoop

The reduce phase of the job fails with:

# of failed Reduce Tasks exceeded allowed limit.

The reason each task fails is:
Task attempt_201301251556_1637_r_000005_0 failed to report status for 600 seconds. Killing!
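The problem in detail:
The map phase receives each record in the format: time, rid, data.
The data consists of data elements and their counts.
Example: a,1,b,4,c,7 is the data of one record.
For every data element, the mapper outputs the data of the whole record. For example:
key:(time,a), val:(rid,data); key:(time,b), val:(rid,data); key:(time,c), val:(rid,data)
Each reduce call receives all the data that share the same key, across all records. E.g. key:(time,a), val:(rid1,data) and key:(time,a), val:(rid2,data) reach the same reduce instance.
The reducer does some processing there and outputs the rids that are similar.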
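To make the key/value layout above concrete, here is a minimal sketch in plain Java (no Hadoop dependencies) of what the mapper would emit for one record. It assumes the input line is `time,rid,data` and that `data` is encoded as `element|count|element|count|...`, which is the encoding the reducer code in this question parses. It also assumes keys and values are space-separated strings. The names `MapEmitSketch` and `emit` are hypothetical and do not come from the original job.

```java
import java.util.ArrayList;
import java.util.List;

class MapEmitSketch {
    /** Returns one {key, value} pair per data element of the record. */
    static List<String[]> emit(String record) {
        // Assumed input layout: time,rid,data with data = elem|count|elem|count...
        String[] fields = record.split(",", 3);
        String time = fields[0];
        String rid = fields[1];
        String data = fields[2];

        String[] parts = data.split("\\|");
        List<String[]> out = new ArrayList<>();
        // Elements sit at even positions, their counts at odd positions.
        for (int i = 0; i + 1 < parts.length; i += 2) {
            String key = time + " " + parts[i];  // (time, element)
            String value = rid + " " + data;     // (rid, full data of the record)
            out.add(new String[] {key, value});
        }
        return out;
    }

    public static void main(String[] args) {
        for (String[] kv : emit("t1,rid1,a|1|b|4|c|7")) {
            System.out.println("key=(" + kv[0] + ") val=(" + kv[1] + ")");
        }
    }
}
```

Under this layout a record with n elements is emitted n times, each time carrying its full data, so the reduce input grows considerably faster than the map input.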
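My program runs fine on a small dataset (about 10 MB), but it fails for the reason above once the data grows to 1 GB. I don't know why this happens. Please help!
Reduce code:
There are two classes below: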
`VCLReduce0Split` and `CoreSplit`.

a. `VCLReduce0Split`

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

        // Only the first whitespace-separated token of the key (the time) is used.
        StringTokenizer stk = new StringTokenizer(key.toString());
        String t = stk.nextToken();

        // Buffers every record of this key in memory: uid -> "total_size,data".
        HashMap<String, String> hmap = new HashMap<String, String>();

        while (values.hasNext()) {
            // Each value is "uid data", where data is "part|freq|part|freq|...".
            String val = values.next().toString();
            StringTokenizer st = new StringTokenizer(val);
            String uid = st.nextToken();
            String data = st.nextToken();

            // total_size is never updated here, so the stored size prefix is always 0.
            int total_size = 0;

            // Re-serialize the data as "part|freq|part|freq|" (with a trailing '|').
            StringTokenizer stx = new StringTokenizer(data, "|");
            StringBuffer sbuf = new StringBuffer();
            while (stx.hasMoreTokens()) {
                String data_part = stx.nextToken();
                String data_freq = stx.nextToken();
                sbuf.append(data_part);
                sbuf.append("|");
                sbuf.append(data_freq);
                sbuf.append("|");
            }

            StringBuffer sbuf1 = new StringBuffer();
            sbuf1.append(String.valueOf(total_size));
            sbuf1.append(",");
            sbuf1.append(sbuf);
            hmap.put(uid, sbuf1.toString());
        }

        float threshold = (float) 0.8;

        // All similarity work happens here, after the values loop has finished.
        CoreSplit obj = new CoreSplit();
        ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold);

        for (int i = 0; i < al.size(); i++) {
            CustomMapSimilarity cmaps = al.get(i);
            String xy_pair = cmaps.getRIDPair();
            String similarity = cmaps.getSimilarity();
            output.collect(new Text(xy_pair), new Text(similarity));
        }
    }
}
```
b. `CoreSplit`

```java
package com.a;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.commons.collections.map.MultiValueMap;

public class CoreSplit {

    public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String, String> hmap, float t) {

        ArrayList<CustomMapSimilarity> als;
        ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        // Inverted index: prefix token -> entries (RID, position) of records seen so far.
        MultiValueMap index = new MultiValueMap();

        Iterator<String> iter = hmap.keySet().iterator();

        while (iter.hasNext()) {
            String RID = iter.next();

            // Stored value is "size,token|count|token|count|...".
            String data_val = hmap.get(RID);
            StringTokenizer st = new StringTokenizer(data_val, ",");
            int size = Integer.parseInt(st.nextToken());
            String data = st.nextToken();
            String[] parts = data.split("\\|");

            // Prefix length for the hard-coded 0.8 threshold.
            float prefix_size = (float) (size - (0.8 * size) + 1);
            if (size == 1) {
                prefix_size = 1;
            }

            HashMap<String, Float> alpha = new HashMap<String, Float>();
            HashMap<String, CustomMapOverlap> hmap_overlap = new HashMap<String, CustomMapOverlap>();

            int pi = 0;

            // j advances by 1 per iteration plus the count of the current token.
            for (float j = 0; j <= prefix_size; j++) {
                boolean prefix_chk = false;
                String ptoken = parts[pi];
                float val = Float.parseFloat(parts[pi + 1]);
                float temp_j = j;
                j = j + val;
                boolean j_l = false;
                float prefix_contri = 0;
                pi = pi + 2;

                if (j > prefix_size) {
                    // Only part of this token's count still fits into the prefix.
                    prefix_contri = prefix_size - temp_j;
                    if (prefix_contri > 0) {
                        j_l = true;
                        prefix_chk = false;
                    } else {
                        prefix_chk = true;
                    }
                }

                if (prefix_chk == false) {
                    filters(index, ptoken, RID, hmap, t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri);

                    CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID, j);
                    index.put(ptoken, cmapt);
                }
            }

            als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap);

            for (int i = 0; i < als.size(); i++) {
                if (als.get(i).getRIDPair() != null) {
                    alsim.add(als.get(i));
                }
            }
        }

        return alsim;
    }

    public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri) {

        @SuppressWarnings("unchecked")
        ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken);

        if ((positions_list != null) && (positions_list.size() != 0)) {
            // Kept outside the loop: it retains its last value if a candidate lacks the token.
            float RID_val = 0;

            // Scans every earlier record that has ptoken in its prefix.
            for (int k = 0; k < positions_list.size(); k++) {
                CustomMapPrefixTokens cmapt = positions_list.get(k);

                if (!cmapt.getRID().equals(RID)) {
                    String y = hmap.get(cmapt.getRID());
                    StringTokenizer st = new StringTokenizer(y, ",");
                    int y_size = Integer.parseInt(st.nextToken());

                    // Size filter: skip candidates that are too small to reach the threshold.
                    float check_size = (float) 0.8 * (size);

                    if (y_size >= check_size) {
                        String y_data = st.nextToken();

                        // StringTokenizer takes a set of delimiter characters, not a regex.
                        StringTokenizer st1 = new StringTokenizer(y_data, "|");

                        // Look up the count of ptoken in the candidate record.
                        while (st1.hasMoreTokens()) {
                            String token = st1.nextToken();
                            if (token.equals(ptoken)) {
                                String nxt_token = st1.nextToken();
                                RID_val = (float) Integer.parseInt(nxt_token);
                                break;
                            }
                        }

                        float alpha1 = (float) (0.8 / 1.8) * (size + y_size);

                        ArrayList<Float> fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l, RID_val, val, prefix_contri);
                        float ubound = fl.get(0);
                        float y_overlap = fl.get(1);

                        positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap);
                    }
                }
            }
        }
    }

    public void positionFilter(float ubound, float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap) {

        if (null != hmap_overlap.get(cmapt.getRID())) {
            float y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap();

            if ((y_overlap_total + ubound) >= alpha1) {
                // Candidate can still reach the required overlap: accumulate.
                CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID());
                cmap_tmp.setOverlap(y_overlap + y_overlap_total);
                hmap_overlap.put(cmapt.getRID(), cmap_tmp);
            } else {
                // Candidate is pruned: reset its overlap to 0.
                float n = 0;
                hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(), n));
            }
        } else {
            CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(), y_overlap);
            hmap_overlap.put(cmapt.getRID(), cmap_tmp);
        }
    }

    public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri) {

        alpha.put(cmapt.getRID(), alpha1);

        // Upper bound on the overlap still possible after the current positions.
        float min1 = y_size - cmapt.getPosition();
        float min2 = size - j;
        float min = Math.min(min1, min2);

        if (j_l == true) {
            val = prefix_contri;
        }
        float y_overlap = Math.min(RID_val, val);

        float ubound = y_overlap + min;

        // Returned as [ubound, y_overlap].
        ArrayList<Float> fl = new ArrayList<Float>();
        fl.add(ubound);
        fl.add(y_overlap);
        return fl;
    }

    public ArrayList<CustomMapSimilarity> calcSimilarity(String time, String RID, HashMap<String, String> hmap, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap) {

        ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap_overlap.keySet().iterator();

        while (iter.hasNext()) {
            String key = iter.next();
            CustomMapOverlap val = hmap_overlap.get(key);
            float overlap = (float) val.getOverlap();

            if (overlap > 0) {
                String yRID = val.getRID();
                String RIDpair = RID + " " + yRID;

                // Verify each surviving candidate pair with the exact similarity.
                float jaccard = unionIntersection(hmap, RIDpair);

                if (jaccard > 0.8) {
                    alsim.add(new CustomMapSimilarity(time + " " + RIDpair, String.valueOf(jaccard)));
                }
            }
        }

        return alsim;
    }

    public float unionIntersection(HashMap<String, String> hmap, String RIDpair) {

        StringTokenizer st = new StringTokenizer(RIDpair);
        String xRID = st.nextToken();
        String yRID = st.nextToken();

        String xdata = hmap.get(xRID);
        String ydata = hmap.get(yRID);

        StringTokenizer xtokenizer = new StringTokenizer(xdata, ",");
        StringTokenizer ytokenizer = new StringTokenizer(ydata, ",");

        // Skip the leading size field of each record.
        xtokenizer.nextToken();
        ytokenizer.nextToken();

        String[] xparts = xtokenizer.nextToken().split("\\|");
        String[] yparts = ytokenizer.nextToken().split("\\|");

        // token -> count for each of the two records.
        HashMap<String, Integer> x = new HashMap<String, Integer>();
        HashMap<String, Integer> y = new HashMap<String, Integer>();

        for (int i = 0; i < xparts.length - 1; i += 2) {
            x.put(xparts[i], Integer.parseInt(xparts[i + 1]));
        }
        for (int i = 0; i < yparts.length - 1; i += 2) {
            y.put(yparts[i], Integer.parseInt(yparts[i + 1]));
        }

        Set<String> xset = x.keySet();
        Set<String> yset = y.keySet();

        int total_union = 0;
        int total_intersect = 0;

        // Multiset semantics: union takes the max count, intersection the min.
        for (String elm : xset) {
            int xval = x.get(elm);
            int part_union;

            if (yset.contains(elm)) {
                int yval = y.get(elm);
                part_union = Math.max(xval, yval);
                total_intersect = total_intersect + Math.min(xval, yval);
            } else {
                part_union = xval;
            }

            total_union = total_union + part_union;
        }

        // Tokens that occur only in y contribute to the union alone.
        for (String elm : yset) {
            if (!xset.contains(elm)) {
                total_union = total_union + y.get(elm);
            }
        }

        return (float) total_intersect / total_union;
    }
}
```

tzdcorbm  #1

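You may have exhausted the Java heap space, or GC may be running so frequently that the reducer never gets a chance to report its status to the master, so it gets killed.
Another possibility is that one of the reducers receives heavily skewed data, i.e. a particular key has a very large number of records.
Try increasing the Java heap by setting `mapred.child.java.opts` to `-Xmx2048m`. Also try reducing the number of reducers that run in parallel on each node by setting `mapred.tasktracker.reduce.tasks.maximum` to a value lower than the current one (the default is 2).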
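One way to check the skew hypothesis is to count how many records share each reduce key on a sample of the map output. The following is a minimal sketch in plain Java, assuming the keys are available as strings; `SkewCheck`, `countPerKey` and `heaviestKey` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SkewCheck {
    /** Counts how many values each reduce key would receive. */
    static Map<String, Integer> countPerKey(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    /** Returns the key with the largest group, or null for empty input. */
    static String heaviestKey(Map<String, Integer> counts) {
        String best = null;
        int bestCount = -1;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical sample of map output keys in "time element" form.
        List<String> sample = Arrays.asList("t1 a", "t1 a", "t1 a", "t1 b", "t2 a");
        Map<String, Integer> counts = countPerKey(sample);
        String hot = heaviestKey(counts);
        System.out.println(hot + " -> " + counts.get(hot));
    }
}
```

If one key holds a large share of the records, the reducer that gets it both buffers the most data in memory and does the most pairwise comparison work, which would be consistent with a failure that only shows up on the larger dataset.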


jxct1oxe  #2

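The timeout is probably caused by a long-running computation in your reducer that does not report progress back to the Hadoop framework. There are several ways to address this:
1. Increase the timeout in mapred-site.xml: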

<property>
  <name>mapred.task.timeout</name>
  <value>1200000</value>
</property>

The default is 600000 ms = 600 seconds.
2. Report progress every x records, as in the Reducer example in the Javadoc:

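public void reduce(K key, Iterator<V> values,
                   OutputCollector<K, V> output,
                   Reporter reporter) throws IOException {
   int noValues = 0;
   while (values.hasNext()) {
     V value = values.next();
     ++noValues;

     // report progress every 10 values
     if ((noValues%10) == 0) {
       reporter.progress();
     }

     // ... process the <key, value> pair ...
   }
}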

Optionally, you can increment a custom counter, as in the same example:

reporter.incrCounter(NUM_RECORDS, 1);
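Note that in the question's reducer the expensive work happens inside `similarityCalculation`, after the `values` loop has already finished, so a `reporter.progress()` call inside that loop alone may not be enough. Below is a minimal sketch of passing a progress callback into the long-running loop. It is written in plain Java so that it runs without Hadoop. `LongComputation` and `processAll` are hypothetical names, and in a real reducer the callback would simply call `reporter.progress()`.

```java
import java.util.Arrays;
import java.util.List;

class LongComputation {
    /**
     * Processes every item and invokes onProgress once every `interval` items.
     * Returns the number of times the callback fired.
     */
    static int processAll(List<String> items, int interval, Runnable onProgress) {
        int reports = 0;
        int seen = 0;
        for (String item : items) {
            // ... expensive per-item work would go here ...
            seen++;
            if (seen % interval == 0) {
                onProgress.run();  // in a reducer: reporter.progress()
                reports++;
            }
        }
        return reports;
    }

    public static void main(String[] args) {
        List<String> rids = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        int fired = processAll(rids, 2, () -> System.out.println("progress"));
        System.out.println("callback fired " + fired + " times");
    }
}
```

Applied to the question's code, this would mean giving `similarityCalculation` an extra callback parameter and invoking it once per outer-loop iteration (per RID), so the task keeps reporting while the similarity join runs.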
