我正在用java开发hadoop项目,遇到了一些困难。我理解我应该做的事情的目标,但是我真的不知道如何实现它。我试图从map reduce作业中提取前n个结果,例如前5个最高频率值。
我知道这通常需要两个map reduce,一个用于reduce,另一个用于对值进行排序。然而,正如我所说,我对如何真正实现这一点相当迷茫。
我使用的代码是一个相当标准的map reduce代码,带有一些针对特殊值的过滤。
public class MapWordCount extends Mapper <LongWritable, Text, Text, IntWritable>
{
private Text wordToken = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']"); //Dividing String into tokens
while (tokens.hasMoreTokens())
{
wordToken.set(tokens.nextToken());
context.write(wordToken, new IntWritable(1));
}
}
}
减速机
public class ReduceWordCount extends Reducer <Text, IntWritable, Text, IntWritable>
{
private IntWritable count = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int valueSum = 0;
for (IntWritable val : values)
{
valueSum += val.get();
}
count.set(valueSum);
context.write(key, count);
}
}
司机
public class WordCount {
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (pathArgs.length < 2)
{
System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
System.exit(2);
}
Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
wcJob.setJarByClass(WordCount.class);
wcJob.setMapperClass(MapWordCount.class);
wcJob.setCombinerClass(ReduceWordCount.class);
wcJob.setReducerClass(ReduceWordCount.class);
wcJob.setOutputKeyClass(Text.class);
wcJob.setOutputValueClass(IntWritable.class);
for (int i = 0; i < pathArgs.length - 1; ++i)
{
FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
}
FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
}
}
如果有人能帮我这个忙,我将不胜感激。正如我所说,我知道我需要两个Map缩小,但不太确定如何从这个开始。我尝试了在stackoverflow上找到的几个其他解决方案,但对我的案例来说运气不太好。非常感谢!
1条答案
按热度按时间shyt4zoc1#
您的确是对的,您确实需要将两个mapreduce作业链接在一起。更具体地说,您需要:
一个任务是计算输入文档中存储的每个单词的字数,
一个任务是能够“排序”所有这些单词和字数,以便选择和输出顶部
N
他们中的一个。第一项工作与您已经提出的工作非常相似,因此我将重点介绍第二项工作,以便更清楚地了解topn在mapreduce范式中的工作方式。
把topn mr作业看作一个独立的东西,我们知道这个特定的作业将接收一堆键值对,其中最后一步的每个单词都将是键,它的wordcount将是值。因为Map器和还原器是
map
以及reduce
函数并行运行时,我们需要找到一种方法,首先在本地找到topn字(即对于每个Map器),然后对所有这些本地topn结果进行分组,以找到输入给应用程序的所有数据的“全局”topn字。所以
TopNMapper
首先要创建一个TreeMap
(一种java键值数据结构,在内部按键对元素进行排序)setup
函数(所以在创建Map器示例之前),每个Map器将初始化它的一个对象,并将每个单词及其wordcount作为元素。对于这种类型的计算(topn),我们将wordcount作为关键字,word作为值,以获得单词的升序排序列表。因为我们只需要找出最上面的N
在这里的话,可以肯定地说,我们只想要顶部N
每个Map器的单词,所以我们可以删除下面的所有其他元素,并有一个TreeMap
的N
元素,在Map程序执行结束时(即通过cleanup
函数)。Map器将编写键值对,其中单词将成为键,而单词计数将成为值,如下所示:<word, wordcount>
现在是TopNReducer
,我们需要使用TreeMap
要用所有本地topn元素填充它,请删除不是top元素的元素N
然后写下这些单词,它们的单词作为输出。为了更“干净”,我们可以“反转”关键字-值对结构中的单词和单词计数,这样我们就可以将单词计数作为关键字,将单词作为值。这将导致按(升序)排序的键值对数量,这些键值对将在完成此作业后存储在磁盘中,如下所示:wordcount, word>
在2个mr jobs中可以做这样一件事的程序如下(我们在这里设置N
作为一个全球性的Configuration
内部价值main
函数与conf.set("N", "10");
命令,并在setup
委员会的职能TopNMapper
以及TopNReducer
类),所有类都放在一个类中TopNWordCount
为简单起见:此程序的输出(使用此目录和文本文件作为输入)如下所示:
注意,这里的前10个单词是stopwords(比如
the
,to
等等),正如我们所料。如果你想过滤掉那些停止词,你当然可以使用tf-idf并用hadoop实现它,比如下面的方法。