无法在hadoop的wordcount作业中配置缩减器的数量

5hcedyr0  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(313)

我在linum机器中使用单节点集群hadoop-2.7.0。我的wordcount作业代码在1个reducer中运行良好。但如果我增加减速器,效果就不好了。它显示以下错误:

15/05/25 21:15:10 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/05/25 21:15:10 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/05/25 21:15:10 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/05/25 21:15:10 WARN snappy.LoadSnappy: Snappy native library is available
15/05/25 21:15:10 INFO snappy.LoadSnappy: Snappy native library loaded
15/05/25 21:15:10 INFO mapred.FileInputFormat: Total input paths to process : 1
15/05/25 21:15:10 INFO mapred.JobClient: Running job: job_local_0001
15/05/25 21:15:11 INFO util.ProcessTree: setsid exited with exit code 0
15/05/25 21:15:11 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@5f1fd699
15/05/25 21:15:11 INFO mapred.MapTask: numReduceTasks: 1
15/05/25 21:15:11 INFO mapred.MapTask: io.sort.mb = 100
15/05/25 21:15:11 INFO mapred.MapTask: data buffer = 79691776/99614720
15/05/25 21:15:11 INFO mapred.MapTask: record buffer = 262144/327680

15/05/25 21:15:11 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Illegal partition for am (1)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:592)
    at WordMapper.map(WordMapper.java:24)
    at WordMapper.map(WordMapper.java:1)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

我的getpartition方法如下所示:

public int getPartition(Text key, IntWritable value, int numRedTasks) {
        String s = key.toString();
        if(s.length() == 1)
        {
            return 0;
        }
        else if(s.length() == 2)
        {
            return 1;
        }
        else if(s.length() == 3)
        {
            return 2;
        }
        else
            return 3;
    }

在wordcount.class文件中运行方法:

if(input.length < 2)
    {
        System.out.println("Please provide valid input");
        return -1;
    }
    else
    {
        JobConf config = new JobConf();
        FileInputFormat.setInputPaths(config, new Path(input[0]));
        FileOutputFormat.setOutputPath(config, new Path(input[1]));
        config.setMapperClass(WordMapper.class);
        config.setReducerClass(WordReducer.class);
        config.setNumReduceTasks(4);
        config.setPartitionerClass(MyPartitioner.class);
        config.setMapOutputKeyClass(Text.class);
        config.setMapOutputValueClass(IntWritable.class);
        config.setOutputKeyClass(Text.class);
        config.setOutputValueClass(IntWritable.class);
        JobClient.runJob(config);
    }
return 0;

}
我的mapper和reducer代码很好,因为带有1个reducer的wordcount作业运行良好。有人能弄明白吗?

vngu2lb8

vngu2lb81#

这可能是由于清管器在操作中由于高默认值而失败,因此可以在其中设置并行。
谢谢,谢里什。

ux6nzvsh

ux6nzvsh2#

您需要在驱动程序类中使用toorunner,并在主类中调用toolrunner。您可以通过将combiner用作工作流的一部分来实现这一点。下面是驱动程序类代码:正如您从下面的代码中看到的,除了mapper和reducer调用之外,还有一个combiner调用。主运行程序中的退出代码是“int exitcode=toolrunner.run(new configuration(),new wordcountwithcombiner(),args);”它在运行时调用tool runner,您可以通过在运行wordcount程序时使用“-d”选项指定要使用的缩减器或Map器的数量。示例命令行类似于“-d mapred.reduce.tasks=2 input-output”

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
public class WordCountWithCombiner extends Configured
implements Tool{

@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf(); 

Job job = new Job(conf, "MyJob");

job.setJarByClass(WordCount.class);
job.setJobName("Word Count With Combiners");

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;

   }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new WordCountWithCombiner(), args);
    System.exit(exitCode);
}

}

相关问题