java—为什么Map器的输出键/值需要和组合器的输出键/值相同

idfiyjo8  于 2021-07-13  发布在  Hadoop
关注(0)|答案(1)|浏览(481)

我正在尝试学习mapreduce,而且对它很陌生。我研究了combiner通过在数据节点级别减少Map器输出来提供的优化。
现在,Map器输出key/val和组合器输入key/value需要相同是可以理解的。但我无法理解合并器输出键/值和Map器输出键/值需要相同的事实。
如果我想找到表name,price中数据的平均值,那么我可能会选择以下选项:

Mapper<LongWritable, Text, Text, IntWritable>
Combiner<Text, IntWritable, Text, FloatWritable>
Reducer<Text, IntWritable, Text, FloatWritable>

这样做,我得到的错误,当我在网上阅读时,我发现Map器和组合器的输出需要是相同的,但找不到一个原因。
以下是我的样本数据:

Schema - cid,cname,email,date,pid,pname,price
101,jai,j@j.com,1-aug-2016,1,iphone,65000
101,jai,j@j.com,1-aug-2016,2,ipad,35000
101,jai,j@j.com,1-aug-2016,3,Samsung S5,34000

下面是我的代码:

import java.io.IOException;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

public class q1 {
    //cid,cname,email,date,pid,pname,price

    public static class avg_mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String[] line = value.toString().split(",");
            Text cname = new Text(line[1]);
            IntWritable price = new IntWritable(Integer.parseInt(line[6]));
            context.write(cname, price);
        }
    }
    public static class avg_reducer extends Reducer<Text, IntWritable, Text, FloatWritable>{
        public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException{
            int sum = 0;
            int count=0;
            for (IntWritable val : value){
                count+=1;
                sum+=val.get();
            }
            Float avg = (float)sum/count;
            context.write(key,new FloatWritable(avg));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Average");
        job.setJarByClass(q1.class);
        job.setMapperClass(avg_mapper.class);
        job.setReducerClass(avg_reducer.class);
        job.setCombinerClass(avg_reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0:1);

    }

}

下面是我得到的错误:

Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.FloatWritable is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1374)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1691)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at q1$avg_reducer.reduce(q1.java:34)
at q1$avg_reducer.reduce(q1.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1712)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1641)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1492)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:729)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

我想找出每个cname的平均价格。感谢您的帮助。

3gtaxfhh

3gtaxfhh1#

但我无法理解合并器输出键/值和Map器输出键/值需要相同的事实。
这是非常简单的输入类型的减速器是没有改变,所以在你的情况下,它总是 (Text, IntWritable) . reducer不关心如何向它提供此输入。它总是希望输入的类型为 (Text, IntWritable) 所以Map器和合路器的输出应该是相同的,应该是相同的 (Text, IntWritable) .
但是你应该知道的第一件事是你不应该在mapreduce的combiner中放入你的应用程序的一些逻辑。而且hadoop在执行job时可能会多次运行合并器。
那么combiner的目的是什么呢?
combiner的唯一目标是减少从执行mapper任务的机器发送到将运行reducer任务的机器的数据量。如果要编写组合器,应将其设计为在mapreeduce中执行此组合器的次数不会影响应用程序的输出。
现在想想你已经改变了Map的输出类型,这样它就可以运行而不会出错。你的申请还有其他问题吗?当然可以。
假设您有以下输入:

101,jai,j@j.com,1-aug-2016,1,iphone,65000
101,jai,j@j.com,1-aug-2016,2,ipad,35000
101,jai,j@j.com,1-aug-2016,3,Samsung S5,34000

所以Map输出如下:

jai -> 65000
jai -> 35000
jai -> 34000

现在想象两种不同情况下的减速机输入:
第一个场景合并器根本不执行:

jai -> 65000
jai -> 35000
jai -> 34000

在这种情况下,减速器输出将为:

jai -> 44666.666666666664

对Map器输出中的两个第一元素执行第二个场景合并器:

jai -> 50000 // combiner executed on the first two item above and produce jai -> (65000 + 35000) / 2

jai -> 34000 // the third is sent to the reducer without combiner executed on it

在这种情况下,减速器的输出为:

jai -> 67000 // (50000 + 34000) / 2

很明显,应用程序的结果将取决于combiner执行的次数。
一种解决方法是为发送到减速器和组合器的值分配重量,例如,对于减速器输出上方的相同输入,将如下所示:

jai -> 1-65000 // this shows both weigh and value separated by dash(-)
jai -> 1-35000
jai -> 1-34000

现在想象一下第一个场景,合并器根本没有执行:
在这种情况下,减速机的输入将是上述Map器的输出,因此减速机的输出将是:

jai -> 3-44666.666666666664

第二种情况是在两个第一元素上执行合并器,因此合并器输出如下:

jai -> 2-50000 // this is jai -> 2 - (65000 + 35000) / 2
jai -> 1-34000

因此减速机输出为:

jai -> 3-44666.666666666664 //   3 - (2 * 50000) + (1 * 34000) / 3

这样,无论您的组合器运行多少次,应用程序的输出总是相同的。
实施:
在mapreduce中有许多方法可以实现这个解决方案。您可以定义自己的可写类型来保存weight和average,也可以使用简单文本来保存weight和average,并用破折号(-)分隔它们。为了简单起见,我选了第二个。
以下是Map器实现:

public class AverageMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] line = value.toString().split(",");
        Text cname = new Text(line[1]);
        context.write(cname, new Text(1 + "-" + String.valueOf(line[6])));
    }
}

下面是reducer的实现:

public class AverageReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0D;
        long count = 0L;
        long elementCount;
        for(Text value : values) {
            String str = new String(value.copyBytes());
            String[] result = str.split("-");
            elementCount = Long.valueOf(result[0]);
            count += elementCount;
            sum += elementCount * Double.valueOf(result[1]);
        }
        context.write(key, new Text(String.valueOf(count + "-" + (sum / count))));
    } 
}

请注意,有时当组合器执行不同的时间时,结果之间会有很小的差异(因为浮点舍入问题),但这是可以接受的,不会有很大的差异。

相关问题