Java - passing a value to the reducer's cleanup function does not work

zaq34kh6 · posted 2021-05-27 in Hadoop · 3 answers · 328 views

I do not understand the output my MapReduce job is giving me. I have a .csv file as input which stores, for every district of a city, the age of each tree in that district.
In the combiner I try to get the oldest tree of each district, and in the reducer I try to retrieve the district that has the oldest tree in the city.
My problem is that while the reduce function gives me the output values 11, 12, 16 and 5, the cleanup function in the reducer, which should return the last of these values (5), actually returns 9 (which is the last value my reducer processed).
I do not understand what I am missing.
Here is what I have tried so far.
Mapper:

package com.opstty.mapper;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper_1_8_6 extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text result = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString(),";");
        int i = 0;
        while (itr.hasMoreTokens()) {
            String arrondissement = itr.nextToken();
            if(i%13==1 && !arrondissement.toString().equals("ARRONDISSEMENT")) {

                itr.nextToken();itr.nextToken();itr.nextToken();
                String annee = itr.nextToken();
                result.set(arrondissement);

                if(Double.parseDouble((String.valueOf(annee))) > 1000){
                    context.write(result, new IntWritable((int) Double.parseDouble((String.valueOf(annee)))));
                    i+=3;
                }
            }
            i++;
        }
    }
}

Combiner:

package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Compare extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List a = new ArrayList();
        int sum = 0;
        for (IntWritable val : values) {
            a.add(val.get());
        }
        Collections.sort(a);
        result.set((Integer) Collections.min(a));
        context.write(key, result);
    }
}

Reducer:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class IntReducer6 extends Reducer<Text, IntWritable, Text, NullWritable> {
    private int max = 100000;
    private int annee=0;
    int i =0;
    private  List a = new ArrayList();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        for (IntWritable value : values)
        {
            annee = value.get();
        }

        if(annee < max)
        {
            a.add(key);
            max = annee;
            context.write((Text) a.get(i), NullWritable.get());
            i++;
        }
    }

    @Override
    // only display the character which has the largest value
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write((Text) a.get(a.size()-1), NullWritable.get());
    }
}
jyztefdp · Answer 1

It works great, thank you!
The reason I used a combiner is that my teacher gave us this assignment:
Write a MapReduce job that displays the district where the oldest tree is located. The mapper must extract the age and the district of each tree. The problem is that this information cannot be used as the key and the value (why?). You need to define a subclass of Writable to contain both pieces of information. The reducer should merge all this data and output only the district.
I am completely new to map/reduce and I did not really understand what a subclass of Writable is, so I searched online and found some topics about combiners and WritableComparable.
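For reference, here is a minimal sketch of what such a Writable subclass could look like. The class and field names are purely illustrative (they are not part of the assignment); the key point is that the two fields are serialized and deserialized in the same order. To use it as a key instead of a value, it would additionally have to implement WritableComparable.

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical composite value holding a district name and one tree's age.
public class DistrictTreeWritable implements Writable {
    private String district = "";
    private int treeAge = 0;

    public DistrictTreeWritable() {}   // Hadoop requires a no-argument constructor

    public DistrictTreeWritable(String district, int treeAge) {
        this.district = district;
        this.treeAge = treeAge;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(district);   // serialize the fields in a fixed order...
        out.writeInt(treeAge);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        district = in.readUTF();  // ...and read them back in exactly the same order
        treeAge = in.readInt();
    }

    public String getDistrict() { return district; }

    public int getTreeAge() { return treeAge; }
}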

6yt4nkrj · Answer 2

Your approach of using two reduce methods (one as the combiner, one as the reducer) is a bit of an overkill for such a simple type of MapReduce job. To be honest, this kind of task does not really need a combiner at all, and you certainly cannot use the cleanup function of the reducer the way you do, because it is executed once for every reducer.
The main problem with your program is that it does not take into account how the reduce function actually runs: reduce is executed separately for every key. This means that for your type of job the reduce function should run only once, for all the "keys" (we will see how below), in order to find the district with the oldest tree.
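To illustrate why your cleanup output looks arbitrary, here is a simplified sketch of the reducer lifecycle (illustrative names, not your actual classes): the framework calls setup() once per reducer task, then reduce() once for every distinct key handled by that task, and finally cleanup() exactly once. A field updated inside reduce() therefore holds whatever the last processed key left behind by the time cleanup() runs.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Sketch of the Reducer lifecycle: per reducer task the framework effectively runs
//   setup(context);
//   for (every distinct key of this task) reduce(key, values, context);
//   cleanup(context);
public class LifecycleDemoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private int lastSeen = 0;   // instance state shared across all reduce() calls of the task

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values) {
            lastSeen = value.get();   // overwritten on every call, for every key
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // runs once, after ALL reduce() calls of this reducer task have finished
        context.write(new Text("last value seen"), new IntWritable(lastSeen));
    }
}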
With that in mind, the map function should arrange the data of every line of the input .csv file in such a way that the key of every key-value pair is the same (so that the reduce function operates on all the rows), while the value of every pair holds the district name and the age of one tree. So the mappers will generate key-value pairs where NULL is the key for all of them, and each value is a composite value storing a district name and a single tree's age, like this: <NULL, (district, tree_age)>

As for the reduce function, it simply has to scan all the values under the NULL key (i.e. all the pairs) and find the maximum tree age. The final output key-value pair then shows the district with the oldest tree and that tree's age, like this: <district_with_oldest_tree, max_tree_age>

To showcase my tested answer, I took the liberty of simplifying your program, mostly because the French(?) variable names confused me a bit, and because you stuck to strictly Hadoop-friendly data structures such as StringTokenizer, while recent Hadoop versions work fine with the more common Java data types.
First of all, since I have not seen your input .csv file, I created my own trees.csv, stored in a directory named trees, which contains the following rows with columns for the district and the tree age:

District A; 7
District B; 20
District C; 10
District C; 1
District B; 17
District A; 6
District A; 11
District B; 18
District C; 2

In my program (everything is put in one file for simplicity), the @ character is used as a delimiter to split the data inside the composite values generated by the mappers, and the results are stored in a directory named oldest_tree. You can adjust this to match your own .csv input file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class OldestTree
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <NULL, (district, tree_age)>
     */
    public static class Map extends Mapper<Object, Text, NullWritable, Text> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String row = value.toString();

            String[] columns = row.split("; ");     // split each row by the delimiter
            String district_name = columns[0];
            String tree_age = columns[1];

            // set NULL as key for the generated key-value pairs aimed at the reducers
            // and set the district with each of its trees age as a composite value,
            // with the '@' character as a delimiter
            context.write(NullWritable.get(), new Text(district_name + '@' + tree_age));
        }
    }

    /* input: <NULL, (district, tree_age)>
     * output: <district_with_oldest_tree, max_tree_age>
     */
    public static class Reduce extends Reducer<NullWritable, Text, Text, IntWritable>
    {
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            String district_with_oldest_tree = "";
            int max_tree_age = 0;

            // for all the values with the same (NULL) key,
            // aka all the key-value pairs...
            for(Text value : values)
            {
                // split the composite value by the '@' delimiter
                String[] splitted_values = value.toString().split("@");
                String district_name = splitted_values[0];
                int tree_age = Integer.parseInt(splitted_values[1]);

                // find the district with the oldest tree
                if(tree_age > max_tree_age)
                {
                    district_with_oldest_tree = district_name;
                    max_tree_age = tree_age;
                }
            }

            // output the district (key) with the oldest tree's year of planting (value)
            // to the output directory
            context.write(new Text(district_with_oldest_tree), new IntWritable(max_tree_age));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("trees");
        Path output_dir = new Path("oldest_tree");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job oldesttree_job = Job.getInstance(conf, "Oldest Tree");
        oldesttree_job.setJarByClass(OldestTree.class);
        oldesttree_job.setMapperClass(Map.class);
        oldesttree_job.setReducerClass(Reduce.class);    
        oldesttree_job.setMapOutputKeyClass(NullWritable.class);
        oldesttree_job.setMapOutputValueClass(Text.class);
        oldesttree_job.setOutputKeyClass(Text.class);
        oldesttree_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(oldesttree_job, input_dir);
        FileOutputFormat.setOutputPath(oldesttree_job, output_dir);
        oldesttree_job.waitForCompletion(true);
    }
}

So the result of the program, stored in the oldest_tree directory (as seen through the Hadoop HDFS browser), is:
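(The original post showed the output as a screenshot. With the sample trees.csv above, the single line written to the output directory should be the district with the oldest tree and that tree's age:)

District B	20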

x9ybnkn6 · Answer 3

Thank you for your help @. Of course, here is my mapper:

package com.opstty.mapper;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper_1_8_6 extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text result = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString(),";");
        int i = 0;
        while (itr.hasMoreTokens()) {
            String arrondissement = itr.nextToken();
            if(i%13==1 && !arrondissement.toString().equals("ARRONDISSEMENT")) {

                itr.nextToken();itr.nextToken();itr.nextToken();
                String annee = itr.nextToken();
                result.set(arrondissement);

                if(Double.parseDouble((String.valueOf(annee))) > 1000){
                    context.write(result, new IntWritable((int) Double.parseDouble((String.valueOf(annee)))));
                    i+=3;
                }
            }
            i++;
        }
    }
}

And here is my combiner:

package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Compare extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List a = new ArrayList();
        int sum = 0;
        for (IntWritable val : values) {
            a.add(val.get());
        }
        Collections.sort(a);
        result.set((Integer) Collections.min(a));
        context.write(key, result);
    }
}

The goal: I have a csv file. In the mapper I get, for every tree in the city, its district and its age. In my combiner I get the oldest tree per district, and in my reducer I want to print the district that actually has the oldest tree in the whole city.
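For completeness, here is a minimal sketch of what such a reducer could look like on top of the mapper and combiner above. It assumes the values are planting years (so the oldest tree has the smallest year) and that a single reducer task is used, so the global minimum can be tracked in instance fields across reduce() calls and written exactly once in cleanup(). The class name is illustrative.

package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Hypothetical reducer sketch: each reduce() call sees one district (key) and the
// per-district minimum years produced by the combiner. The district holding the
// overall smallest year is remembered and emitted once, after all reduce() calls.
public class OldestDistrictReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    private int minYear = Integer.MAX_VALUE;
    private Text oldestDistrict = new Text();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values) {
            if (value.get() < minYear) {
                minYear = value.get();
                oldestDistrict.set(key);   // copy the key; Hadoop reuses the Text object
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // runs once per reducer task, after all districts have been processed
        if (minYear != Integer.MAX_VALUE) {
            context.write(oldestDistrict, NullWritable.get());
        }
    }
}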
