Sorting MapReduce WordCount output by value

egdjgwm8 asked on 2021-06-01 in Hadoop

I'm currently writing a Hadoop program that, given a set of tweet data, outputs the top 100 most frequent hashtags. I got the counts using the standard WordCount program, so the output looks like the following (ignore the quotes):

"#USA 2" 

"#Holy 5"

"#SOS 3"

"#Love 66"

However, I ran into trouble when I tried to sort them by frequency (the value) using the code from here.
I noticed that for the sample input in that program, the keys are integers rather than strings. I tried changing some of the type parameters in the code to fit my use case, but it didn't work well because I don't really understand them. Please help!

fbcarpbf1#

You need a second MapReduce job whose input is the output of the first one (see the driver sketch at the end of this answer for one way to chain them). MapReduce sorts map output by key during the shuffle, so the second job swaps each record so that the count becomes the key, and a custom comparator makes that sort descending.
I've adjusted the code so it works the way you want.
For the input


#USA 2

#Holy 5

#SOS 3

#Love 66

the output should be

66 #Love 

5 #Holy 

3 #SOS 

2 #USA

I'm assuming a tab separates the hashtag from the count; that is the default key/value separator written by TextOutputFormat, so it should match the actual output files of your WordCount job. If it's something else, please change the delimiter in the mapper. The code is untested, so let me know whether it works.

package com.my.cert.example;

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ValueSortExp {
 public static void main(String[] args) throws Exception {

  Path inputPath = new Path("C:\\hadoop\\test\\test.txt");
  Path outputDir = new Path("C:\\hadoop\\test\\test1");

  // Path inputPath = new Path(args[0]);
  // Path outputDir = new Path(args[1]);

  // Create configuration
  Configuration conf = new Configuration(true);

  // Create job
  Job job = Job.getInstance(conf, "sort hashtags by count");
  job.setJarByClass(ValueSortExp.class);

  // Setup MapReduce
  job.setMapperClass(ValueSortExp.MapTask.class);
  job.setReducerClass(ValueSortExp.ReduceTask.class);
  job.setNumReduceTasks(1); // a single reducer yields one globally sorted output file

  // Specify key / value: the count becomes the map output key so the
  // framework sorts by it during the shuffle
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(Text.class);
  // The reducer swaps the pair back to (hashtag, count)
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  // Sort the counts in descending rather than ascending order
  job.setSortComparatorClass(IntComparator.class);
  // Input
  FileInputFormat.addInputPath(job, inputPath);
  job.setInputFormatClass(TextInputFormat.class);

  // Output
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputFormatClass(TextOutputFormat.class);

  /*
   * // Delete output if exists FileSystem hdfs = FileSystem.get(conf); if
   * (hdfs.exists(outputDir)) hdfs.delete(outputDir, true);
   * 
   * // Execute job int code = job.waitForCompletion(true) ? 0 : 1;
   * System.exit(code);
   */

  // Execute job
  int code = job.waitForCompletion(true) ? 0 : 1;
  System.exit(code);

 }

 // Compares serialized IntWritable keys and reverses their natural order,
 // so the shuffle delivers the highest counts first
 public static class IntComparator extends WritableComparator {

     public IntComparator() {
         super(IntWritable.class);
     }

     @Override
     public int compare(byte[] b1, int s1, int l1,
             byte[] b2, int s2, int l2) {

         // IntWritable serializes as a 4-byte big-endian int, so the raw
         // bytes can be decoded directly without deserializing the object
         Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
         Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

         return v1.compareTo(v2) * (-1); // negate for descending order
     }
 }

 public static class MapTask extends
   Mapper<LongWritable, Text, IntWritable, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context)
    throws java.io.IOException, InterruptedException {
   String line = value.toString();
   String[] tokens = line.split("\t"); // This is the delimiter between Key and Value
   int valuePart = Integer.parseInt(tokens[1]);
   // Swap the pair: the count becomes the key so MapReduce sorts by it
   context.write(new IntWritable(valuePart), new Text(tokens[0]));
  }
 }

 public static class ReduceTask extends
   Reducer<IntWritable, Text, Text, IntWritable> {
  @Override
  public void reduce(IntWritable key, Iterable<Text> list, Context context)
    throws java.io.IOException, InterruptedException {

   // Keys arrive in descending count order; swap back to (hashtag, count)
   for (Text value : list) {
    context.write(value, key);
   }
  }
 }

}
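
Since you only want the top 100 hashtags, you can also cap the reducer's output: with a single reducer and the descending comparator above, the first 100 records it writes are exactly the 100 most frequent hashtags. A minimal, untested sketch (the class name Top100ReduceTask and the LIMIT constant are my own additions, not part of the original code):

 public static class Top100ReduceTask extends
   Reducer<IntWritable, Text, Text, IntWritable> {
  private static final int LIMIT = 100; // how many hashtags to keep
  private int emitted = 0; // records written so far by this reducer

  @Override
  public void reduce(IntWritable key, Iterable<Text> list, Context context)
    throws java.io.IOException, InterruptedException {
   // Counts arrive in descending order, so the first LIMIT records
   // written are the most frequent hashtags
   for (Text value : list) {
    if (emitted >= LIMIT) {
     return; // top 100 already written, ignore the rest
    }
    context.write(value, key);
    emitted++;
   }
  }
 }

Note this only gives a global top 100 because the job runs with job.setNumReduceTasks(1); with multiple reducers, each one would keep its own 100.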

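And to tie the two stages together: the raw tweets first go through your WordCount job, whose output directory then becomes the input of the sort job above. A rough driver sketch, assuming the paths come from the command line; the class name ChainDriver is hypothetical and the per-job mapper/reducer wiring is left as comments for you to fill in:

package com.my.cert.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Path tweets = new Path(args[0]); // raw tweet data
  Path counts = new Path(args[1]); // intermediate hashtag counts
  Path sorted = new Path(args[2]); // final output, sorted by count

  Job countJob = Job.getInstance(conf, "count hashtags");
  countJob.setJarByClass(ChainDriver.class);
  // set your existing WordCount mapper, reducer and key/value types here
  FileInputFormat.addInputPath(countJob, tweets);
  FileOutputFormat.setOutputPath(countJob, counts);
  if (!countJob.waitForCompletion(true)) {
   System.exit(1); // stop if the counting job failed
  }

  Job sortJob = Job.getInstance(conf, "sort hashtags by count");
  sortJob.setJarByClass(ChainDriver.class);
  // set ValueSortExp's mapper, reducer and sort comparator here
  FileInputFormat.addInputPath(sortJob, counts); // job 2 reads job 1's output
  FileOutputFormat.setOutputPath(sortJob, sorted);
  System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
 }
}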