Wordcount MapReduce produces an unexpected result

ltqd579y · published 2021-06-03 in Hadoop

I am trying this wordcount Java code in MapReduce, and after the reduce method finishes I want to display the word with the highest number of occurrences.
For that, I created some class-level variables: myoutput, mykey, and completeSum.
I write this data out in the close method, but I get an unexpected result at the end.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount {

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }

    }
}

static int completeSum = -1;
static OutputCollector<Text, IntWritable> myoutput;
static Text mykey = new Text();

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }

        if (completeSum < sum) {
            completeSum = sum;
            myoutput = output;
            mykey = key;
        }

    }

    @Override
    public void close() throws IOException {
        super.close();
        myoutput.collect(mykey, new IntWritable(completeSum));
    }
}

public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    // conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);

}
}

Input file data:

one 
three three three
four four four four 
 six six six six six six six six six six six six six six six six six six 
five five five five five 
seven seven seven seven seven seven seven seven seven seven seven seven seven

The result should be

six 18

but instead I get this result:

three 18

From the result I can see that the sum is correct, but the key is not.

If anyone could point me to good references on these map and reduce methods, that would be very helpful.


vhmi4jdf (answer 1)

The problem you are observing is caused by reference aliasing. The key object is reused with new contents across successive calls to reduce(), so mykey ends up referring to that same object, which finally holds the last key that was reduced. This can be avoided by copying the object:

mykey = new Text(key);
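The effect is easy to reproduce without Hadoop. Below is a minimal, self-contained sketch that uses StringBuilder as a stand-in for Hadoop's mutable Text; the class name, call data, and run() helper are illustrative, not part of the original post:

```java
public class AliasingDemo {
    static String[] run() {
        StringBuilder key = new StringBuilder(); // the "framework" reuses one buffer
        StringBuilder savedRef = null;           // what the buggy code stores
        String savedCopy = null;                 // what the fixed code stores
        int maxSum = -1;

        // simulate reduce() being invoked with (word, sum) pairs
        String[][] calls = { { "six", "18" }, { "three", "3" } };
        for (String[] call : calls) {
            key.setLength(0);
            key.append(call[0]);                 // same object, new contents
            int sum = Integer.parseInt(call[1]);
            if (sum > maxSum) {
                maxSum = sum;
                savedRef = key;                  // aliases the reused buffer
                savedCopy = key.toString();      // independent defensive copy
            }
        }
        // savedRef now shows the LAST key seen, not the one with the max sum
        return new String[] { savedRef + " " + maxSum, savedCopy + " " + maxSum };
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println("aliased reference: " + r[0]); // three 18
        System.out.println("defensive copy:    " + r[1]); // six 18
    }
}
```

The aliased reference reports "three 18", exactly the wrong answer from the question, while the copy reports "six 18".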

However, you should only take the result from the output file, because static variables cannot be shared between the different nodes of a distributed cluster. That trick only works in standalone mode, which defeats the purpose of MapReduce.
Finally, using global variables, even in standalone mode, will most likely cause races when parallel local tasks are used (see MAPREDUCE-1367 and MAPREDUCE-434).
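As a sketch of reading the result from the output file instead: TextOutputFormat writes tab-separated "word&lt;TAB&gt;count" lines to a part file, which a small post-processing step can scan for the maximum. The class name and the maxLine() helper are hypothetical, and the file path is whatever part file your job produced:

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class MaxFromOutput {
    // Each line of a TextOutputFormat part file looks like "word\tcount".
    static String maxLine(List<String> lines) {
        String bestWord = null;
        long bestCount = Long.MIN_VALUE;
        for (String line : lines) {
            String[] parts = line.split("\t");
            long count = Long.parseLong(parts[1]);
            if (count > bestCount) {
                bestCount = count;
                bestWord = parts[0];
            }
        }
        return bestWord + " " + bestCount;
    }

    public static void main(String[] args) throws Exception {
        // e.g. pass the job's part file, such as <output-dir>/part-00000
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(maxLine(lines));
    }
}
```

For the sample input in the question, scanning the counts this way yields "six 18" regardless of the order in which the reducer emitted the lines.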
