我在用 Hadoop MapReduce。在不启用本地聚合(即不设置 Combiner 类)的情况下运行项目时,一切正常。
当我添加combiner类时,我得到以下消息:
java.lang.Exception: java.io.IOException: Spill failed
此外,控制台将这些行作为导致错误的行:
context.write(new DoubleGramKey(decade), occurrences);
以及:
word2.write(out);
下面是stepone类和doublegramkey类的类:
/**
 * Composite MapReduce key: (decade, word1, word2).
 *
 * <p>The literal string "*" is used as a sentinel meaning "aggregate over all
 * words"; the sort order places sentinel keys first within a decade so that
 * &lt;decade, *, *&gt; and &lt;decade, w1, *&gt; reach the reducer before the
 * concrete &lt;decade, w1, w2&gt; keys.
 */
public class DoubleGramKey implements WritableComparable<DoubleGramKey> {
    private Text word1;
    private Text word2;
    private IntWritable decade;

    public DoubleGramKey(Text word1, Text word2, IntWritable decade) {
        this.word1 = word1;
        this.word2 = word2;
        this.decade = decade;
    }

    /** Decade-only aggregate key: &lt;*, *, decade&gt;. */
    public DoubleGramKey(IntWritable decade) {
        this(new Text("*"), new Text("*"), decade);
    }

    /**
     * No-arg constructor required by Hadoop's reflective deserialization;
     * fields must be non-null so {@link #readFields} can populate them.
     */
    public DoubleGramKey() {
        this(new Text("*"), new Text("*"), new IntWritable(-1));
    }

    public void readFields(DataInput in) throws IOException {
        word1.readFields(in);
        word2.readFields(in);
        decade.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        word1.write(out);
        word2.write(out);
        decade.write(out);
    }

    /**
     * Orders by decade, then word1, then word2, with the "*" sentinel
     * sorting before any real word at each level.
     */
    public int compareTo(DoubleGramKey other) {
        // Integer.compare avoids the overflow bug of subtracting two ints.
        int ret = Integer.compare(this.decade.get(), other.decade.get());
        if (ret == 0) {
            ret = compareWithStarFirst(this.word1.toString(), other.word1.toString());
        }
        if (ret == 0) { // same decade and same word1
            ret = compareWithStarFirst(this.word2.toString(), other.word2.toString());
        }
        return ret;
    }

    /** Compares two words, forcing the "*" sentinel to sort first. */
    private static int compareWithStarFirst(String a, String b) {
        boolean aStar = a.equals("*");
        boolean bStar = b.equals("*");
        if (aStar && !bStar) {
            return -1;
        }
        if (bStar && !aStar) {
            return 1;
        }
        return a.compareTo(b);
    }

    // equals/hashCode are kept consistent with compareTo: keys compare equal
    // iff they are equals(), which the shuffle's default HashPartitioner and
    // any hash-based grouping rely on.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DoubleGramKey)) {
            return false;
        }
        DoubleGramKey other = (DoubleGramKey) obj;
        return this.decade.get() == other.decade.get()
                && this.word1.toString().equals(other.word1.toString())
                && this.word2.toString().equals(other.word2.toString());
    }

    @Override
    public int hashCode() {
        int result = Integer.hashCode(decade.get());
        result = 31 * result + word1.toString().hashCode();
        result = 31 * result + word2.toString().hashCode();
        return result;
    }

    public IntWritable getDecade() {
        return this.decade;
    }

    public Text getWord1() {
        return this.word1;
    }

    public Text getWord2() {
        return this.word2;
    }

    @Override
    public String toString() {
        return this.decade + " " + this.word1 + " " + this.word2;
    }
}
public class StepOne {
private static class MyMapper extends Mapper<LongWritable, Text, DoubleGramKey, IntWritable> {
public void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException {
String[] line = val.toString().split("\t");
String[] words = line[0].split(" ");
String decade_str = line[1].substring(0, line[1].length() - 1);
decade_str = new StringBuilder().append(decade_str).append("0").toString(); // the decade. for example 1990
IntWritable decade = new IntWritable(Integer.parseInt(decade_str));
IntWritable occurrences = new IntWritable(Integer.parseInt(line[2]));
if (words.length == 1) {
context.write(new DoubleGramKey(new Text(words[0]), new Text("*"), decade), occurrences);
context.write(new DoubleGramKey(decade), occurrences); //<* * decade, occurs>
} else if (words.length == 2) {
context.write(new DoubleGramKey(new Text(words[0]), new Text(words[1]), decade), occurrences);
}
// this is how we will cound number of words per decade
// example *^&decade_couneter 1990 , 20
}
}
public class MyCombiner extends Reducer<DoubleGramKey, IntWritable, DoubleGramKey, IntWritable> {
@Override
public void reduce(DoubleGramKey key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
System.out.println("starting EMR");
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(StepOne.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(DoubleGramKey.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(MyReducer.class);
job.setCombinerClass(MyCombiner.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(StepOne.MyPartitioner.class);
job.setInputFormatClass(TextInputFormat.class);
SequenceFileInputFormat.addInputPath(job, new Path("/home/maor/Desktop/DSP202/ass2_202/googlebooks-eng-all-1gram-20120701-z"));
SequenceFileInputFormat.addInputPath(job, new Path("/home/maor/Desktop/DSP202/ass2_202/googlebooks-eng-all-2gram-20120701-zy"));
String output = "output";
//TODO change the output for args[2] in aws run
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
}
}
1条答案
按热度按时间aiazj4mn1#
嗯,Spill failed 的问题是由 Hadoop 无法通过反射实例化组合器类引起的:非静态内部类没有无参构造函数(它隐式依赖外部类实例)。我发现我的 Combiner 内部类缺少 "static" 关键字,加上之后问题就解决了。