How do I set configuration for a Hadoop MapReduce job?

5ktev3wc · posted 2021-06-04 in Hadoop

Suppose I want to set the following configuration properties for an MR job:

mapred.map.tasks
mapred.reduce.tasks
mapred.tasktracker.map.tasks.maximum
mapred.tasktracker.reduce.tasks.maximum
mapred.reduce.slowstart.completed.maps

What are the possible ways to do this?
I can set them in mapred-site.xml, but then they would apply to every job I run.
If I want to set them for an individual job only, will this work:

conf.set("mapred.tasktracker.map.tasks.maximum", 10)

(I haven't seen this done anywhere)
Or should it only be done through command-line arguments,
e.g. -D mapred.tasktracker.map.tasks.maximum=10 (this seems to be the more common usage)?


x6h2sr28 · Answer #1

Both approaches are valid; you can edit the configuration either way before launching the job.
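For example, here is a minimal driver sketch showing both styles side by side: setting properties on the Configuration programmatically before the Job is created, and letting ToolRunner/GenericOptionsParser pick up -D property=value arguments from the command line. The class name MyJobDriver, the jar name and the property values are placeholders, not from the question; the mapper/reducer and path wiring are omitted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver class; mapper/reducer and input/output setup are omitted.
public class MyJobDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains any -D property=value pairs that
        // ToolRunner/GenericOptionsParser parsed from the command line
        Configuration conf = getConf();

        // Per-job overrides set programmatically (values are strings,
        // or use the typed setters such as setInt/setFloat)
        conf.setInt("mapred.reduce.tasks", 4);
        conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.80f);

        Job job = new Job(conf, "my job");
        job.setJarByClass(MyJobDriver.class);
        // ... setMapperClass/setReducerClass and input/output paths go here ...

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner enables the generic options, e.g.
        //   hadoop jar myjob.jar MyJobDriver -D mapred.reduce.tasks=4 <in> <out>
        System.exit(ToolRunner.run(new Configuration(), new MyJobDriver(), args));
    }
}

With ToolRunner in place, any -D overrides passed on the command line end up in getConf() without extra parsing code, and the programmatic conf.set calls apply only to this one job.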


yws3nbqq · Answer #2

Solution 1: create a BaseJob class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;

public abstract class BaseJob extends Configured implements Tool {

    // Builds the Job and wires in the mapper, reducer and related classes
    // supplied by the Transformation
    protected Job setupJob(Transformation transformation, final Configuration conf) throws Exception {

        // Create the job object from the supplied configuration
        Job job = new Job(conf);

        // Set the transformation-specific details
        if (transformation.getMapperClass() != null)
            job.setMapperClass(transformation.getMapperClass());

        if (transformation.getReducerClass() != null)
            job.setReducerClass(transformation.getReducerClass());

        if (transformation.getMapOutputKeyClass() != null)
            job.setMapOutputKeyClass(transformation.getMapOutputKeyClass());

        if (transformation.getMapOutputValueClass() != null)
            job.setMapOutputValueClass(transformation.getMapOutputValueClass());

        if (transformation.getPartitionerClass() != null)
            job.setPartitionerClass(transformation.getPartitionerClass());

        if (transformation.getSortComparatorClass() != null)
            job.setSortComparatorClass(transformation.getSortComparatorClass());

        if (transformation.getGroupingComparator() != null)
            job.setGroupingComparatorClass(transformation.getGroupingComparator());

        if (transformation.getInputFormatClass() != null)
            job.setInputFormatClass(transformation.getInputFormatClass());

        if (transformation.getOutputKeyClass() != null)
            job.setOutputKeyClass(transformation.getOutputKeyClass());

        if (transformation.getOutputValueClass() != null)
            job.setOutputValueClass(transformation.getOutputValueClass());

        if (transformation.getJarByClass() != null)
            job.setJarByClass(transformation.getJarByClass());

        return job;
    }

    // Describes the classes that make up one MapReduce transformation
    protected abstract class Transformation {
        public abstract Class<?> getJarByClass();
        public abstract Class<? extends Mapper> getMapperClass();
        public abstract Class<? extends Reducer> getCombinerClass();
        public abstract Class<? extends Reducer> getReducerClass();
        public abstract Class<?> getOutputKeyClass();
        public abstract Class<?> getOutputValueClass();
        public abstract Class<?> getMapOutputKeyClass();
        public abstract Class<?> getMapOutputValueClass();
        public abstract Class<? extends Partitioner> getPartitionerClass();
        public abstract Class<? extends WritableComparator> getSortComparatorClass();
        public abstract Class<? extends WritableComparator> getGroupingComparator();
        public abstract Class<? extends InputFormat<?,?>> getInputFormatClass();
        public abstract Class<? extends OutputFormat<?,?>> getOutputFormatClass();
    }
}
Then write the MyTransformationJob class and set its configuration:

public class MyTransformationJob extends BaseJob {

    private Job getJobConf(final Configuration conf) throws Exception {

        Transformation transformation = new Transformation() {
            @Override
            public Class<? extends Reducer> getCombinerClass() {
                return null;
            }

            @Override
            public Class<?> getJarByClass() {
                return MyTransformationJob.class;
            }

            @Override
            public Class<? extends Mapper> getMapperClass() {
                return MyMapper.class;
            }

            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }

            @Override
            public Class<?> getOutputValueClass() {
                return NullWritable.class;
            }

            @Override
            public Class<? extends Reducer> getReducerClass() {
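                // jobParams is assumed to be a field holding the job's command-line
                // arguments, populated elsewhere in this class (not shown in the snippet)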

                if(StringUtils.equals(jobParams[3], "header")){
                    return HeaderReducer.class;
                }
                return ValuesReducer.class;

            }

            @Override
            public Class<?> getMapOutputKeyClass() {
                return Text.class;
            }

            @Override
            public Class<?> getMapOutputValueClass() {
                return LinkedMapWritable.class;
            }

            @Override
            public Class<? extends Partitioner> getPartitionerClass() {
                return StationKeyPartitioner.class;
            }

            @Override
            public Class<? extends WritableComparator> getSortComparatorClass() {
                return StationKeySortComparator.class;
            }

            @Override
            public Class<? extends WritableComparator> getGroupingComparator() {
                return UniqueIdGroupingComparator.class;
            }

            @Override
            public Class<? extends InputFormat<?,?>> getInputFormatClass() {
                return KeyValueTextInputFormat.class;
            }

            @Override
            public Class<? extends OutputFormat<?,?>> getOutputFormatClass() {
                return null;
            }

        };

        return setupJob(transformation, conf);
    }
}

In this way you can define multiple jobs, each with its own configuration and classes.
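To tie this back to the original question, the per-job properties can be applied in the run() method before the job is submitted. The run() body below is a rough sketch of how MyTransformationJob might be completed, not part of the original answer, and the property values are placeholders:

    // Inside MyTransformationJob (Configured/Tool are inherited via BaseJob)
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Per-job configuration overrides, applied only to this job
        conf.setInt("mapred.reduce.tasks", 10);
        conf.setFloat("mapred.reduce.slowstart.completed.maps", 0.80f);

        Job job = getJobConf(conf);
        // ... set input/output paths from args here ...

        return job.waitForCompletion(true) ? 0 : 1;
    }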
Solution 2:
You can create a local configuration and specify the values you mentioned.
Sample test class:

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

public class ConfigurationTest {

  @Test
  public void test() throws IOException {

    // Load a job-specific configuration file from the test classpath
    Configuration conf = new Configuration();
    conf.addResource("hadoop-local.xml");

    assertThat(conf.get("mapred.reduce.tasks"), is("2"));
  }
}
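For this assertion to pass, a hadoop-local.xml file has to be on the test classpath. Its exact contents are not shown in the answer, but it would be a standard Hadoop configuration file along these lines:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>2</value>
  </property>
  <!-- the other per-job properties from the question can be added the same way -->
</configuration>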
