mapreduce-hadoop-multiple-data-on-input

gv8xihay  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(383)

我正在虚拟机上使用Ubuntu20.10和hadoop版本3.2.1(如果你需要更多的信息,请给我留言)。
此时此刻我的输出给了我:

Aaron Wells Peirsol ,M,17,United States,Swimming,2000 Summer,0,1,0
Aaron Wells Peirsol ,M,21,United States,Swimming,2004 Summer,1,0,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,0,1,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,1,0,0

对于以上的输出,我希望能够将他的所有奖牌相加
(绳子末端的三个数字代表金、银、铜
运动员多年来在奥运会上获得的奖牌)。
该项目没有说明年龄(17,21,25,25)
或者比赛举办的时间(2000、2004、2008、2008 年夏季奥运会),但是我必须把奖牌数相加,
为了能够按获得最多金牌的参赛者等进行分类。
有什么想法吗?如果您需要,我可以为您提供我的代码,但我需要另一个mapreduce,我想它将使用我在上面导入的给定输入,并为我们提供如下内容:

Aaron Wells Peirsol,M,25,United States,Swimming,2008 Summer,2,2,0

如果我们有办法从reduce输出中删除“\t”,那也会非常有益!
谢谢大家抽出时间,杰夫托尼科洛斯·尼古拉斯。

trnvg8h3

trnvg8h31#

虽然一开始看起来有点棘手,但这其实是 WordCount 示例的又一个变体,只不过这次需要使用组合的键和值,以便把数据以 key-value 对的形式从 Mapper 传递到 Reducer 中进行累加。
对于Map器,我们需要从输入文件的每一行提取所有信息,并将列中的数据分为两个“类别”:
每个运动员在各行之间始终保持不变的主要信息,作为 key;以及在行与行之间会发生变化、需要累加处理的统计信息,作为 value。
对于属于同一名运动员的每一行,我们知道永远不变的栏目是运动员的姓名、性别、国家和运动项目。所有这些都将合并为 key,并使用 , 字符作为各字段之间的分隔符。其余的列数据将放入 value 中,但我们同样需要在它们之间使用分隔符,以便区分每届奥运会对应的年龄、年份和奖牌计数。我们将使用:
这个 @ 字符作为年龄和年份之间的分隔符,
这个 # 字符作为奖牌计数器之间的分隔符,
以及 _ 字符作为这两者之间的分隔符
在 Reduce 阶段,我们所要做的就是累计奖牌总数,并找出每个运动员在输入文件中出现过的最新年龄和最近一届奥运会年份。
为了在 MapReduce 作业输出的键和值之间不出现制表符,我们可以简单地将 NullWritable 设置为输出 key,并在 value 内部使用 , 字符作为字段分隔符。
此作业的代码如下所示:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class Medals 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <(name,sex,country,sport), (age@year_gold#silver#bronze)>
     */
    public static class Map extends Mapper<Object, Text, Text, Text> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String record = value.toString();
            String[] columns = record.split(",");

            // extract athlete's main info
            String name = columns[0];
            String sex = columns[1];
            String country = columns[3];
            String sport = columns[4];

            // extract athlete's stat info
            String age = columns[2];
            String year = columns[5]; 
            String gold = columns[6];
            String silver = columns[7];
            String bronze = columns[8];

            // set the main info as key and the stat info as value
            context.write(new Text(name + "," + sex + "," + country + "," + sport), new Text(age + "@" + year + "_" +  gold + "#" + silver + "#" + bronze));
        }
    }

    /* input:  <(name,sex,country,sport), (age@year_gold#silver#bronze)>
     * output: <(NULL, (name,sex,age,country,sport,year,golds,silvers,bronzes)>
     */
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            // extract athlete's main info
            String[] athlete_info = key.toString().split(",");
            String name = athlete_info[0];
            String sex = athlete_info[1];
            String country = athlete_info[2];
            String sport = athlete_info[3];

            int latest_age = 0;
            String latest_games = "";

            int gold_cnt = 0;
            int silver_cnt = 0;
            int bronze_cnt = 0;

            // for a single athlete, compute their stats...
            for(Text value : values)
            {
                String[] split_value = value.toString().split("_");
                String[] age_and_year = split_value[0].split("@");
                String[] medals = split_value[1].split("#");

                // find the last age and games the athlete has stats in the input file
                if(Integer.parseInt(age_and_year[0]) > latest_age)
                {
                    latest_age = Integer.parseInt(age_and_year[0]);
                    latest_games = age_and_year[1];
                }

                if(Integer.parseInt(medals[0]) == 1)
                    gold_cnt++;

                if(Integer.parseInt(medals[1]) == 1)
                    silver_cnt++;

                if(Integer.parseInt(medals[2]) == 1)
                    bronze_cnt++;
            }

            context.write(NullWritable.get(), new Text(name + "," + sex + "," + String.valueOf(latest_age) + "," + country + "," + sport + "," + latest_games + "," + String.valueOf(gold_cnt) + "," + String.valueOf(silver_cnt) + "," + String.valueOf(bronze_cnt)));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("olympic_stats");
        Path output_dir = new Path("medals");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job medals_job = Job.getInstance(conf, "Medals Counter");
        medals_job.setJarByClass(Medals.class);
        medals_job.setMapperClass(Map.class);
        medals_job.setReducerClass(Reduce.class);    
        medals_job.setMapOutputKeyClass(Text.class);
        medals_job.setMapOutputValueClass(Text.class);
        medals_job.setOutputKeyClass(NullWritable.class);
        medals_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(medals_job, input_dir);
        FileOutputFormat.setOutputPath(medals_job, output_dir);
        medals_job.waitForCompletion(true);
    }
}

当然,结果就是你想要的结果,如下所示:

相关问题