Reducer is skipped, only mapper output is generated

0s7z1bwu · posted 2021-05-30 · in Hadoop

I have looked at some related posts, and everything there seemed to match my setup, but when I run my MapReduce job the reduce phase is skipped. I have been struggling with this for a while. Could anyone spot the problem and help me get the job running?

Mapper:

package org.netflix.rating;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NetflixAvgRatingMap extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    // Matches: optional leading spaces, a user id, a movie id, a one-digit
    // rating, then the rest of the record (e.g. a date). Compiled once
    // instead of once per input record.
    private static final Pattern NETFLIX_ENTRY =
            Pattern.compile("^(\\s*)([0-9]+)(\\s+)([0-9]+)(\\s+)(\\d{1})(\\s+)(.*)");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Matcher matcher = NETFLIX_ENTRY.matcher(value.toString());
        if (!matcher.matches()) {
            return; // silently skip records that do not fit the expected layout
        }
        long movieId = Long.parseLong(matcher.group(4)); // second numeric column
        int rating = Integer.parseInt(matcher.group(6)); // one-digit rating column
        // Let IOException/InterruptedException propagate and fail the task
        // instead of swallowing them in a try/catch.
        context.write(new LongWritable(movieId), new IntWritable(rating));
    }
}
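
A side note for anyone debugging this: the mapper drops every line that does not match the regex, so if the input layout differs from what the pattern expects, the reducer receives no input at all. Below is a standalone sanity check of the pattern, separate from the job; the sample line is hypothetical, in the "userId movieId rating date" layout the regex implies:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        Pattern p = Pattern.compile(
                "^(\\s*)([0-9]+)(\\s+)([0-9]+)(\\s+)(\\d{1})(\\s+)(.*)");
        // Hypothetical record: userId movieId rating date
        Matcher m = p.matcher("1488844 30878 4 2005-09-06");
        if (m.matches()) {
            System.out.println("movie_id = " + m.group(4)); // prints 30878
            System.out.println("rating   = " + m.group(6)); // prints 4
        } else {
            System.out.println("no match -> the mapper would skip this line");
        }
    }
}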

Reducer:

package org.netflix.rating;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class NetflixAvgRatingReducer extends Reducer<LongWritable, IntWritable, LongWritable, FloatWritable> {

    @Override
    public void reduce(LongWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0, count = 0;
        // Iterate the values once with for-each; calling values.iterator()
        // in the loop condition on every pass is fragile and easy to get wrong.
        for (IntWritable rating : values) {
            sum += rating.get();
            count++;
        }
        float avg = (float) sum / count;
        context.write(key, new FloatWritable(avg));
    }
}
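
The reduce logic can also be verified in isolation with MRUnit. This is only a sketch and is not part of the original post; it assumes the org.apache.mrunit:mrunit test artifact (and its JUnit dependency) is on the classpath. Three ratings of 3, 4, and 5 for one movie should average to 4.0f:

import java.util.Arrays;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class NetflixAvgRatingReducerTest {

    @Test
    public void averagesRatingsForOneMovie() throws Exception {
        ReduceDriver.newReduceDriver(new NetflixAvgRatingReducer())
                .withInput(new LongWritable(30878L),
                        Arrays.asList(new IntWritable(3), new IntWritable(4), new IntWritable(5)))
                .withOutput(new LongWritable(30878L), new FloatWritable(4.0f))
                .runTest();
    }
}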

Driver class:

package org.netflix.rating;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class NetflixAvgRating {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Create job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(conf, "NetflixAvgRating");
        job.setJarByClass(NetflixAvgRating.class);

        // Setup MapReduce job
        job.setMapperClass(NetflixAvgRatingMap.class);
        job.setReducerClass(NetflixAvgRatingReducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Input
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //job.setOutputFormatClass(TextOutputFormat.class);

        // Execute job
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);
    }
}
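
Two quick ways to confirm whether the reduce phase actually ran: reducer output files are named part-r-NNNNN, while a map-only job writes part-m-NNNNN; and the built-in task counters can be read after waitForCompletion. A diagnostic sketch of the counter check, replacing the last two lines of main above (not part of the original driver):

        // If REDUCE_INPUT_RECORDS is zero while MAP_OUTPUT_RECORDS is not,
        // the reduce phase never received the mapper output.
        boolean completed = job.waitForCompletion(true);
        org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
        long mapOut = counters.findCounter(
                org.apache.hadoop.mapreduce.TaskCounter.MAP_OUTPUT_RECORDS).getValue();
        long reduceIn = counters.findCounter(
                org.apache.hadoop.mapreduce.TaskCounter.REDUCE_INPUT_RECORDS).getValue();
        System.out.println("map output records   = " + mapOut);
        System.out.println("reduce input records = " + reduceIn);
        System.exit(completed ? 0 : 1);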

I have set all the configurations and arguments correctly, but my reduce task never executes. Any suggestions would be appreciated.
