partitioner似乎不能在单节点上工作?

ruoxqz4g  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(384)

我已经编写了map reduce代码以及自定义分区。自定义分区使用某些条件对键进行排序。我在驱动程序类中设置了setnumreducetasks=6。但是我在我的单机上测试这段代码,我只得到一个reducer输出文件,没有6个reducer文件。分区器不能在单机上工作吗?是否需要多节点集群来观察自定义分区器的效果?对此有任何见解都将不胜感激。

kse8i1jr

kse8i1jr1#

我在一台机器上有一个两节点的集群。我就是这么做的。从那里你可以看到我这样做(在执行时):
指定减速器的数量,例如两个

-D mapred.reduce.tasks=2
am46iovg

am46iovg2#

当您将reducer的no设置为大于1时,partitioner总是工作的,即使它是一个单节点集群。
我已经在单节点集群上测试了以下代码,它可以正常工作:

public final class SortMapReduce extends Configured implements Tool {

public static void main(final String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new SortMapReduce(), args);
    System.exit(res);
}

public int run(final String[] args) throws Exception {

    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Configuration conf = super.getConf();

    Job job = Job.getInstance(conf);

    job.setJarByClass(SortMapReduce.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    job.setMapOutputKeyClass(Person.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setPartitionerClass(PersonNamePartitioner.class);

    job.setNumReduceTasks(5);

    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    if (job.waitForCompletion(true)) {
        return 0;
    }
    return 1;
}

public static class Map extends Mapper<Text, Text, Person, Text> {

    private Person outputKey = new Person();

    @Override
    protected void map(Text pointID, Text firstName, Context context) throws IOException, InterruptedException {
        outputKey.set(pointID.toString(), firstName.toString());
        context.write(outputKey, firstName);
    }
}

public static class Reduce extends Reducer<Person, Text, Text, Text> {

    Text pointID = new Text();

    @Override
    public void reduce(Person key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        pointID.set(key.getpointID());
        for (Text firstName : values) {
            context.write(pointID, firstName);
        }
    }
}

}
分区器类:

public class PersonNamePartitioner extends Partitioner<Person, Text> {

@Override
public int getPartition(Person key, Text value, int numPartitions) {

    return Math.abs(key.getpointID().hashCode() * 127) % numPartitions;
}

}
运行命令:
hadoop jar/home/hdfs/secondarysort.jar org.test.sortmapreduce/demo/data/customer/acct.txt/demo/data/customer/output2
谢谢,

e5njpo68

e5njpo683#

好好看看你的自定义分区器。它可以为传递给它的所有键返回相同的分区值。
在这种情况下,它是一个低效的分区器,它将所有键发送到同一个缩减器。因此,即使您将reducer的数量设置为6,也只有一个reducer具有所有键值,其余5个reducer将没有任何要处理的内容。
因此,您将拥有处理所有记录的唯一减速机的输出。
分区器不能在一台机器上工作吗?分区器也可以在单机伪集群中工作。
是否需要多节点集群来查看自定义分区器的效果?不。

相关问题