mapreduce作业-完成时间过长

lstz6jyr  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(394)

我们已经编写了一个mapreduce作业来处理日志文件。到目前为止,我们有大约52gb的输入文件,但处理数据需要大约一个小时。默认情况下,它只创建一个reducer作业。通常我们会在reduce任务中看到超时错误,然后它会重新启动并完成。以下是成功完成工作的统计数据。请告诉我们如何提高性能。

File System Counters
            FILE: Number of bytes read=876100387
            FILE: Number of bytes written=1767603407
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=52222279591
            HDFS: Number of bytes written=707429882
            HDFS: Number of read operations=351
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=2
    Job Counters 
            Failed reduce tasks=1
            Launched map tasks=116
            Launched reduce tasks=2
            Other local map tasks=116
            Total time spent by all maps in occupied slots (ms)=9118125
            Total time spent by all reduces in occupied slots (ms)=7083783
            Total time spent by all map tasks (ms)=3039375
            Total time spent by all reduce tasks (ms)=2361261
            Total vcore-seconds taken by all map tasks=3039375
            Total vcore-seconds taken by all reduce tasks=2361261
            Total megabyte-seconds taken by all map tasks=25676640000
            Total megabyte-seconds taken by all reduce tasks=20552415744
    Map-Reduce Framework
            Map input records=49452982
            Map output records=5730971
            Map output bytes=864140911
            Map output materialized bytes=876101077
            Input split bytes=13922
            Combine input records=0
            Combine output records=0
            Reduce input groups=1082133
            Reduce shuffle bytes=876101077
            Reduce input records=5730971
            Reduce output records=5730971
            Spilled Records=11461942
            Shuffled Maps =116
            Failed Shuffles=0
            Merged Map outputs=116
            GC time elapsed (ms)=190633
            CPU time spent (ms)=4536110
            Physical memory (bytes) snapshot=340458307584
            Virtual memory (bytes) snapshot=1082745069568
            Total committed heap usage (bytes)=378565820416
    Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
    File Input Format Counters 
            Bytes Read=52222265669
    File Output Format Counters 
            Bytes Written=707429882

如果我增加了减速机的数量,我会得到如下的classcast异常。我猜这个问题是来自于分区类。

java.lang.Exception: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: com.emaar.bigdata.exchg.logs.CompositeWritable cannot be cast to org.apache.hadoop.io.Text
    at com.emaar.bigdata.exchg.logs.ActualKeyPartitioner.getPartition(ActualKeyPartitioner.java:1)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:716)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:56)
    at com.emaar.bigdata.exchg.logs.ExchgLogsMapper.map(ExchgLogsMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)

我的分班同学

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> {

    HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
    Text newKey = new Text();

    @Override
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) {

        try {
            // Execute the default partitioner over the first part of the key
            newKey.set(key.getSubject());
            return hashPartitioner.getPartition(newKey, value, numReduceTasks);
        } catch (Exception e) {
            e.printStackTrace();
            return (int) (Math.random() * numReduceTasks); // this would return
                                                            // a random value in
                                                            // the range
            // [0,numReduceTasks)
        }
    }
}

Map程序代码

public class ExchgLogsMapper extends Mapper<LongWritable, List<Text>, CompositeKey, Writable> {
    String recepientAddresses = "";
    public static final String DELIVER = "DELIVER";
    public static final String RESOLVED = "Resolved";
    public static final String JUNK = "Junk E-mail";
    public static final String SEMICOLON = ";";
    public static final String FW1 = "FW: ";
    public static final String FW2 = "Fw: ";
    public static final String FW3 = "FWD: ";
    public static final String FW4 = "Fwd: ";
    public static final String FW5 = "fwd: ";
    public static final String RE1 = "RE: ";
    public static final String RE2 = "Re: ";
    public static final String RE3 = "re: ";

    Text mailType = new Text("NEW");
    Text fwType = new Text("FW");
    Text reType = new Text("RE");
    Text recepientAddr = new Text();

    @Override
    public void map(LongWritable key, List<Text> values, Context context) throws IOException, InterruptedException {
        String subj = null;
        int lstSize=values.size() ;
        if ((lstSize >= 26)) {
            if (values.get(8).toString().equals(DELIVER)) {
                if (!(ExclusionList.exclusions.contains(values.get(18).toString()))) {
                    if (!(JUNK.equals((values.get(12).toString())))) {
                        subj = values.get(17).toString();
                        recepientAddresses = values.get(11).toString();
                        String[] recepientAddressArr = recepientAddresses.split(SEMICOLON);
                        if (subj.startsWith(FW1) || subj.startsWith(FW2) || subj.startsWith(FW3)
                                || subj.startsWith(FW4) || subj.startsWith(FW5)) {
                            mailType = fwType;
                            subj = subj.substring(4);
                        } else if (subj.startsWith(RE1) || subj.startsWith(RE2) || subj.startsWith(RE3)) {
                            mailType = reType;
                            subj = subj.substring(4);
                        }
                        for (int i = 0; i < recepientAddressArr.length; i++) {
                            CompositeKey ckey = new CompositeKey(subj, values.get(0).toString());
                            recepientAddr.set(recepientAddressArr[i]);
                            CompositeWritable out = new CompositeWritable(mailType, recepientAddr, values.get(18),
                                    values.get(0));
                            context.write(ckey, out);
//                          System.err.println(out);

                        }
                    }
                }
            }
        }
yh2wf1be

yh2wf1be1#

在循环中的reducer代码中几乎没有sysout,它正在编写大量日志,在删除它们之后,reducer将在几分钟内完成。!

相关问题