public class sort{ public static class sortMapper extends Mapper<Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//add filter logic here
context.write(new Text(value), new Text(""));
}
}
public static void main(String[] args) throws Exception {
if(args.length != 2)
{
System.out.println("missing agrs: usage <prog> <arg1> <arg2>");
System.exit(1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "sort mutilple files");
job.setJarByClass(sort.class);
job.setMapperClass(sortMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
2条答案
按热度按时间gtlvzcf81#
一个目录中有三个文件-在一个作业中由一个Map器排序。运行方式
hadoop jar sort.jar sort file:///path/sortFiles/ sortedFiles
```import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class sort{
public static class sortMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
}
x4shl7ld2#
通过将所有预期结果收集到Map器上的本地/内存数据结构中,可以实现部分(每个Map器)排序。然后对其进行排序,最后运行
collector.write
对于现在排序的集合的所有元素。因此,这里与普通行为的区别在于,在后一种情况下,每个元素只是在遇到它们时发出,从而产生随机/非有序输出。
请注意,结果仍然没有
total ordering
:这将需要减速步骤。