hadoop:redcuer没有发出正确的claculation

qybjjes1  于 2021-07-13  发布在  Hadoop
关注(0)|答案(1)|浏览(403)

我有下面的reducer类(mapreduce作业的一部分),它应该计算 score = POS /(-1*sum(NEGs)) .
哪里 POS 是一个正数,并且 NEGs 是两个负数。总是这样。
例如,如果来自Map器的输入是:

<A, A>  -15.0
<A, A>  2.0
<A, A>  -15.0

预期产出将是:

<A, A>  0.06666666666666667

但是,它正在输出 infinity 对于每个输出记录!

<A, A>  Infinity

调试时,如果我添加语句以在while循环中发出值:

score.set(val);
context.write(key, score);

,它可以很好地打印结果,但会重复分割。所以我得到了以下结论:

<A, A>  -15.0
<A, A>  2.0
<A, A>  -15.0
<A, A>  0.06666666666666667   # correct calculation (2/30)
<A, A>  0.0022222222222222222 # Not sure why it divids twice by 30 (2/30/30)!!

这是 MyReducer

private static class MyReducer extends
        Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
    private DoubleWritable score = new DoubleWritable();
    int counter = 0;

    @Override
    public void reduce(Pair key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        Iterator<DoubleWritable> iter = values.iterator();
        double nor = 0.0;
        double don = 0.0;

        double val;
        while (iter.hasNext()) {
            val = iter.next().get();
            if (val < 0)
                don += val*-1;
            else
                nor = val;
            //uncomment for debugging!
            //score.set(val);
            //context.write(key, score);
        }

        score.set(nor / don);
        context.write(key, score);
    }

有人能解释为什么吗
如果我在while循环中没有发射任何东西,则发射无穷大
除以分母两次?
谢谢!

7lrncoxx

7lrncoxx1#

当然,double在java中表现的滑稽绝非罕见,但在这个特殊的例子中,double的方式并不奇怪,因为它们在hadoop中的兼容性如何。
首先也是最重要的是,这种类型的reduce计算非常关键,只能在作业的reduce阶段使用,而不能在combine阶段(如果有)使用。如果您已将此reduce计算设置为也作为组合器实现,则可以考虑取消设置此设置。这并不是一个经验法则,但是在mapreduce作业中有很多错误,人们不能很好地理解为什么Reducer会得到奇怪的数据或者连续执行两次计算(就像你指出的那样)。
然而,这个问题的罪魁祸首可能是,为了获得安全的双类型划分,您确实需要使用类型转换来获得正确的双类型结果。
为了展示这一点,我使用了一个基于输入数据并存储在 \input 目录。每个唯一键都有一个正数和两个负数作为值(这里的键设置为 String 为了简单起见),如下所示:

Α -15.0
Α 2.0
Α -15.0
Β -10.0
Β 9.0
Β -12.0
C -7.0
C 1.0
C -19.0
D -5.0
D 18.0
D -5.0
E -6.0
E 6.0
E -6.0

然后使用显式类型转换来计算每个分数,如下面的代码所示:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class ScoreComp 
{
    /* input:  <Character, Number>
     * output: <Character, Number>
     */
    public static class Map extends Mapper<Object, Text, Text, DoubleWritable> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String record = value.toString();
            String[] parts = record.split(" "); // just split the lines into key and value

            // create key-value pairs from each line
            context.write(new Text(parts[0]), new DoubleWritable(Double.parseDouble(parts[1])));
        }
    }

    /* input:  <Character, Number>
     * output: <Character, Score>
     */
    public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
    {
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException 
        {
            double pos = 0.0;
            double neg = 0.0;

            // for every value of a unique key...
            for(DoubleWritable value : values)
            {
                // retrieve the positive number and calculate the sum of the two negative numbers
                if(value.get() < 0)
                    neg += value.get();
                else
                    pos = value.get();
            }

            // calculate the score based on the values of each key (using explicit type casting)
            double result = (double) pos / (-1 * neg);

            // create key-value pairs for each key with its score
            context.write(key, new DoubleWritable(result));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("input");
        Path output_dir = new Path("scores");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job scorecomp_job = Job.getInstance(conf, "Score Computation");
        scorecomp_job.setJarByClass(ScoreComp.class);
        scorecomp_job.setMapperClass(Map.class);
        scorecomp_job.setReducerClass(Reduce.class);    
        scorecomp_job.setMapOutputKeyClass(Text.class);
        scorecomp_job.setMapOutputValueClass(DoubleWritable.class);
        scorecomp_job.setOutputKeyClass(Text.class);
        scorecomp_job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(scorecomp_job, input_dir);
        FileOutputFormat.setOutputPath(scorecomp_job, output_dir);
        scorecomp_job.waitForCompletion(true);
    }
}

您可以在 /scores 目录在数学上是有意义的(通过hdfs浏览器截图):

相关问题