我正在尝试学习mapreduce,而且对它很陌生。我研究了combiner通过在数据节点级别减少Map器输出来提供的优化。
现在,Map器输出key/val和组合器输入key/value需要相同是可以理解的。但我无法理解合并器输出键/值和Map器输出键/值需要相同的事实。
如果我想找到表name,price中数据的平均值,那么我可能会选择以下选项:
Mapper<LongWritable, Text, Text, IntWritable>
Combiner<Text, IntWritable, Text, FloatWritable>
Reducer<Text, IntWritable, Text, FloatWritable>
这样做,我得到的错误,当我在网上阅读时,我发现Map器和组合器的输出需要是相同的,但找不到一个原因。
以下是我的样本数据:
Schema - cid,cname,email,date,pid,pname,price
101,jai,j@j.com,1-aug-2016,1,iphone,65000
101,jai,j@j.com,1-aug-2016,2,ipad,35000
101,jai,j@j.com,1-aug-2016,3,Samsung S5,34000
下面是我的代码:
import java.io.IOException;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
public class q1 {
//cid,cname,email,date,pid,pname,price
public static class avg_mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] line = value.toString().split(",");
Text cname = new Text(line[1]);
IntWritable price = new IntWritable(Integer.parseInt(line[6]));
context.write(cname, price);
}
}
public static class avg_reducer extends Reducer<Text, IntWritable, Text, FloatWritable>{
public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException{
int sum = 0;
int count=0;
for (IntWritable val : value){
count+=1;
sum+=val.get();
}
Float avg = (float)sum/count;
context.write(key,new FloatWritable(avg));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf, "Average");
job.setJarByClass(q1.class);
job.setMapperClass(avg_mapper.class);
job.setReducerClass(avg_reducer.class);
job.setCombinerClass(avg_reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
下面是我得到的错误:
Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.FloatWritable is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1374)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1691)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at q1$avg_reducer.reduce(q1.java:34)
at q1$avg_reducer.reduce(q1.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1712)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1641)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1492)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:729)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
我想找出每个cname的平均价格。感谢您的帮助。
1条答案
按热度按时间3gtaxfhh1#
但我无法理解合并器输出键/值和Map器输出键/值需要相同的事实。
这是非常简单的输入类型的减速器是没有改变,所以在你的情况下,它总是
(Text, IntWritable)
. reducer不关心如何向它提供此输入。它总是希望输入的类型为(Text, IntWritable)
所以Map器和合路器的输出应该是相同的,应该是相同的(Text, IntWritable)
.但是你应该知道的第一件事是你不应该在mapreduce的combiner中放入你的应用程序的一些逻辑。而且hadoop在执行job时可能会多次运行合并器。
那么combiner的目的是什么呢?
combiner的唯一目标是减少从执行mapper任务的机器发送到将运行reducer任务的机器的数据量。如果要编写组合器,应将其设计为在mapreeduce中执行此组合器的次数不会影响应用程序的输出。
现在想想你已经改变了Map的输出类型,这样它就可以运行而不会出错。你的申请还有其他问题吗?当然可以。
假设您有以下输入:
所以Map输出如下:
现在想象两种不同情况下的减速机输入:
第一个场景合并器根本不执行:
在这种情况下,减速器输出将为:
对Map器输出中的两个第一元素执行第二个场景合并器:
在这种情况下,减速器的输出为:
很明显,应用程序的结果将取决于combiner执行的次数。
一种解决方法是为发送到减速器和组合器的值分配重量,例如,对于减速器输出上方的相同输入,将如下所示:
现在想象一下第一个场景,合并器根本没有执行:
在这种情况下,减速机的输入将是上述Map器的输出,因此减速机的输出将是:
第二种情况是在两个第一元素上执行合并器,因此合并器输出如下:
因此减速机输出为:
这样,无论您的组合器运行多少次,应用程序的输出总是相同的。
实施:
在mapreduce中有许多方法可以实现这个解决方案。您可以定义自己的可写类型来保存weight和average,也可以使用简单文本来保存weight和average,并用破折号(-)分隔它们。为了简单起见,我选了第二个。
以下是Map器实现:
下面是reducer的实现:
请注意,有时当组合器执行不同的时间时,结果之间会有很小的差异(因为浮点舍入问题),但这是可以接受的,不会有很大的差异。