hadoop:哪个Map器返回了哪个结果?

yruzcnhs  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(265)

我是hadoop新手。我想运行一个mapreduce示例,并使用计算器Map器查看其结果。也就是说,我想知道,每个中间结果是由哪个Map器计算出来的?有可能吗?怎样?
我安装了hadoop2.9.0(多节点集群)。

ej83mcc0

ej83mcc01#

首先我们看一下示例代码(我已经安装了hdp cluster,因此.jar文件的路径可能不同)
作为输入的示例文本文件:
$bin/hadoop dfs-ls/字数/输入/
/wordcount/input/file01
/字数/输入/文件02
$bin/hadoop dfs-cat/wordcount/input/file01
你好世界再见世界
$bin/hadoop dfs-cat/wordcount/input/file02
你好hadoop再见hadoop
运行应用程序:
$bin/hadoop-jar/usr/hdp/2.6x.x/hadoop-mapreduce/hadoo-mapreduce-examples.jar字数/wordcount/input/wordcount/output
注意:您不需要编写字数计算程序,正如我所提到的,它默认位于mapreduce文件夹中。以下代码仅供工作人员参考
输出:
$bin/hadoop dfs-cat/wordcount/output/part-00000个
再见1
再见1
hadoop 2
你好2
世界2
现在,让我们看看Map器和还原器在后端是如何工作的:
wordcount应用程序非常简单。
Map器实现(第14-26行)通过map方法(第18-25行)一次处理一行,由指定的textinputformat(第49行)提供。然后,它通过stringtokenizer将行拆分为由空格分隔的标记,并发出一个<,1>的键值对。
对于给定的示例输入,第一个贴图将发射:
<你好,1>
<世界,1>
<再见,1>
<世界,1>
第二个贴图发射:
<你好,1>
<hadoop,1>
<再见,1>
<hadoop,1>
我们将在本教程稍后的部分详细了解为给定作业生成的Map的数量,以及如何以细粒度的方式控制它们。
wordcount还指定一个组合器(第46行)。因此,在对键进行排序之后,每个map的输出通过本地合并器(根据作业配置,它与reducer相同)进行本地聚合。
第一个Map的输出:
<再见,1>
<你好,1>
<世界,2>
第二个Map的输出:
<再见,1>
<hadoop,2>
<你好,1>
reducer实现(第28-36行)通过reduce方法(第29-35行)对值进行求和,这些值是每个键(即本例中的单词)的出现计数。
因此,作业的输出为:
<再见,1>
<再见,1>
<hadoop,2>
<你好,2>
<世界,2>
run方法在jobconf中指定作业的各个方面,例如输入/输出路径(通过命令行传递)、键/值类型、输入/输出格式等。然后,它调用jobclient.runjob(第55行)提交并监视其进度。
现在,这里提到的字数计算程序是:

1.  package org.myorg;

2.  

3.  import java.io.IOException;

4.  import java.util.*;

5.  

6.  import org.apache.hadoop.fs.Path;

7.  import org.apache.hadoop.conf.*;

8.  import org.apache.hadoop.io.*;

9.  import org.apache.hadoop.mapred.*;

10. import org.apache.hadoop.util.*;

11. 

12. public class WordCount {

13. 

14.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

15.      private final static IntWritable one = new IntWritable(1);

16.      private Text word = new Text();

17. 

18.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

19.        String line = value.toString();

20.        StringTokenizer tokenizer = new StringTokenizer(line);

21.        while (tokenizer.hasMoreTokens()) {

22.          word.set(tokenizer.nextToken());

23.          output.collect(word, one);

24.        }

25.      }

26.    }

27. 

28.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

29.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

30.        int sum = 0;

31.        while (values.hasNext()) {

32.          sum += values.next().get();

33.        }

34.        output.collect(key, new IntWritable(sum));

35.      }

36.    }

37. 

38.    public static void main(String[] args) throws Exception {

39.      JobConf conf = new JobConf(WordCount.class);

40.      conf.setJobName("wordcount");

44. 

45.      conf.setMapperClass(Map.class);

46.      conf.setCombinerClass(Reduce.class);

47.      conf.setReducerClass(Reduce.class);

48. 

49.      conf.setInputFormat(TextInputFormat.class);

50.      conf.setOutputFormat(TextOutputFormat.class);

51. 

52.      FileInputFormat.setInputPaths(conf, new Path(args[0]));

53.      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

54. 

55.      JobClient.runJob(conf);

57.    }

58. }

59.

参考:mapreduce教程

相关问题