Binning pattern in Hadoop MapReduce

ss2ws0br · posted 2021-05-30 in Hadoop

I am fairly new to Hadoop MapReduce concepts. I tried to implement the binning pattern with MapReduce, but I am not getting the expected output.
Here is my BinningMapper code:

import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class BinningMapper extends
        Mapper<Object, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    protected void setup(Context context) {
        // Create a new MultipleOutputs using the context object
        mos = new MultipleOutputs(context);
    }

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
        String rawtags = parsed.get("Tags");
        // Tags are delimited by ><, i.e. <tag1><tag2><tag3>
        String[] tagTokens = StringEscapeUtils.unescapeHtml(rawtags).split("><");
        // For each tag
        for (String tag : tagTokens) {
            // Remove any > or < from the token
            String groomed = tag.replaceAll(">|<", "").toLowerCase();
            // If this tag is one of the following, write to the named bin
            if (groomed.equalsIgnoreCase("html")) {
                mos.write("bins", value, NullWritable.get(), "html");
            }
            if (groomed.equalsIgnoreCase("div")) {
                mos.write("bins", value, NullWritable.get(), "div");
            }
            if (groomed.equalsIgnoreCase("body")) {
                mos.write("bins", value, NullWritable.get(), "body");
            }
            if (groomed.equalsIgnoreCase("head")) {
                mos.write("bins", value, NullWritable.get(), "head");
            }
        }
        // Get the body of the post
        String post = parsed.get("Body");
        // If the post contains the word "hadoop", write it to its own bin
        if (post.toLowerCase().contains("hadoop")) {
            mos.write("bins", value, NullWritable.get(), "hadoop-post");
        }
    }

    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Close multiple outputs!
        mos.close();
    }
}
Here is my driver code:

public class BinningDriver {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "JobName");
    job.setJarByClass(eng.nyu.cs.BinningDriver.class);
    job.setMapperClass(eng.nyu.cs.BinningMapper.class);

    // TODO: specify a reducer
    job.setReducerClass(Reducer.class);

    // TODO: specify output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Configure the MultipleOutputs by adding an output called "bins"
    // With the proper output format and mapper key/value pairs
    //MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,
    //Text.class, NullWritable.class);
    // Enable the counters for the job
    // If there are a significant number of different named outputs, this
    // should be disabled
    MultipleOutputs.setCountersEnabled(job, true);
    // Map-only job
    job.setNumReduceTasks(0);

    // TODO: specify input and output DIRECTORIES (not files)
    FileInputFormat.setInputPaths(job, new Path("src"));
    FileOutputFormat.setOutputPath(job, new Path("out"));

    if (!job.waitForCompletion(true))
        return;
}

}
Here is my MRDPUtils class:

public class MRDPUtils {

public static final String[] REDIS_INSTANCES = { "p0", "p1", "p2", "p3",
        "p4", "p6" };

// This helper function parses the stackoverflow into a Map for us.
public static Map<String, String> transformXmlToMap(String xml) {
    Map<String, String> map = new HashMap<String, String>();
    try {
        String[] tokens = xml.trim().substring(5, xml.trim().length() - 3)
                .split("\"");

        for (int i = 0; i < tokens.length - 1; i += 2) {
            String key = tokens[i].trim();
            String val = tokens[i + 1];

            map.put(key.substring(0, key.length() - 1), val);
        }
    } catch (StringIndexOutOfBoundsException e) {
        System.err.println(xml);
    }

    return map;
}

}

Answer 1 (pprl5pva):

job.setReducerClass(Reducer.class);

There is no Reducer class in your code.

job.setJarByClass(eng.nyu.cs.BinningDriver.class);

This should be a reference to the jar containing your Mapper, not to the driver jar. The MRDPUtils class function is also not correct.
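
For reference, a minimal sketch of how the driver could be wired up along these lines. It is an illustration rather than a verified fix: it drops the setReducerClass call entirely (the job is map-only), points setJarByClass at the Mapper class as suggested above, and un-comments the MultipleOutputs.addNamedOutput registration for the "bins" output that the mapper's mos.write("bins", ...) calls depend on. The output value class is assumed to be NullWritable to match what the mapper emits.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class BinningDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Binning");

        // The job jar is located via this class; pointing it at the Mapper
        // (as suggested above) works as long as that class is packaged in
        // the jar that gets submitted.
        job.setJarByClass(BinningMapper.class);
        job.setMapperClass(BinningMapper.class);

        // Binning is a map-only pattern: no reducer class is set at all,
        // and the reduce phase is switched off explicitly.
        job.setNumReduceTasks(0);

        // Output types match what the mapper writes.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the named output "bins" used by mos.write("bins", ...)
        // in the mapper; this call is commented out in the original driver.
        MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,
                Text.class, NullWritable.class);
        MultipleOutputs.setCountersEnabled(job, true);

        // Input and output directories as in the original driver.
        FileInputFormat.setInputPaths(job, new Path("src"));
        FileOutputFormat.setOutputPath(job, new Path("out"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}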
