Java debugging in MapReduce: Mapper not being called?

aiazj4mn  posted on 2021-06-02 in Hadoop

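Over the past few days I have been teaching myself Hadoop and trying to implement a basic BFS algorithm based on the information given on this web page. I had to make some modifications and additions to get the code to compile. At runtime I get the error below, and even after hours of debugging I have not been able to resolve it. Can anyone help?
Error: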

15/05/11 03:04:20 WARN mapred.LocalJobRunner: job_local934121164_0001
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/05/11 03:04:21 INFO mapreduce.Job: Job job_local934121164_0001 running in uber mode : false
15/05/11 03:04:21 INFO mapreduce.Job:  map 0% reduce 0%

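This should not happen in either the mapper or the reducer, because the key and value types I use are the same, as you can see below. The only explanation I can think of is that my mapper class is not being used and the default mapper (which emits LongWritable keys) is running instead. I don't know what I am doing wrong.
SearchMapper.java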

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
public class SearchMapper extends Mapper<Object, Text, Text, Text> {

    // Types of the input key, input value and the Context object through which 
    // the Mapper communicates with the Hadoop framework
    public void map(Object key, Text value, Context context, Node inNode)
            throws IOException, InterruptedException {

        // For each GRAY node, emit each of the adjacent nodes as a new node
        // (also GRAY) if the adjacent node is already processed
        // and colored BLACK, the reducer retains the color BLACK
        // Note that the mapper is not differentiating between BLACK GREY AND WHITE

        if (inNode.getColor() == Node.Color.GRAY) {
            for (String neighbor : inNode.getEdges()) { 
                Node adjacentNode = new Node();

                // Remember that the current node only has the value the id 
                // of its neighbour, and not the object itself. Therefore at 
                // this stage there is no way of knowing and assigning any of
                // its other properties. Also remember that the reducer is doing
                // the 'clean up' task and not the mapper.
                adjacentNode.setId(neighbor); 
                adjacentNode.setDistance(inNode.getDistance() + 1);
                adjacentNode.setColor(Node.Color.GRAY);
                adjacentNode.setParent(inNode.getId());
                context.write(new Text(adjacentNode.getId()), adjacentNode.getNodeInfo()); // Get nodeinfo returns a Text Object
            }
            inNode.setColor(Node.Color.BLACK);
        }
        // Emit the input node, other wise the BLACK color change(if it happens)
        // Wont be persistent 
        context.write(new Text(inNode.getId()), inNode.getNodeInfo());

    }
}

SearchReducer.java

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class SearchReducer extends Reducer<Text, Text, Text, Text> {

    // Types of the input key, the values associated with the key, the Context object for Reducers communication
    // with the Hadoop framework and the node whose information has to be output
    // the return type is a Node
    public Node reduce(Text key, Iterable<Text> values, Context context, Node outNode)
            throws IOException, InterruptedException {

        // set the node id as the key
        outNode.setId(key.toString());

        // TODO : (huh?) Since the values are of the type Iterable, iterate through the values associated with the key
        // for all the values corresponding to a particular node id

        for (Text value : values) {

            Node inNode = new Node(key.toString() + "\t" + value.toString());

            // Emit one node after combining all the mapper outputs

            // Only one node(the original) will have a non-null adjascency list
            if (inNode.getEdges().size() > 0) {
                outNode.setEdges(inNode.getEdges());
            }

            // Save the minimum distance and parent
            if (inNode.getDistance() < outNode.getDistance()) {
                outNode.setDistance(inNode.getDistance());
                outNode.setParent(inNode.getParent());
            }

            // Save the darkest color
            if (inNode.getColor().ordinal() > outNode.getColor().ordinal()) {
                outNode.setColor(inNode.getColor());
            }        
        }
        context.write(key, new Text(outNode.getNodeInfo()));      
        return outNode;
    }
}

BaseJob.java (a generic class mentioned on the website; it basically sets up the job)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public abstract class BaseJob extends Configured implements Tool {

        protected Job setupJob(String jobName,JobInfo jobInfo) throws Exception {

        Job job = new Job(new Configuration(), jobName);
        job.setJarByClass(jobInfo.getJarByClass());

        job.setMapperClass(jobInfo.getMapperClass());
        if (jobInfo.getCombinerClass() != null)
            job.setCombinerClass(jobInfo.getCombinerClass());
        job.setReducerClass(jobInfo.getReducerClass());

        // TODO : set number of reducers as required
        job.setNumReduceTasks(3);

        job.setOutputKeyClass(jobInfo.getOutputKeyClass());
        job.setOutputValueClass(jobInfo.getOutputValueClass());
       /*
        job.setJarByClass(SSSPJob.class);
        job.setMapperClass(SearchMapper.class);
        job.setReducerClass(SearchReducer.class);
        job.setNumReduceTasks(3);
        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);*/
        return job;
    }

   // Implement an abstract class for JobInfo object
    protected abstract class JobInfo {
        public abstract Class<?> getJarByClass();
        public abstract Class<? extends Mapper> getMapperClass();
        public abstract Class<? extends Reducer> getCombinerClass();
        public abstract Class<? extends Reducer> getReducerClass();
        public abstract Class<?> getOutputKeyClass();
        public abstract Class<?> getOutputValueClass();

    }
}

SSSPJob.java (the driver)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class SSSPJob extends BaseJob {
    // method to set the configuration for the job and the mapper and the reducer classes
    private Job getJobConf(String[] args) 
        throws Exception {

    // Defining the abstract class objects
        JobInfo jobInfo = new JobInfo() {
            @Override
            public Class<? extends Reducer> getCombinerClass() {
                return null;
            }

            @Override
            public Class<?> getJarByClass() {
                return SSSPJob.class;
            }

            @Override
            public Class<? extends Mapper> getMapperClass() {
                return SearchMapper.class;
            }

            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }

            @Override
            public Class<?> getOutputValueClass() {
                return Text.class;
            }

            @Override
            public Class<? extends Reducer> getReducerClass() {
                return SearchReducer.class;
            }
        };

        return setupJob("ssspjob", jobInfo);

    }

    // the driver to execute the job and invoke the map/reduce functions

    public int run(String[] args) throws Exception {
        int iterationCount = 0; 
        Job job;
        // No of grey nodes
        long terminationValue = 1;

        while( terminationValue >0){
            job = getJobConf(args); 
            String input, output;

            // Setting the input file and output file for each iteration
            // During the first time the user-specified file will be the
            // input whereas for the subsequent iterations
            // the output of the previous iteration will be the input
            // NOTE: Please be clear of how the input output files are set
            //       before proceding.

            // for the first iteration the input will be the first input argument
            if (iterationCount == 0) 
                input = args[0];
            else
                // for the remaining iterations, the input will be the output of the previous iteration
                input = args[1] + iterationCount;

            output = args[1] + (iterationCount + 1);

            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));

            job.waitForCompletion(true); 

            Counters jobCntrs = job.getCounters();
            terminationValue = jobCntrs.findCounter(MoreIterations.numberOfIterations).getValue();
            // if the counter's value is incremented in the reducer(s), then there are more
            // GRAY nodes to process implying that the iteration has to be continued.
            iterationCount++;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new SSSPJob(), args);
        if(args.length != 2){
            System.err.println("Usage: <in> <output name> ");
            System.exit(1);
            System.out.println("Huh?");
        }
        System.exit(res);
    }

}

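Also, I am not sure how debugging works in Hadoop. None of my debug print statements seem to have any effect, and I suspect the Hadoop framework writes log messages to some other location or file. Thanks :)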

pkwftd7m  #1

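Keys in an MR job should implement WritableComparable, and values should implement Writable. I think your mapper code is using an instance of type Object. Just add the @Override annotation before the map and reduce methods so that the compiler reports the errors. Without it you won't see any compile error, but because the signatures don't match the ones the framework calls, your methods are only overloads and the default identity behavior (IdentityMapper) runs instead, which causes the error you see. If you are processing a text file, the key passed to the map method should be LongWritable; if you want to use a custom key, it should implement WritableComparable.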
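To make the override-versus-overload point concrete, here is a minimal plain-Java sketch that does not use Hadoop at all. Every name in it (OverloadPitfallDemo, Context, BaseMapper, BrokenMapper, FixedMapper, runOneRecord) is a hypothetical stand-in invented for illustration, with String playing the role of Text and Long the role of LongWritable. It only assumes what the question and the stack trace suggest: the framework calls the three-argument map, the inherited map passes the input key through unchanged, and the collector rejects keys whose class differs from the configured one.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class OverloadPitfallDemo {

    /** Hypothetical stand-in for Hadoop's Context: rejects keys of the wrong class. */
    public static class Context {
        private final Class<?> expectedKeyClass;
        public final List<String> written = new ArrayList<>();

        public Context(Class<?> expectedKeyClass) {
            this.expectedKeyClass = expectedKeyClass;
        }

        public void write(Object key, Object value) throws IOException {
            if (key.getClass() != expectedKeyClass) {
                throw new IOException("Type mismatch in key from map: expected "
                        + expectedKeyClass.getName() + ", received " + key.getClass().getName());
            }
            written.add(key + "\t" + value);
        }
    }

    /** Stand-in for Mapper: the inherited map() simply forwards its input (identity). */
    public static class BaseMapper {
        public void map(Object key, String value, Context context) throws IOException {
            context.write(key, value);
        }
    }

    /** Extra fourth parameter: this is an overload, so the caller below never reaches it. */
    public static class BrokenMapper extends BaseMapper {
        public void map(Object key, String value, Context context, String extra) throws IOException {
            context.write(value, extra);
        }
    }

    /** Same signature as the base method; @Override makes the compiler verify that. */
    public static class FixedMapper extends BaseMapper {
        @Override
        public void map(Object key, String value, Context context) throws IOException {
            context.write(value, "visited");
        }
    }

    /** Mimics the framework, which only knows about the three-argument map(). */
    public static String runOneRecord(BaseMapper mapper) {
        Context context = new Context(String.class); // String stands in for Text
        try {
            mapper.map(Long.valueOf(0L), "node1", context); // Long stands in for the file offset key
            return "ok: " + context.written;
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOneRecord(new BrokenMapper()));
        System.out.println(runOneRecord(new FixedMapper()));
    }
}
```

With BrokenMapper the inherited identity method runs and the Long key is rejected, which mirrors the "Type mismatch in key from map" message in the question. Adding @Override to BrokenMapper's four-argument method would turn the silent mistake into a compile error. Applied to the question's code, the likely fix is to declare map(Object key, Text value, Context context) and reduce(Text key, Iterable<Text> values, Context context) with a void return type, and to build the Node inside the method body from the input (for example with the String constructor the reducer already uses) instead of taking it as an extra parameter. This would also explain why the debug prints never appeared: the methods containing them were never invoked.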
