MapReduce spill never finishes, reducer stuck in the second job

rjee0c15 · posted 2021-05-30 in Hadoop

I am trying to run two chained MapReduce jobs in pseudo-distributed mode on an Ubuntu single-node cluster. The first job completes successfully and produces the expected output. When the second job runs, it prints the "Starting flush of map output" message twice. The map task is then declared complete, showing map 100% reduce 0%, but the spill never finishes and the whole job hangs at that point.

Console log where the second job gets stuck:

14/12/18 17:29:04 INFO input.FileInputFormat: Total input paths to process : 1
14/12/18 17:29:04 INFO mapred.JobClient: Running job: job_local1934661043_0002
14/12/18 17:29:04 INFO mapred.LocalJobRunner: Waiting for map tasks
14/12/18 17:29:04 INFO mapred.LocalJobRunner: Starting task: attempt_local1934661043_0002_m_000000_0
14/12/18 17:29:04 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@e1ef17
14/12/18 17:29:04 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/user/mahesh/outpu/part-r-00000:0+51
14/12/18 17:29:04 INFO mapred.MapTask: io.sort.mb = 100
14/12/18 17:29:04 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/18 17:29:04 INFO mapred.MapTask: record buffer = 262144/327680
14/12/18 17:29:04 INFO mapred.MapTask: Starting flush of map output
14/12/18 17:29:04 INFO mapred.MapTask: Starting flush of map output
14/12/18 17:29:05 INFO mapred.JobClient:  map 0% reduce 0%
14/12/18 17:29:10 INFO mapred.LocalJobRunner: 
14/12/18 17:29:11 INFO mapred.JobClient:  map 100% reduce 0%
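For context on the buffer figures in the log: in MRv1's MapTask, io.sort.mb is split into a record-accounting region and a key/value data region, and a spill ("flush of map output") is triggered at io.sort.spill.percent of either region. Assuming the classic defaults (io.sort.record.percent = 0.05, io.sort.spill.percent = 0.80, 16 accounting bytes per record; these are assumptions, not shown in the post), a quick sketch reproduces the exact "data buffer" and "record buffer" numbers logged above:

```java
/** Sketch of the MRv1 MapTask buffer arithmetic behind the log lines above.
 *  Assumed defaults: io.sort.record.percent = 0.05, io.sort.spill.percent = 0.80,
 *  and 16 bytes of accounting information per record. */
public class SpillMath {
    public static void main(String[] args) {
        long total = 100L * 1024 * 1024;                      // io.sort.mb = 100 -> 104857600 bytes
        long recordBytes = (long) (total * 0.05);             // 5242880 bytes reserved for record accounting
        long recordCapacity = recordBytes / 16;               // 327680 record slots
        long dataCapacity = total - recordBytes;              // 99614720 bytes for key/value data
        long dataSoftLimit = (long) (dataCapacity * 0.80);    // 79691776: spill threshold of the data region
        long recordSoftLimit = (long) (recordCapacity * 0.80);// 262144: spill threshold of the record region
        System.out.println("data buffer = " + dataSoftLimit + "/" + dataCapacity);
        System.out.println("record buffer = " + recordSoftLimit + "/" + recordCapacity);
    }
}
```

Running this prints `data buffer = 79691776/99614720` and `record buffer = 262144/327680`, matching the log, which suggests the job is using default sort-buffer settings and the hang is not a buffer-sizing problem.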

Here is the driver class showing how the two jobs are configured:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class NetflixAvgRatingSorted extends Configured implements Tool {

    private static final String OUTPUT_PATH = "/user/mahesh/outpu";

    public int run(String[] args) throws Exception {
        /*
         * Job 1
         */
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);

        Job job = new Job(conf, "NetflixAvgRating");
        job.setJarByClass(NetflixAvgRatingSorted.class);

        job.setMapperClass(NetflixAvgRatingMap.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(NetflixAvgRatingReducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(FloatWritable.class);

        /* job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class); */

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        job.waitForCompletion(true);

        /*
         * Job 2
         */
        Configuration conf2 = getConf();

        Job job2 = new Job(conf2, "NetflixAvgRatingSorted");
        job2.setJarByClass(NetflixAvgRatingSorted.class);

        job2.setMapperClass(NetflixAvgRatingSortedMapper.class);
        job2.setReducerClass(NetflixAvgRatingSortedReducer.class);

        job2.setOutputKeyClass(CompositekeyWritable.class);
        job2.setOutputValueClass(NullWritable.class);
        job2.setMapOutputKeyClass(CompositekeyWritable.class);
        job2.setMapOutputValueClass(NullWritable.class);

        /* job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class); */

        FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH + "/part-r-00000"));
        FileOutputFormat.setOutputPath(job2, new Path(args[1]));

        return job2.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: NetflixAvgRatingSorted <input directory> <output location>");
            System.exit(2);
        }
        int exitCode = ToolRunner.run(new Configuration(), new NetflixAvgRatingSorted(), args);
        System.exit(exitCode);
    }
}
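One thing worth double-checking in a driver structured like this: `job.waitForCompletion(true)` for job 1 returns a boolean, and ignoring it means job 2 is launched even when job 1 failed or was killed. The gating pattern can be sketched without any Hadoop dependency (a generic sketch, not Hadoop API; `runChain` and the suppliers are hypothetical names used only for illustration):

```java
import java.util.function.BooleanSupplier;

/** Minimal sketch of chained-job control flow (not Hadoop API):
 *  stage 2 runs only if stage 1 reports success. */
public class ChainRunner {
    public static int runChain(BooleanSupplier stage1, BooleanSupplier stage2) {
        if (!stage1.getAsBoolean()) {
            return 1; // abort: never feed a failed first stage's output to the second
        }
        return stage2.getAsBoolean() ? 0 : 1;
    }

    public static void main(String[] args) {
        // Stage 1 fails -> stage 2 is never invoked, exit code 1.
        System.out.println(runChain(() -> false, () -> true)); // prints 1
        // Both stages succeed -> exit code 0.
        System.out.println(runChain(() -> true, () -> true));  // prints 0
    }
}
```

In the actual driver this would amount to `if (!job.waitForCompletion(true)) return 1;` before configuring job 2, so a failed first job surfaces immediately instead of producing a confusing second-job hang.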

If anyone spots a mistake in the job configuration or anything related, please let me know. Any suggestions would be appreciated. Thanks in advance.
