为什么减速机在此代码之后停止工作?

vwhgwdsa  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(326)

下面是流行的hadoop教科书中提供的一个简单示例。代码如下follows:- [节目结束后我会解释我的问题]

package src.main.myjob;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Set;

import javax.lang.model.SourceVersion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool
{
    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text>
    {

        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
        {
        output.collect(value, key); 
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
            public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
            {
                String csv = "";
                while(values.hasNext())
                {
        csv += values.next().toString();
                        csv += ",";
                }
                output.collect(key, new Text(csv));
            }
    }

    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf, MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MyJob");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.set("key.value.seperator.in.input.line", ",");

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);

        System.exit(res);
    }

}

现在为了读者的利益,这个程序处理一个包含引用专利id和引用专利id行的文件[大文件],并反转它。所以在本质上,输出文件列出了所有的专利ID,每个ID后面都有一个逗号分隔的引用专利ID列表。
当我尝试执行它时,map任务运行得非常好,但是reduce任务却停留在66%。我尝试了各种各样的方法,比如检查日志(非常神秘&几乎没有用)和增加还原器的数量。然而,我很少成功地理解这个问题。代码直接指向我。我将非常感谢您的意见,帮助我了解如何调试这个问题在这里。我看这个程序没有什么明显的问题。

6za6bjd0

6za6bjd01#

作为这个问题的补充,我终于成功地完成了我的mapreduce程序。我不得不在mapred-site.xml中将hadoop的内存分配增加到6gig&正如jthrocker在上面的文章中所建议的那样使用stringbuilder。它确实完成了,但是输出文件不可读。ubuntu指出它的大小是258mb,比我的输入文件小一些。我想知道这是我的代码本身的缺陷吗?或者如果数据没有正确地以编辑友好的格式写入,从而导致解释问题?我们非常感谢您对此的所有意见。

svujldwt

svujldwt2#

首先,请考虑使用 StringBuilder 如果要连接字符串,请在reducer中添加。如果数据量很大,这种循环的性能会更好。

StringBuilder sb = new StringBuilder();
sb.append(values.next().toString());

如果要处理的数据非常庞大,那么请确保减速机中有足够的内存。

相关问题