java.io.IOException:在 MapReduce 中使用 combiner 时出现 "Spill failed"(溢出失败)

e37o9pze  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(384)

我在使用 Hadoop MapReduce。在不做本地聚合(即不设置 combiner 类)的情况下运行项目时,一切正常。
当我添加combiner类时,我得到以下消息:

java.lang.Exception: java.io.IOException: Spill failed

此外,控制台将这些行作为导致错误的行:

context.write(new DoubleGramKey(decade), occurrences);

以及:

word2.write(out);

下面是 DoubleGramKey 类和 StepOne 类的代码:

public class DoubleGramKey implements WritableComparable<DoubleGramKey>{

    private Text word1;
    private Text word2;
    private IntWritable decade;

    public DoubleGramKey(Text word1, Text word2, IntWritable decade) {
        this.word1 = word1;
        this.word2 = word2;
        this.decade = decade;
    }

    public DoubleGramKey(IntWritable decade) {
        this(new Text("*"),new Text("*"),decade);
    }

    public DoubleGramKey() {
        this(new Text("*"),new Text("*"), new IntWritable(-1));
    }

    public void readFields(DataInput in) throws IOException {
        word1.readFields(in);
        word2.readFields(in);
        decade.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        word1.write(out);
        word2.write(out);
        decade.write(out);
    }

    public int compareTo(DoubleGramKey other) {
        // if this.decade == other.decade, put <decade * *> first
        // if this.w1 == other.w1, put <w1, *> before <w1,w2>
        int ret = this.decade.get() - other.decade.get();
        if (ret == 0) {
            if (this.word1.toString().equals("*") && !other.word1.toString().equals("*"))
                ret = -1;
            else if (other.word1.toString().equals("*") && !this.word1.toString().equals("*"))
                ret = 1;
            else
                ret = this.word1.toString().compareTo(other.word1.toString());
        }
        if (ret == 0) { //same decade and same w1
            if (this.word2.toString().equals("*") && !other.word2.toString().equals("*"))
                ret = -1;
            else if (other.word2.toString().equals("*") &&  !this.word2.toString().equals("*"))
                ret = 1;
            else
                ret = this.word2.toString().compareTo(other.word2.toString());
        }
        return ret;
    }

    public IntWritable getDecade() {
        return this.decade;
    }

    public Text getWord1() {
        return this.word1;
    }

    public Text getWord2() {
        return this.word2;
    }

    @Override
    public String toString() {
        return this.decade + " " + this.word1 + " " + this.word2;
    }
}
/**
 * First MapReduce step: reads 1-gram and 2-gram input lines and emits occurrence counts
 * keyed by {@link DoubleGramKey} (word1, word2, decade), with a combiner that pre-sums
 * the counts for each key on the map side.
 */
public class StepOne {

    /**
     * Parses one tab-separated input line into keyed occurrence counts.
     *
     * <p>Field 0 holds the space-separated word(s), field 1 is turned into a decade by
     * replacing its last character with {@code '0'}, and field 2 is the occurrence count.
     */
    private static class MyMapper extends Mapper<LongWritable, Text, DoubleGramKey, IntWritable> {

        /**
         * Emits {@code <w1 * decade>} and {@code <* * decade>} for a single word, or
         * {@code <w1 w2 decade>} for a pair of words. Lines with any other number of
         * words produce no output.
         *
         * @param key     input key supplied by the input format (unused)
         * @param val     one tab-separated input line
         * @param context sink for the emitted key/value pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException {
            String[] line = val.toString().split("\t");
            String[] words = line[0].split(" ");
            // Replace the last character with '0' to get the decade, for example 1990.
            String decadeStr = line[1].substring(0, line[1].length() - 1) + "0";
            IntWritable decade = new IntWritable(Integer.parseInt(decadeStr));
            IntWritable occurrences = new IntWritable(Integer.parseInt(line[2]));
            if (words.length == 1) {
                context.write(new DoubleGramKey(new Text(words[0]), new Text("*"), decade), occurrences);
                // <* * decade, occurrences>: this is how the number of words per decade
                // is counted, for example <* * 1990> -> 20.
                context.write(new DoubleGramKey(decade), occurrences);
            } else if (words.length == 2) {
                context.write(new DoubleGramKey(new Text(words[0]), new Text(words[1]), decade), occurrences);
            }
        }
    }

    /**
     * Sums the occurrence counts of each key locally before they are sent to the reducer.
     *
     * <p>This class must be {@code static}. Hadoop creates the combiner reflectively
     * through a no-argument constructor; a non-static inner class has no such constructor
     * (it needs an enclosing {@code StepOne} instance), so instantiation fails with a
     * {@code NoSuchMethodException} that surfaces as
     * {@code java.io.IOException: Spill failed}.
     */
    public static class MyCombiner extends Reducer<DoubleGramKey, IntWritable, DoubleGramKey, IntWritable> {

        /**
         * Writes the key once with the sum of all its values.
         *
         * @param key     key shared by all {@code values}
         * @param values  occurrence counts emitted for {@code key}
         * @param context sink for the summed pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void reduce(DoubleGramKey key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * Configures and runs the job with a single reduce task, reading the two hard-coded
     * n-gram input paths and writing text output to the {@code output} directory.
     *
     * @param args command-line arguments (currently unused)
     * @throws IOException            if job setup or execution fails on I/O
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.out.println("starting EMR");

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(StepOne.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(DoubleGramKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setCombinerClass(MyCombiner.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(StepOne.MyPartitioner.class);

        job.setInputFormatClass(TextInputFormat.class);
        // addInputPath is called through SequenceFileInputFormat, but the input format
        // actually configured for the job is TextInputFormat (set just above).
        SequenceFileInputFormat.addInputPath(job, new Path("/home/maor/Desktop/DSP202/ass2_202/googlebooks-eng-all-1gram-20120701-z"));
        SequenceFileInputFormat.addInputPath(job, new Path("/home/maor/Desktop/DSP202/ass2_202/googlebooks-eng-all-2gram-20120701-zy"));
        String output = "output";

        //TODO change the output for args[2] in aws run
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }
}
aiazj4mn

aiazj4mn1#

溢出(spill)失败实际上是由 NoSuchMethodException 引起的:我发现我的 combiner 类缺少 static 关键字,加上之后问题就解决了。

相关问题