mapreduce-hadoop-更改reduce键

e5nszbig  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(315)

我正在网上寻找一个关于如何使用map和reduce的教程,但是几乎所有关于wordcount的代码都很糟糕,并没有真正解释如何使用每个函数。我已经看到了所有的理论,键,Map等,但没有代码,例如做一些不同的比wordcount。
我正在虚拟机上使用Ubuntu20.10和hadoop版本3.2.1(如果你需要更多的信息,请给我留言)。
我的任务是管理一个文件,其中包含奥运会运动员的一些数据。
你会看到它包含各种信息,如姓名、性别、年龄、体重、身高等。
我在这里举个例子(希望你能理解):

ID  Name       Sex  Age Height  Weight  Team    NOC Games   Year          Season  City      
Sport          Event                        Medal
1   A Dijiang  M    24  180     80      China   CHN 1992     Summer 1992  Summer  Barcelona 
Basketball     Basketball Men's Basketball  NA

到目前为止,我必须处理所有记录都相同的数据,比如姓名或身份证,
它们彼此相似。
(想象一下有一个参与者不止一次,这就是我的问题
在不同的时间段,所以不能识别相同的记录)
如果我可以将reduce函数的键/识别改为参与者的名字,那么我应该得到正确的结果。
在这个代码中,我搜索的球员,至少赢得了奖牌。
我的主要任务是:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewWordCount {

        public static void main(String[] args) throws Exception {

            if(args.length != 3) {
                System.err.println("Give the correct arguments.");
                System.exit(3);
            }

            // Job 1.
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "count");
            job.setJarByClass(NewWordCount.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(NewWordMapper.class);
            job.setCombinerClass(NewWordReducer.class);
            job.setReducerClass(NewWordReducer.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            job.waitForCompletion(true);
       }
}

我的Map器是:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NewWordMapper extends Mapper <LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable();
    private Text word = new Text();

    private String name = new String();
    private String sex = new String();
    private String age = new String();
    private String team = new String();
    private String sport = new String();
    private String games = new String();
    private String sum = new String();

    private String gold = "Gold";
    private String silver = "Silver";
    private String bronze = "Bronze";

    public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {

        if(((LongWritable)key).get() == 0) {
            return;
        }

        String line = value.toString();
        String[] arrOfStr = line.split(",");
        int counter = 0;

        for(String a : arrOfStr) {
            if(counter == 14) {             
                // setting the type of medal each player has won.
                word.set(a);

                // checking if the medal is gold.
                if(a.compareTo(gold) == 0 || a.compareTo(silver) == 0 || a.compareTo(bronze) == 0) {
                    String[] goldenStr = line.split(",");
                    name = goldenStr[1];
                    sex = goldenStr[2];
                    age = goldenStr[3];
                    team = goldenStr[6];
                    sport = goldenStr[12];
                    games = goldenStr[8];
                    sum = name + "," + sex + "," + age + "," + team + "," + sport + "," + games;
                    word.set(sum);
                    context.write(word, one);
                }
            }
            counter++;
        }
    }
}

我的减速机是:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class NewWordReducer extends Reducer <Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count = 0;
        for(IntWritable val : values) {

            String line  = val.toString();
            String[] arrOfStr = line.split(",");
            String name = arrOfStr[0];

            count += val.get();
        }
        context.write(key, new IntWritable(count));
    }
}
ie3xauqp

ie3xauqp1#

mapreduce jobs的核心思想是 Map 函数用于从输入中提取有价值的信息并将其“转换”为 key-value 成对的,基于它们的 Reduce 函数将分别为每个键执行。您的代码似乎对后者的执行方式存在误解,但这没什么大不了的,因为这是wordcount示例的一个恰当示例。
假设我们有一个档案,里面有奥运运动员的数据和他们的奖牌表现,就像你在一个名为 /olympic_stats 在如下所示的hdfs中(您可以看到我包含了与此示例需要处理的运动员相同的记录):

1,A Dijiang,M,24,180,80,China,CHN,1992,Summer 1992,Summer,Barcelona,Basketball,Men's Basketball,NA
2,T Kekempourdas,M,33,189,85,Greece,GRE,2004,Summer 2004,Summer,Athens,Judo,Men's Judo,Gold
3,T Kekempourdas,M,33,189,85,Greece,GRE,2000,Summer 2000,Summer,Sydney,Judo,Men's Judo,Bronze
4,K Stefanidi,F,29,183,76,Greece,GRE,2016,Summer 2016,Summer,Rio,Pole Vault, Women's Pole Vault,Silver
5,A Jones,F,26,160,56,Canada,CAN,2012,Summer 2012,Summer,London,Acrobatics,Women's Acrobatics,Gold
5,A Jones,F,26,160,56,Canada,CAN,2016,Summer 2012,Summer,Rio,Acrobatics,Women's Acrobatics,Gold
6,C Glover,M,33,175,80,USA,USA,2008,Summer 2008,Summer,Beijing,Archery,Men's Archery,Gold
7,C Glover,M,33,175,80,USA,USA,2012,Summer 2012,Summer,London,Archery,Men's Archery,Gold
8,C Glover,M,33,175,80,USA,USA,2016,Summer 2016,Summer,Rio,Archery,Men's Archery,Gold

对于 Map 在函数中,我们需要找到一列数据,作为计算每个运动员有多少枚金牌的关键。从上面我们可以很容易地看到,每个运动员都有一个或多个记录,他们的名字都会出现在第二栏,所以我们确信我们会用他们的名字作为关键 key-value 对。至于价值,我们确实想计算一个运动员有多少枚金牌,所以我们必须检查第14栏,指出这个运动员是否获得了金牌以及获得了什么金牌。如果此记录的列等于 String 到目前为止,我们可以肯定这位运动员在他的职业生涯中至少获得了一枚金牌。所以这里,作为值,我们可以把1。
现在是 Reduce 函数,因为它是针对每个不同的键分别执行的,所以我们可以理解,它从Map器获得的输入值将是针对同一个特定的值。自从 key-value 由Map绘制者生成的一对,对于给定运动员的每枚金牌,其值只有1,我们可以将所有这些1相加,得到每枚金牌的总数。
因此,这方面的代码如下所示(为了简单起见,我将mapper、reducer和driver放在同一个文件中):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class GoldMedals 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <Athlete's Name, 1>
     */
    public static class Map extends Mapper<Object, Text, Text, IntWritable> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String record = value.toString();
            String[] columns = record.split(",");

            // extract the athlete's name and his/hers medal indication
            String athlete_name = columns[1];
            String medal = columns[14];

            // only hold the gold medal athletes, with their name as the key
            // and 1 as the least number of gold medals they have so far
            if(medal.equals("Gold")) 
                context.write(new Text(athlete_name), new IntWritable(1));
        }
    }

    /* input:  <Athlete's Name, 1>
     * output: <Athlete's Name, Athlete's Total Gold Medals>
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
        {
            int sum = 0;

            // for a single athlete, add all of the gold medals they had so far...
            for(IntWritable value : values)
                    sum += value.get();

            // and write the result as the value on the output key-value pairs
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("olympic_stats");
        Path output_dir = new Path("gold_medals");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job goldmedals_job = Job.getInstance(conf, "Gold Medals Counter");
        goldmedals_job.setJarByClass(GoldMedals.class);
        goldmedals_job.setMapperClass(Map.class);
        goldmedals_job.setCombinerClass(Reduce.class);
        goldmedals_job.setReducerClass(Reduce.class);    
        goldmedals_job.setMapOutputKeyClass(Text.class);
        goldmedals_job.setMapOutputValueClass(IntWritable.class);
        goldmedals_job.setOutputKeyClass(Text.class);
        goldmedals_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(goldmedals_job, input_dir);
        FileOutputFormat.setOutputPath(goldmedals_job, output_dir);
        goldmedals_job.waitForCompletion(true);
    }
}

上述程序的输出存储在 /olympic_stats_out 目录,该目录具有以下输出并确认 MapReduce 作业设计正确:

相关问题