mapper类和reducer类

wgx48brx  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(503)

我是mapreduce的新手,我对mapper类和reducer类设计的这段代码有些怀疑
我熟悉map side加入mapreduce的过程,我学到了以下几点:

public static class CustsMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

在这里,在上面的代码片段中,我了解到我们将类扩展到 Mapper 类和作为 Object 是一把钥匙 Text 是一个值,所以map方法使用 context 对象在这里作为一个输出 context.write(new Text(), new Text()) 根据代码的逻辑体设计。
我的两个问题是:
为什么我们要把班级扩大到 MapReduceBase (它做什么?)以及为什么我们要实现我们的类 Mapper (我知道它是一个类,但在web上的某个地方它显示为一个接口,所以如果我将它扩展到 org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 班上有什么问题?
map 功能什么 OutputCollector<Text, IntWritable> output, Reporter reporter ?? 我不知道?我就知道 Context context 应该在这里,但什么是 OutputCollector 以及 Reporter 在这里?
我在做下面的程序:
输入:

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

代码:

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits 
{ 
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,Text,Text,IntWritable>       
   {       
      //Map function 
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
      { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 

         while(s.hasMoreTokens())
            {
               lasttoken=s.nextToken();
            } 

         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   } 

   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable> 
   {     
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
         { 
            int maxavg=30; 
            int val=Integer.MIN_VALUE; 

            while (values.hasNext()) 
            { 
               if((val=values.next().get())>maxavg) 
               { 
                  output.collect(key, new IntWritable(val)); 
               } 
            } 

         } 
   }  

   //Main function 
   public static void main(String args[])throws Exception 
   { 
      JobConf conf = new JobConf(ProcessUnits.class); 

      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 

      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

      JobClient.runJob(conf); 
   } 
}

输出:

1981    34 
1984    40 
1985    45
33qvvth1

33qvvth11#

为什么我们将类扩展到mapreducebase(它做什么?),为什么我们将类实现为mapper
因为这是Hadoop2.x出现之前用MapredAPI编写的旧代码。
我知道上下文应该在这里,但是outputcollector和reporter在这里是什么
它是上下文对象的早期版本。
hadoop:outputcollector在mapreduce期间是如何工作的?
outputcollector如何工作?

相关问题