我正在按词频对wordcount示例进行排序。我找了一些帖子,发现我不能在mapreduce中按值排序。所以我决定分别做两份Map缩小工作。因此,第一个是原始字数,第二个读取第一个mapreduce的输出,并按频率对单词进行排序。
第二个mapreduce使用的输入文件如下(这是第一个mapreduce的输出)
1个苹果
2个球
1卡通
4天
这是第二个mapreduce代码的代码。我制作了一个我认为不必要的pair类,并将其用作键。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.*;
//Sorting by valuel
public class Sorting
{
//Using custom writable
public static class Pair implements WritableComparable<Pair>
{
private Text t;
private IntWritable i;
public void set(Text t, IntWritable i)
{
this.t = t;
this.i = i;
}
public Text getFirst() { return t; }
public IntWritable getSecond() { return i; }
public Pair()
{
set(new Text(), new IntWritable());
}
public Pair(Text t, IntWritable i)
{
set(t, i);
}
public int compareTo(Pair p)
{
int cmp = t.compareTo(p.t);
if(cmp != 0)
{
return cmp;
}
return i.compareTo(p.i);
}
public void write(DataOutput out) throws IOException
{
t.write(out);
i.write(out);
}
public void readFields(DataInput in) throws IOException
{
t.readFields(in);
i.readFields(in);
}
}
//public class RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext contxt)
public static class SortingMapper extends Mapper<IntWritable, Text, Pair, NullWritable>
{
String[] output1 = null;
//private Text word = new Text();
@Override
public void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException
{
output1 = value.toString().split(" ");
Text word = new Text(output1[0]);
IntWritable freq = new IntWritable(Integer.parseInt(output1[1]));
context.write(new Pair(word, freq), NullWritable.get());
}
//.write() is the method inherited from interface org.apache.hadoop.mapreduce.TaskInputOutputContext
}
public static class FirstPartitioner extends Partitioner<Pair, NullWritable>
{
@Override
public int getPartition(Pair p, NullWritable n, int numPartitions)
{
System.out.println("Partitioner");
String word = p.getFirst().toString();
char first = word.charAt(0);
char middle = 'n';
if(middle < first)
{
return 0;
}
else
return 1 % numPartitions; //why does % need???
}
}
public static class KeyComparator extends WritableComparator
{
protected KeyComparator()
{
super(Pair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
System.out.println("keyComparator");
Pair v1 = (Pair) w1;
Pair v2 = (Pair) w2;
/*
* since we already count word in the first MR we only need to sort the list by frequency
* so no need to compare Text again
int cmp = Pair.compare(v1.getFirst(), v2.getFirst());
if(cmp != 0) { return cmp; }
*/
return -1 * v1.compareTo(v2);
//possible error: it compares Text first and then compare IntWritable
}
}
public static class GroupComparator extends WritableComparator
{
protected GroupComparator()
{
super(Pair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
System.out.println("group Comparator");
Pair v1 = (Pair) w1;
Pair v2 = (Pair) w2;
return v1.getFirst().compareTo(v2.getFirst());
//this compareTo is under binarycomparable
}
}
public static class SortingReducer extends Reducer<Pair, NullWritable, Pair, NullWritable>
{
@Override
public void reduce(Pair p, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
{
System.out.println("sortingReducer");
context.write(p, NullWritable.get());
}
}
public static void main(String[] args) throws Exception
{
Configuration conf2 = new Configuration();
//String[] otherArgs2 = new GenericOptionsParser(conf1, args).getRemainingArgs();
ControlledJob cJob2 = new ControlledJob(conf2);
conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
cJob2.setJobName("Sorting");
Job job2 = cJob2.getJob();
job2.setJarByClass(Sorting.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
job2.setMapperClass(SortingMapper.class);
job2.setPartitionerClass(FirstPartitioner.class);
job2.setSortComparatorClass(KeyComparator.class);
job2.setGroupingComparatorClass(GroupComparator.class);
job2.setReducerClass(SortingReducer.class);
job2.setOutputKeyClass(Pair.class);
job2.setOutputValueClass(NullWritable.class);
job2.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job2, new Path("hdfs:///tmp/inter/part-r-00000"));
FileOutputFormat.setOutputPath(job2, new Path(args[0]));
job2.waitForCompletion(true);
}
}
当我执行上面的代码时,我得到了一个铸造错误。
FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.IntWritable
at Sorting$SortingMapper.map(Sorting.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
我刚开始学习hadoop,这是我自己做的第一个。请让我知道如何解决这个错误,我将不胜感激,如果你给我一些关于整体代码的意见。
2条答案
按热度按时间igsr9ssn1#
将mapper定义更改为following会让您运行,因为您无论如何都没有使用mapper的键,所以其余的逻辑保持不变
kpbpu0082#
你的
SortingMapper.map()
方法需要IntWrtiable
作为关键,但是KeyValueTextInputFormat
生产Text
物体。一Text
对象不能强制转换为IntWritable
,这就是为什么你会得到: