我对hadoop的mapreduce概念还比较陌生,我尝试用mapreduce实现binning模式,但是没有得到想要的输出。
这是我的宾宁Mapcode:-
public class BinningMapper extends
Mapper<Object, Text, Text, NullWritable> {
private MultipleOutputs<Text, NullWritable> mos = null;
protected void setup(Context context) {
// Create a new MultipleOutputs using the context object
mos = new MultipleOutputs(context);
}
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
.toString());
String rawtags = parsed.get("Tags");
// Tags are delimited by ><. i.e. <tag1><tag2><tag3>
String[] tagTokens = StringEscapeUtils.unescapeHtml(rawtags).split(
"><");
// For each tag
for (String tag : tagTokens) {
// Remove any > or < from the token
String groomed = tag.replaceAll(">|<", "").toLowerCase();
// If this tag is one of the following, write to the named bin
if (groomed.equalsIgnoreCase("html")) {
mos.write("bins", value, NullWritable.get(), "html");
}
if (groomed.equalsIgnoreCase("div")) {
mos.write("bins", value, NullWritable.get(), "div");
}
if (groomed.equalsIgnoreCase("body")) {
mos.write("bins", value, NullWritable.get(), "body");
}
if (groomed.equalsIgnoreCase("head")) {
mos.write("bins", value, NullWritable.get(), "head");
}
}
// Get the body of the post
String post = parsed.get("Body");
// If the post contains the word "hadoop", write it to its own bin
if (post.toLowerCase().contains("hadoop")) {
mos.write("bins", value, NullWritable.get(), "hadoop-post");
}
}
protected void cleanup(Context context) throws IOException,
InterruptedException {
// Close multiple outputs!
mos.close();
}
}
这是我的车夫code:-
公共类binningdriver{
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(eng.nyu.cs.BinningDriver.class);
job.setMapperClass(eng.nyu.cs.BinningMapper.class);
// TODO: specify a reducer
job.setReducerClass(Reducer.class);
// TODO: specify output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Configure the MultipleOutputs by adding an output called "bins"
// With the proper output format and mapper key/value pairs
//MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,
//Text.class, NullWritable.class);
// Enable the counters for the job
// If there are a significant number of different named outputs, this
// should be disabled
MultipleOutputs.setCountersEnabled(job, true);
// Map-only job
job.setNumReduceTasks(0);
// TODO: specify input and output DIRECTORIES (not files)
FileInputFormat.setInputPaths(job, new Path("src"));
FileOutputFormat.setOutputPath(job, new Path("out"));
if (!job.waitForCompletion(true))
return;
}
}
这是我的mrdputilsclass:-
公共类mrdputils{
public static final String[] REDIS_INSTANCES = { "p0", "p1", "p2", "p3",
"p4", "p6" };
// This helper function parses the stackoverflow into a Map for us.
public static Map<String, String> transformXmlToMap(String xml) {
Map<String, String> map = new HashMap<String, String>();
try {
String[] tokens = xml.trim().substring(5, xml.trim().length() - 3)
.split("\"");
for (int i = 0; i < tokens.length - 1; i += 2) {
String key = tokens[i].trim();
String val = tokens[i + 1];
map.put(key.substring(0, key.length() - 1), val);
}
} catch (StringIndexOutOfBoundsException e) {
System.err.println(xml);
}
return map;
}
}
1条答案
按热度按时间pprl5pva1#
代码中没有reducer类
和
这应该是来自驱动程序jar的Map程序jar的引用,而不是驱动程序jar的引用
MRDPUtils
类函数也不正确。