Hadoop TotalOrderPartitioner(全排序分区器)的性能问题

o4hqfura  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(362)
import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.lib.partition.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.util.*;

/**
 * Demonstrates how to use Total Order Partitioner on Word Count.
 */
public class TotalOrderPartitionerExample {
public static class WordCount extends Configured implements Tool {
private final static int REDUCE_TASKS = 8;

    public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new WordCount(), args);
      System.exit(exitCode);
    }

    @Override @SuppressWarnings({ "unchecked", "rawtypes" })
    public int run(String[] args) throws Exception {
      // Check arguments.
      if (args.length != 2) {
        String usage =
          "Usage: " +
          "hadoop jar TotalOrderPartitionerExample$WordCount " +
          "<input dir> <output dir>\n"
        System.out.printf(usage);
        System.exit(-1);
      }

      String jobName = "WordCount";
      String mapJobName = jobName + "-Map";
      String reduceJobName = jobName + "-Reduce";

      // Get user args.
      String inputDir = args[0];
      String outputDir = args[1];

      // Define input path and output path.
      Path mapInputPath = new Path(inputDir);
      Path mapOutputPath = new Path(outputDir + "-inter");
      Path reduceOutputPath = new Path(outputDir);

      // Define partition file path.
      Path partitionPath = new Path(outputDir + "-part.lst");

      // Configure map-only job for sampling.
      Job mapJob = new Job(getConf());
      mapJob.setJobName(mapJobName);
      mapJob.setJarByClass(WordCount.class);
      mapJob.setMapperClass(WordMapper.class);
      mapJob.setNumReduceTasks(0);
      mapJob.setOutputKeyClass(Text.class);
      mapJob.setOutputValueClass(IntWritable.class);
      TextInputFormat.setInputPaths(mapJob, mapInputPath);

      // Set the output format to a sequence file.
      mapJob.setOutputFormatClass(SequenceFileOutputFormat.class);
      SequenceFileOutputFormat.setOutputPath(mapJob, mapOutputPath);

      // Submit the map-only job.
      int exitCode = mapJob.waitForCompletion(true) ? 0 : 1;
      if (exitCode != 0) { return exitCode; }

      // Set up the second job, the reduce-only.
      Job reduceJob = new Job(getConf());
      reduceJob.setJobName(reduceJobName);
      reduceJob.setJarByClass(WordCount.class);

      // Set the input to the previous job's output.
      reduceJob.setInputFormatClass(SequenceFileInputFormat.class);
      SequenceFileInputFormat.setInputPaths(reduceJob, mapOutputPath);

      // Set the output path to the final output path.
      TextOutputFormat.setOutputPath(reduceJob, reduceOutputPath);

      // Use identity mapper for key/value pairs in SequenceFile.
      reduceJob.setReducerClass(IntSumReducer.class);
      reduceJob.setMapOutputKeyClass(Text.class);
      reduceJob.setMapOutputValueClass(IntWritable.class);
      reduceJob.setOutputKeyClass(Text.class);
      reduceJob.setOutputValueClass(IntWritable.class);
      reduceJob.setNumReduceTasks(REDUCE_TASKS);

      // Use Total Order Partitioner.
      reduceJob.setPartitionerClass(TotalOrderPartitioner.class);

      // Generate partition file from map-only job's output.
      TotalOrderPartitioner.setPartitionFile(
          reduceJob.getConfiguration(), partitionPath);
      InputSampler.writePartitionFile(reduceJob, new InputSampler.RandomSampler(
          1, 10000));

      // Submit the reduce job.
      return reduceJob.waitForCompletion(true) ? 0 : 2;
    }
  }

  public static class WordMapper extends
      Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      for (String word : line.split("\\W+")) {
        if (word.length() == 0) { continue; }
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }

}

我从GitHub上得到了这段代码。我比较了map和reduce阶段的运行时间,发现常规的WordCount比使用TotalOrderPartitioner的版本性能更好。为什么?是否需要做任何优化或修改才能达到相当的性能?HashPartitioner与TotalOrderPartitioner的性能相比如何?

olmpazwi

olmpazwi1#

是的,HashPartitioner的性能会比TotalOrderPartitioner更好,因为HashPartitioner没有额外开销:它不需要运行InputSampler,也不需要写入分区文件等。
TotalOrderPartitioner仅在需要全局有序的输出时才使用,并且比HashPartitioner慢。

相关问题