ApacheHadoop没有做到这一点,而是在我的程序中合并并减少了它应该做的工作

v7pvogib  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(279)

我是apachehadoop的初学者,尝试过apache的单词计数程序,效果很好。但现在我想做我自己的室外温度程序,计算每日平均温度。平均计算不如我预期的那样有效;不进行数据合并和平均。
更具体地说,这里是sample2.txt输入文件的一部分:

25022016 00:00:00 -10.3
25022016 00:01:00 -10.3
25022016 00:02:00 -10.3
25022016 00:03:00 -10.3
...
25022016 00:59:00 -11.2

我想要的结果应该是:

25022016 7.9

这是当天所有温度观测值的平均值。所以我有60个观察结果,想要一个平均值。今后我想用同一个程序在更多的日子里处理更多的观察结果。1列为日期(文本),2。时间和第三个是温度。温度计算是在java的float数据类型代码中完成的。
现在的结果是:

25022016    -10.3
25022016    -10.3
25022016    -10.3
25022016    -10.3
...
25022016    -11.2

因此,每一个观测值的平均值被计算出来(从一个数字中计算出一个数字的平均值)。我想要60次观察的平均值(一个数字)!
所以我的输入和输出文件在上面。我的java代码(在windows 7->virtualbox->ubuntu 64位上运行)如下:

package hadoop; 

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.commons.cli.Options;

public class ProcessUnits2 
{ 
    public static class E_EMapper extends
    Mapper<Object, Text, Text, FloatWritable>
    { 
        private FloatWritable temperature = new FloatWritable();
        private Text date = new Text();       

        public void map(Object key, Text value, 
        Context context) throws IOException, InterruptedException 
        { 
            StringTokenizer dateTimeTemperatures = new StringTokenizer(value.toString());

            while(dateTimeTemperatures.hasMoreTokens()) {
                date.set(dateTimeTemperatures.nextToken());

                while(dateTimeTemperatures.hasMoreTokens()) {
                    dateTimeTemperatures.nextToken();    
                    temperature.set(Float.parseFloat(dateTimeTemperatures.nextToken()));

                    context.write(date, temperature);
                }
            }
        } 
    } 

    public static class E_EReduce extends Reducer<Text,Text,Text,FloatWritable>
    {
        private FloatWritable result = new FloatWritable();

        public void reduce( Text key, Iterable<FloatWritable> values, Context context
        ) throws IOException, InterruptedException 
        { 
            float sumTemperatures=0, averageTemperature;
            int countTemperatures=0;

            for (FloatWritable val : values) {
                sumTemperatures += val.get();
                countTemperatures++;
            } 

            averageTemperature = sumTemperatures / countTemperatures;

            result.set(averageTemperature);
            context.write(key, result);

        } 
    }  

    public static void main(String args[])throws Exception 
    { 
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "VuorokaudenKeskilampotila");
        job.setJarByClass(ProcessUnits2.class);

        job.setMapperClass(E_EMapper.class);
        job.setCombinerClass(E_EReduce.class);
        job.setReducerClass(E_EReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } 
} 
---------------------------------------------------

hadoop版本是2.7.2和ubuntu 14.04 lts。我在独立模式下运行hadoop(最基本的设置)。
以下是我用来构建程序的命令(如果有用的话):

rm -rf output2 
javac -Xdiags:verbose -classpath hadoop-core-1.2.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d units2 ProcessUnits2.java
jar -cvf units2.jar -C units2/ .
hadoop jar units2.jar   hadoop.ProcessUnits2 input2 output2
cat output2/part-m-00000

作为一个初学者,我很困惑,在我看来hadoop并没有在它的默认设置中做任何合并和减少(=平均值)的工作,这应该是它的最终目的。我承认,我从这里和那里(例子)选择代码,因为没有任何工作,我相信这只是一个小步骤,以解决问题,但我不能猜测它是什么。我可以很容易地用c++做这个例子,而不需要任何Map缩减框架,但问题是我希望基本的操作能够正常工作,这样我就可以继续使用更复杂的例子,并在最终的产品使用和真正的分布式Map相结合的缩减。
我会非常感激任何帮助。我现在困在这里面了(好几个小时…)。如果您需要任何额外的数据来帮助找到解决方案,我会发送给他们。

vwhgwdsa

vwhgwdsa1#

您没有正确实现减速机。应该是:

public static class E_EReduce extends Reducer<Text, FloatWritable, Text, FloatWritable>
{
    @Override
    public void reduce( Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException 
    {

永远不要忘记 @Override ,否则编译器将无法捕获错误。

ikfrs5lh

ikfrs5lh2#

现在我注意到了问题所在:
生产线:

job.setNumReduceTasks(0);

说没有还原剂。我把它改成了 job.setNumReduceTasks(1); 甚至完全删除了它,现在程序运行了。为什么会在那里?=>因为遇到麻烦,你想尽一切办法,却没有时间看文件。
谢谢所有参与的人。我继续研究这个系统。

相关问题