Over the past few days I have been teaching myself Hadoop and trying to implement a basic BFS algorithm based on the information given on this web page. I had to make some modifications and additions to get the code to compile. At runtime I get the following error, and even after hours of debugging I haven't been able to resolve it. Can someone help me?
Error:
15/05/11 03:04:20 WARN mapred.LocalJobRunner: job_local934121164_0001
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/11 03:04:21 INFO mapreduce.Job: Job job_local934121164_0001 running in uber mode : false
15/05/11 03:04:21 INFO mapreduce.Job: map 0% reduce 0%
This shouldn't be happening, since both the mapper and the reducer use the same key and value types, as you can see below. The only explanation I can think of is that my Mapper class isn't being used at all and the default Mapper class (which emits LongWritable keys) is being used instead. I don't know what I'm doing wrong.
SearchMapper.java
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;

public class SearchMapper extends Mapper<Object, Text, Text, Text> {

    // Types of the input key, input value and the Context object through which
    // the Mapper communicates with the Hadoop framework
    public void map(Object key, Text value, Context context, Node inNode)
            throws IOException, InterruptedException {
        // For each GRAY node, emit each of the adjacent nodes as a new node
        // (also GRAY). If the adjacent node is already processed and colored
        // BLACK, the reducer retains the color BLACK.
        // Note that the mapper does not differentiate between BLACK, GRAY and WHITE.
        if (inNode.getColor() == Node.Color.GRAY) {
            for (String neighbor : inNode.getEdges()) {
                Node adjacentNode = new Node();
                // Remember that the current node only has the id of its
                // neighbour, and not the object itself. Therefore at this
                // stage there is no way of knowing and assigning any of its
                // other properties. Also remember that the reducer is doing
                // the 'clean up' task and not the mapper.
                adjacentNode.setId(neighbor);
                adjacentNode.setDistance(inNode.getDistance() + 1);
                adjacentNode.setColor(Node.Color.GRAY);
                adjacentNode.setParent(inNode.getId());
                context.write(new Text(adjacentNode.getId()), adjacentNode.getNodeInfo()); // getNodeInfo() returns a Text object
            }
            inNode.setColor(Node.Color.BLACK);
        }
        // Emit the input node as well, otherwise the BLACK color change
        // (if it happens) won't be persistent.
        context.write(new Text(inNode.getId()), inNode.getNodeInfo());
    }
}
SearchReducer.java
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class SearchReducer extends Reducer<Text, Text, Text, Text> {

    // Types of the input key, the values associated with the key, the Context object
    // for the Reducer's communication with the Hadoop framework, and the node whose
    // information has to be output; the return type is a Node
    public Node reduce(Text key, Iterable<Text> values, Context context, Node outNode)
            throws IOException, InterruptedException {
        // set the node id as the key
        outNode.setId(key.toString());
        // TODO : (huh?) Since the values are of type Iterable, iterate through the
        // values associated with the key, i.e. all the values corresponding to a
        // particular node id
        for (Text value : values) {
            Node inNode = new Node(key.toString() + "\t" + value.toString());
            // Emit one node after combining all the mapper outputs.
            // Only one node (the original) will have a non-null adjacency list.
            if (inNode.getEdges().size() > 0) {
                outNode.setEdges(inNode.getEdges());
            }
            // Save the minimum distance and parent
            if (inNode.getDistance() < outNode.getDistance()) {
                outNode.setDistance(inNode.getDistance());
                outNode.setParent(inNode.getParent());
            }
            // Save the darkest color
            if (inNode.getColor().ordinal() > outNode.getColor().ordinal()) {
                outNode.setColor(inNode.getColor());
            }
        }
        context.write(key, new Text(outNode.getNodeInfo()));
        return outNode;
    }
}
BaseJob.java (a generic class mentioned on the website that basically sets up the job)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public abstract class BaseJob extends Configured implements Tool {

    protected Job setupJob(String jobName, JobInfo jobInfo) throws Exception {
        Job job = new Job(new Configuration(), jobName);
        job.setJarByClass(jobInfo.getJarByClass());
        job.setMapperClass(jobInfo.getMapperClass());
        if (jobInfo.getCombinerClass() != null)
            job.setCombinerClass(jobInfo.getCombinerClass());
        job.setReducerClass(jobInfo.getReducerClass());
        // TODO : set number of reducers as required
        job.setNumReduceTasks(3);
        job.setOutputKeyClass(jobInfo.getOutputKeyClass());
        job.setOutputValueClass(jobInfo.getOutputValueClass());
        /*
        job.setJarByClass(SSSPJob.class);
        job.setMapperClass(SearchMapper.class);
        job.setReducerClass(SearchReducer.class);
        job.setNumReduceTasks(3);
        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        */
        return job;
    }

    // Abstract class describing the job configuration (the JobInfo object)
    protected abstract class JobInfo {
        public abstract Class<?> getJarByClass();
        public abstract Class<? extends Mapper> getMapperClass();
        public abstract Class<? extends Reducer> getCombinerClass();
        public abstract Class<? extends Reducer> getReducerClass();
        public abstract Class<?> getOutputKeyClass();
        public abstract Class<?> getOutputValueClass();
    }
}
SSSPJob.java (the driver)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class SSSPJob extends BaseJob {

    // method to set the configuration for the job and the mapper and the reducer classes
    private Job getJobConf(String[] args) throws Exception {
        // Defining the abstract class objects
        JobInfo jobInfo = new JobInfo() {
            @Override
            public Class<? extends Reducer> getCombinerClass() {
                return null;
            }

            @Override
            public Class<?> getJarByClass() {
                return SSSPJob.class;
            }

            @Override
            public Class<? extends Mapper> getMapperClass() {
                return SearchMapper.class;
            }

            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }

            @Override
            public Class<?> getOutputValueClass() {
                return Text.class;
            }

            @Override
            public Class<? extends Reducer> getReducerClass() {
                return SearchReducer.class;
            }
        };
        return setupJob("ssspjob", jobInfo);
    }

    // the driver to execute the job and invoke the map/reduce functions
    public int run(String[] args) throws Exception {
        int iterationCount = 0;
        Job job;
        // Number of GRAY nodes
        long terminationValue = 1;
        while (terminationValue > 0) {
            job = getJobConf(args);
            String input, output;
            // Setting the input file and output file for each iteration.
            // The first time, the user-specified file is the input, whereas for
            // the subsequent iterations the output of the previous iteration is
            // the input.
            // NOTE: Please be clear about how the input and output files are set
            // before proceeding.
            // for the first iteration the input will be the first input argument
            if (iterationCount == 0)
                input = args[0];
            else
                // for the remaining iterations, the input will be the output of the previous iteration
                input = args[1] + iterationCount;
            output = args[1] + (iterationCount + 1);
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
            job.waitForCompletion(true);
            Counters jobCntrs = job.getCounters();
            terminationValue = jobCntrs.findCounter(MoreIterations.numberOfIterations).getValue();
            // if the counter's value is incremented in the reducer(s), then there are more
            // GRAY nodes to process, implying that the iteration has to be continued.
            iterationCount++;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SSSPJob(), args);
        if (args.length != 2) {
            System.err.println("Usage: <in> <output name> ");
            System.exit(1);
            System.out.println("Huh?");
        }
        System.exit(res);
    }
}
Also, I'm not sure how debugging works in Hadoop. None of my debug print statements seem to have any effect, and I suspect the Hadoop framework writes log messages to some other location or file. Thanks :)
1 Answer
Keys in an MR job should implement WritableComparable and values should implement Writable. I think your mapper code is using "Object" as the key type. Just add the @Override annotation before your map and reduce methods so that any mismatch shows up as a compile error. Otherwise you won't see any error, but because the signatures don't match, the default identity mapper is invoked instead of yours, which leads to the error you are seeing. If you are processing a text file, the key of the map method should be a LongWritable; if you want to use a custom key, it should implement Writable.
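For illustration, here is a rough, untested sketch of what the overridden signatures could look like (keeping the class names from the question; how the Node object is parsed from the input line depends on your own Node class, so the constructor calls below are only placeholders, and imports stay as in your current files). With @Override, a method that doesn't match Mapper.map / Reducer.reduce becomes a compile-time error instead of silently falling back to the default mapper:

// SearchMapper.java (sketch)
public class SearchMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Build the Node inside the method instead of taking it as a fourth
        // parameter -- the extra parameter is what changes the signature so
        // that the framework never calls this method.
        Node inNode = new Node(value.toString()); // placeholder: depends on your Node class
        // ... same GRAY-expansion logic as in the question ...
    }
}

// SearchReducer.java (sketch)
public class SearchReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Node outNode = new Node(); // placeholder: create it here, not as a parameter
        // ... same merge logic as in the question, then:
        context.write(key, new Text(outNode.getNodeInfo()));
    }
}

Note that the framework's map and reduce methods return void, so the Node return value of your reduce has to go as well. Since your mapper's output types (Text, Text) match the job's output types, setOutputKeyClass/setOutputValueClass is enough; if they ever differ, you would also need job.setMapOutputKeyClass(...) and job.setMapOutputValueClass(...).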