mapreduce,带2个键

ecr0jaav  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(342)

我只是在学习Map缩小的工作。我做了一件事,我的作业,我必须改变我的代码接受另一个文本文件作为输入和输出必须显示与最大,最小,和平均金额年的位置。这是我输入的一行的示例: Calgary,AB,2009-01-07,604680,12694,2.5207754,0.065721168,0.025668362,0.972051954,0.037000279,0.022319018,,,0.003641149,,,0.002936745,,,0.016723641 输出应该是这样的: Calgary 2009 Average is: Max: Min: 这是我的代码,它给出了txt文件并计算平均值,最小值和最大值:

public class AverageMinMax {

public static class Map extends Mapper<LongWritable,Date,Text,Text> {

    //private static final FloatWritable rep= new  FloatWritable(1);
        public void map(LongWritable key,Text value,Context context)
        throws IOException, InterruptedException {
                context.write(new Text("Map_Output"), value);
        };
    }
      public static class Combiner extends Reducer<Text,Text,Text,Text>
      {
      public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
          {
             Integer NumberOfValues=0;
             double sum=0D;
             double min=0D;
             double max=0D;
             //double min=values.get(0);
              Iterator<Text> itr = values.iterator();
              //convertString=values(0);
              while(itr.hasNext())
              {
                  String TexttoString = itr.next().toString();
                  Double value = Double.parseDouble(TexttoString);
                  if(value<min)
                  {
                      min=value;
                  }
                  if(value>max)
                  {
                      max=value;
                  }
                  NumberOfValues++;
                  sum+=value;
              }
               Double average = sum/NumberOfValues;
                context.write(new Text("Combiner_output"), new Text(average + "," + NumberOfValues+","+min+","+max));
          };
      }
 public static class Reduce extends
       Reducer<Text,Text,Text,Text> {
      public void reduce(Text key, Iterable<Text> values,
        Context context) throws IOException, InterruptedException {
           Integer totalNumberOfValues= 0;
          Double sum=0.00;
          Double min=0D;
          Double max=0D;
          Iterator<Text> itr = values.iterator();
            while(itr.hasNext())
          {
              String TexttoString = itr.next().toString();
              String[] split_String = TexttoString.split(",");
              Double average = Double.parseDouble(split_String[0]);
              Integer NumberOfValues = Integer.parseInt(split_String[1]);
              Double minValue=Double.parseDouble(split_String[2]);
              Double maxValue=Double.parseDouble(split_String[3]);
              if(minValue<min)
              {
                  min=minValue;
              }
              if(maxValue>max)
              {
                  max=maxValue;
              }
              sum+=(average*NumberOfValues);
              totalNumberOfValues+=NumberOfValues;   
          } 
          Double average= sum/totalNumberOfValues;
          context.write(new Text("Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));
          };
     }
     public static void main(String[] args) throws Exception {

         Configuration conf = new Configuration();
         Job job=new Job(conf,"AverageMinMax.class");
         job.setJarByClass(AverageMinMax.class);
         job.setJobName("MapReduceAssignment");
         //JobConf conf = new JobConf(Hadoop_map_reduce.class);

        //conf.setJobName("Hadoop_assignment");
         // Configuration conf = new Configuration();
      //Job job = new Job(conf, "maxmin");
      //job.setJarByClass(Hadoop_map_reduce.class);
     // FileSystem fs = FileSystem.get(conf);
    /*  if (fs.exists(new Path(args[1]))) {
       fs.delete(new Path(args[1]), true);
      }*/
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);

         //job.setNumReduceTasks(1);

         job.setMapperClass(Map.class);

        job.setReducerClass(Reduce.class);
         job.setCombinerClass(Combiner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

     FileInputFormat.addInputPath(job, new Path(args[0]));
    //  FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //FileInputFormat.addInputPath(job, new Path("/home/cloudera/Desktop/assign2"));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
      //  FileOutputFormat.setOutputPath(job, new Path(" user/cloudera/output"));
      job.waitForCompletion(true);
     }

}
所以,我的第一个问题是我不知道如何在Map器中转换日期,以及如何找到2个键并在输出中显示。我是说怎么重写这段代码!
我很感激你的帮助

lmvvr0a8

lmvvr0a81#

你的问题不完全清楚。因此,我的假设如下:
您有一个数据集合,其中显示了位置、日期和一些要处理的双精度值
要处理的值从第一个双精度值(即2.5207754,…)开始。
平均值是每年整个观察的所有列的平均值(i、 e如果2009年有5个样本,每个样本有5个值,则需要25个值的平均值)。
您的最小值和最大值是各年份的整个观测值的最小值和最大值。
如果假设是正确的,我建议您使用jeremy lin教授的定制数据类型。可能的解决办法如下:
你的钥匙将地点和年份组合成文字。

String line = value.toString();
String[] tokens = line.split(",");
String[] date = tokens[2].split("-");
String year = date[0];
String location = tokens[0];

Text locationYear = new Text(location + " " + year);

然后,您的值将是一个arraylistofdoubleswritable,您可以从上面提到的回购中使用它。

ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable()
for(int i = 5; i < tokens.length(); i++)
{
  readings.add(Double.parseDouble(tokens[i]));
}

然后可以将Map器输出作为文本和arraylistofdoubleswritable发出。

context.write(locationYear, readings);

从这里开始,您可以使用array list的collections方法,通过计算(average、min、max)来操纵reducer中的Map器输出。
我希望这有帮助。

sulc1iza

sulc1iza2#

好吧,看来你有很多问题。我马上想到两个问题:
Map器输出键为 'Combiner_Output' . 这行不通。你想要这把钥匙是什么,可能是城市的名字。所以在你的例子中,“卡尔加里”。这很容易使用 value.toString().split(',')[0] (即从拆分后形成的列表中获取第一个元素 value, 字符)。
你的代码根本没有输出城市名称。这是通过做 context.write(new Text(key.toString() + " Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString())); 在你的减速机里,哪里 key 是从上一点开始的城市名称。
关于如何从字符串中提取日期,在java中,请查看以下文章:从字符串中提取日期
一般来说,我建议您从什么是mapreduce开始,它的设计权衡,以及如何在hadoop架构的范围内充分利用它。

相关问题