Top N with MapReduce

dfty9e19 · posted 2021-07-13 in Hadoop

I'm working on a Hadoop project in Java and have run into some difficulties. I understand the goal of what I'm supposed to do, but I really don't know how to implement it. I'm trying to extract the top N results from a MapReduce job, for example the top 5 highest-frequency values.
I know this usually requires two MapReduce jobs, one for the reduce and another to sort the values. However, as I said, I'm fairly lost on how to actually implement this.
The code I'm using is a fairly standard MapReduce word count, with some filtering of special characters.

public class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Text wordToken = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // dividing the string into tokens, splitting on digits and special characters
        StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");
        while (tokens.hasMoreTokens())
        {
            wordToken.set(tokens.nextToken());
            context.write(wordToken, new IntWritable(1));
        }
    }
}

Reducer

public class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private IntWritable count = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        // sum up all the 1s emitted for this word
        int valueSum = 0;
        for (IntWritable val : values)
        {
            valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
    }
}

Driver

public class WordCount {
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));

        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}

I'd really appreciate it if someone could help me with this. As I said, I know I need two MapReduce jobs, but I'm not quite sure how to get started. I've tried several other solutions I found on Stack Overflow, but haven't had much luck with my case. Thanks a lot!


shyt4zoc · Answer #1

You are indeed correct: you do need to chain two MapReduce jobs together. More specifically, you need:
- one job to compute the wordcount of every word stored in the input documents, and
- one job that is able to "sort" all those words and wordcounts, in order to select and output the top N of them.
The first job is very similar to what you have already put together, so I will focus on the second one, to make it clearer how top-N works in the MapReduce paradigm.
Thinking of the top-N MR job as a standalone thing, we know that this particular job will receive a bunch of key-value pairs, where each word from the previous step is the key and its wordcount is the value. Since mappers and reducers are instances of the map and reduce functions running in parallel, we need a way to first find the top-N words locally (i.e. for each mapper), and then group all those local top-N results to find the "global" top-N words across all the data fed to the application.
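The data-structure trick used for both of those steps is a size-bounded TreeMap that always evicts its smallest key. The snippet below is only a minimal standalone sketch of that trick outside of Hadoop, with made-up words and counts; the actual mapper and reducer further down apply the same idea to the real word counts.

import java.util.TreeMap;

// Standalone illustration (not part of the Hadoop job): keep only the N entries
// with the largest counts by evicting the smallest key whenever the map grows past N.
public class TopNSketch
{
    public static void main(String[] args)
    {
        int n = 3;  // keep only the 3 most frequent words in this toy example
        TreeMap<Integer, String> word_list = new TreeMap<Integer, String>();  // sorted ascending by wordcount

        String[][] samples = {{"the", "50"}, {"hadoop", "7"}, {"map", "12"}, {"reduce", "11"}, {"java", "4"}};
        for (String[] s : samples)
        {
            word_list.put(Integer.valueOf(s[1]), s[0]);   // wordcount as key => sorted by frequency
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());   // evict the word with the smallest wordcount
        }

        System.out.println(word_list);  // prints {11=reduce, 12=map, 50=the}
    }
}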
So the TopNMapper first has to create a TreeMap (a Java key-value data structure that internally sorts its elements by key) inside the setup function, i.e. before the mapper starts processing its input; each mapper initializes one such object and puts every word along with its wordcount into it. For this type of computation (top-N) we put the wordcount as the key and the word as the value, so we get a list of words sorted by frequency in ascending order. Since we only need to find the top N words overall, it is safe to say that we only want the top N words of each mapper, so we can remove all the elements below them and keep a TreeMap of just N elements by the end of each mapper's execution (i.e. in the cleanup function). The mappers then write key-value pairs where the word is the key and its wordcount is the value, like this: <word, wordcount>.

Now for the TopNReducer, we do the same thing with a TreeMap: populate it with all the local top-N elements, remove the elements that are not among the top N, and write the words and their wordcounts as output. To be a bit "cleaner" about it, we can "reverse" the word and wordcount in the key-value pair structure, so that the wordcount is the key and the word is the value. This results in an (ascendingly) sorted set of key-value pairs that is stored on disk when this job finishes, like this: <wordcount, word>.

A program that does all of this in 2 MR jobs is shown below (we set N as a "global" value in the Configuration inside the main function with the conf.set("N", "10"); command and read it in the setup functions of the TopNMapper and TopNReducer classes), with all the classes put inside the single TopNWordCount class for simplicity:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

public class TopNWordCount
{
    /* input:  <document, contents>
     * output: <word, 1>
     */
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // clean up the document text and split the words into an array
            String[] words = value.toString()
                                .replaceAll("\\d+", "")           // get rid of numbers...
                                .replaceAll("[^a-zA-Z ]", " ")    // get rid of punctuation...
                                .toLowerCase()                                      // turn every letter to lowercase...
                                .trim()                                             // trim the spaces
                                .replaceAll("\\s+", " ")
                                .split(" ");

            // write every word as key with `1` as value that indicates that the word is
            // found at least 1 time inside the input text
            for(String word : words)
                context.write(new Text(word), one);
        }
    }

    /* input: <word, 1>
     * output: <word, wordcount>
     */
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable wordcount = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int word_cnt = 0;

            for(IntWritable value : values)
                word_cnt += value.get();

            wordcount.set(word_cnt);

            context.write(key, wordcount);
        }
    }

    /* input:  <word, wordcount>
     * output: <NULL, (word, wordcount)> (with the local topN words)
     */
    public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private int n;  // the N of TopN
        private TreeMap<Integer, String> word_list; // local list with words sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void map(Object key, Text value, Context context)
        {
            String[] line = value.toString().split("\t");   // split the word and the wordcount

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            word_list.put(Integer.valueOf(line[1]), line[0]);

            // if the local word list is populated with more than N elements
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN local words before continuing to TopNReducer
            // with each word as key and its wordcount as value
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }

    /* input:  <word, wordcount> (with the local topN words)
     * output: <wordcount, word> (with the global topN words)
     */
    public static class TopNReducer extends Reducer<Text, IntWritable, IntWritable, Text>
    {
        private int n;  // the N of TopN
        private TreeMap<Integer, String> word_list; //  list with words globally sorted by their frequency

        public void setup(Context context)
        {
            n = Integer.parseInt(context.getConfiguration().get("N"));  // get N
            word_list = new TreeMap<Integer, String>();
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
        {
            int wordcount = 0;

            // get the one and only value (aka the wordcount) for each word
            for(IntWritable value : values)
                wordcount = value.get();

            // put the wordcount as key and the word as value in the word list
            // so the words can be sorted by their wordcounts
            word_list.put(wordcount, key.toString());

            // if the global word list is populated with more than N elements
            // remove the first (aka remove the word with the smallest wordcount)
            if (word_list.size() > n)
                word_list.remove(word_list.firstKey());
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            // write the topN global words with each word as key and its wordcount as value
            // so the output will be sorted by the wordcount
            for (Map.Entry<Integer, String> entry : word_list.entrySet())
            {
                context.write(new IntWritable(entry.getKey()), new Text(entry.getValue()));
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        conf.set("N", "10"); // set the N as a "public" value in the current Configuration

        if (pathArgs.length < 2)
        {
          System.err.println("MR Project Usage: TopNWordCount <input-path> [...] <output-path>");
          System.exit(2);
        }

        Path wordcount_dir = new Path("wordcount");
        Path output_dir = new Path(pathArgs[pathArgs.length - 1]);

        // if the in-between and output directories exists, delete them
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(wordcount_dir))
            fs.delete(wordcount_dir, true);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        Job wc_job = Job.getInstance(conf, "WordCount");
        wc_job.setJarByClass(TopNWordCount.class);
        wc_job.setMapperClass(WordCountMapper.class);
        wc_job.setReducerClass(WordCountReducer.class);
        wc_job.setMapOutputKeyClass(Text.class);
        wc_job.setMapOutputValueClass(IntWritable.class);
        wc_job.setOutputKeyClass(Text.class);
        wc_job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
          FileInputFormat.addInputPath(wc_job, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wc_job, wordcount_dir);
        wc_job.waitForCompletion(true);

        Job topn_job = Job.getInstance(conf, "TopN");
        topn_job.setJarByClass(TopNWordCount.class);
        topn_job.setMapperClass(TopNMapper.class);
        topn_job.setReducerClass(TopNReducer.class);
        topn_job.setMapOutputKeyClass(Text.class);
        topn_job.setMapOutputValueClass(IntWritable.class);
        topn_job.setOutputKeyClass(IntWritable.class);
        topn_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(topn_job, wordcount_dir);
        FileOutputFormat.setOutputPath(topn_job, output_dir);
        topn_job.waitForCompletion(true);
    }
}
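Assuming the class above is packaged into a jar (the jar file name and paths below are just placeholders), the two chained jobs can then be launched with a single command, for example:

hadoop jar topnwordcount.jar TopNWordCount /path/to/input/dir /path/to/output/dir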

Running this program on a sample directory of text files, the top 10 words in the output turn out to be stopwords (like the, to, and so on), as we would expect. If you want to filter those stopwords out, you could of course compute TF-IDF and implement that with Hadoop as well.
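As a much simpler (and cruder) alternative to a full TF-IDF job, you could also hard-code a small stopword set and skip those words inside the word-count mapper. The sketch below only illustrates that idea; the stopword list is a tiny made-up sample, not a complete one.

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/* Variant of WordCountMapper that drops a few common English stopwords.
 * The STOPWORDS set here is just an illustrative sample. */
public class StopwordFilterMapper extends Mapper<Object, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1);
    private final static Set<String> STOPWORDS =
            new HashSet<String>(Arrays.asList("the", "to", "of", "and", "a", "in", "is", "it"));

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        // same kind of cleanup as in WordCountMapper: strip non-letters and lowercase
        String[] words = value.toString()
                              .replaceAll("[^a-zA-Z ]", " ")
                              .toLowerCase()
                              .trim()
                              .split("\\s+");

        for (String word : words)
        {
            if (!word.isEmpty() && !STOPWORDS.contains(word))  // skip empty tokens and stopwords
                context.write(new Text(word), one);
        }
    }
}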
