Hadoop2 (YARN) getting java.io.IOException: wrong key class exception

fhg3lkii, asked 2021-05-29 in Hadoop

I am trying to run a Hadoop 2 (YARN) MapReduce job whose output_format_class is SequenceFileOutputFormat and whose input_format_class is SequenceFileInputFormat.

I chose the Mapper to emit both its key and its value as BytesWritable. The Reducer emits an IntWritable key and a BytesWritable value.

Every time I run it I get the following error:

Error: java.io.IOException: wrong key class: org.apache.hadoop.io.BytesWritable is not class org.apache.hadoop.io.IntWritable
        at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1306)
        at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:83)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
        at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

I found that when I set the output format to something other than SequenceFileOutputFormat the problem goes away, but I need the output to be SequenceFileOutputFormat.
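For illustration (the post does not say which other format was tried, so TextOutputFormat here is my assumption), changing the output format only hides the error because SequenceFileOutputFormat is the component doing the checking: the SequenceFile$Writer.append() frame in the stack trace compares each key's class against the key class the writer was created with, while TextOutputFormat simply calls toString() on whatever it receives.

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical "other" output format: no key-class check is performed,
// so the BytesWritable/IntWritable mismatch is written out silently
// instead of raising the "wrong key class" IOException.
job.setOutputFormatClass(TextOutputFormat.class);

In other words, the type mismatch is still there; it just stops being detected.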
The main method is as follows:

    Configuration conf = new Configuration(true);
    conf.set("refpath", "/out/Sample1/Local/EU/CloudBurst/BinaryFiles/ref.br");
    conf.set("qrypath",   "/out/Sample1/Local/EU/CloudBurst/BinaryFiles/qry.br");
    conf.set("MIN_READ_LEN",      Integer.toString(MIN_READ_LEN));
    conf.set("MAX_READ_LEN",      Integer.toString(MAX_READ_LEN));
    conf.set("K",                 Integer.toString(K));
    conf.set("SEED_LEN",          Integer.toString(SEED_LEN));
    conf.set("FLANK_LEN",         Integer.toString(FLANK_LEN));
    conf.set("ALLOW_DIFFERENCES", Integer.toString(ALLOW_DIFFERENCES));
    conf.set("BLOCK_SIZE",        Integer.toString(BLOCK_SIZE));
    conf.set("REDUNDANCY",        Integer.toString(REDUNDANCY));
    conf.set("FILTER_ALIGNMENTS", (FILTER_ALIGNMENTS ? "1" : "0"));

    Job job = new Job(conf,"CloudBurst");
    job.setNumReduceTasks(NUM_REDUCE_TASKS); // MV2

    //conf.setNumMapTasks(NUM_MAP_TASKS); TODO find solution for mv2

    FileInputFormat.addInputPath(job, new Path("/out/Sample1/Local/EU/CloudBurst/BinaryFiles/ref.br"));//TODO change it fit to the params
    FileInputFormat.addInputPath(job, new Path("/out/Sample1/Local/EU/CloudBurst/BinaryFiles/qry.br"));//TODO change it fit to the params

    job.setJarByClass(MerReduce.class);//mv2

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // The order of seeds is not important, but make sure the reference seeds are seen before the qry seeds
    job.setPartitionerClass(MerReduce.PartitionMers.class); // mv2
    job.setGroupingComparatorClass(MerReduce.GroupMersWC.class); //mv2 TODO

    job.setMapperClass(MerReduce.MapClass.class);
    job.setReducerClass(MerReduce.ReduceClass.class);

    job.setMapOutputKeyClass(BytesWritable.class);//mv2
    job.setMapOutputValueClass(BytesWritable.class);//mv2
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(BytesWritable.class);

    Path oPath = new Path("/out/Sample1/Local/EU/Vectors");//TODO change it fit to the params
    //conf.setOutputPath(oPath);
    FileOutputFormat.setOutputPath(job, oPath);
    System.err.println("  Removing old results");
    FileSystem.get(conf).delete(oPath);

    int code = job.waitForCompletion(true) ? 0 : 1;

    System.err.println("Finished");
}
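Two side notes on this driver, neither of which causes the error: in Hadoop 2 both the new Job(conf, name) constructor and the one-argument FileSystem.delete(Path) are deprecated. A sketch of the current equivalents:

// Non-deprecated equivalents in the Hadoop 2 API:
Job job = Job.getInstance(conf, "CloudBurst");   // replaces new Job(conf, "CloudBurst")
FileSystem.get(conf).delete(oPath, true);        // explicit recursive flag replaces delete(oPath)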

The Mapper class header:

public static class MapClass extends Mapper<IntWritable, BytesWritable, BytesWritable, BytesWritable>
public void map(IntWritable id, BytesWritable rawRecord,Context context) throws IOException, InterruptedException

The Reducer class header:

public static class ReduceClass extends Reducer<BytesWritable, BytesWritable, IntWritable, BytesWritable>

public synchronized void reduce(BytesWritable mer, Iterator<BytesWritable> values,Context context)
            throws IOException, InterruptedException {

Does anybody have an idea?

e7arh2l6 (answer 1):

job.setInputFormatClass(SequenceFileInputFormat.class);

should be

job.setInputFormatClass(IntWritable.class);

The Mapper's input key and value are int and bytes, but in the job both inputs are given as sequence files.
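Independently of the answer above, note that the Reducer.reduce(Reducer.java:150) frame in the stack trace is the inherited default reduce(): in the new org.apache.hadoop.mapreduce API a reduce method declared with Iterator (as in the question's ReduceClass) does not override Reducer.reduce(), which takes an Iterable, so the identity implementation runs and forwards the BytesWritable map-output key to a writer created for IntWritable. For reference, here is a minimal, self-contained sketch of a SequenceFile-in/SequenceFile-out job in which the input format, the Mapper/Reducer generics, the setMapOutput*/setOutput* calls, and the reduce() signature all agree. The class names and logic are placeholders of mine, not the asker's CloudBurst code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SeqFileTypeDemo {

    // Reads <IntWritable, BytesWritable> records from the sequence file,
    // emits <BytesWritable, BytesWritable> into the shuffle.
    public static class DemoMapper
            extends Mapper<IntWritable, BytesWritable, BytesWritable, BytesWritable> {
        @Override
        protected void map(IntWritable id, BytesWritable raw, Context context)
                throws IOException, InterruptedException {
            context.write(raw, raw); // placeholder logic
        }
    }

    // Note the Iterable parameter: with Iterator here the method would NOT
    // override Reducer.reduce(), and the inherited identity reduce() would
    // push the BytesWritable key into a writer created for IntWritable.
    public static class DemoReducer
            extends Reducer<BytesWritable, BytesWritable, IntWritable, BytesWritable> {
        @Override
        protected void reduce(BytesWritable mer, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (BytesWritable v : values) {
                count++;
            }
            context.write(new IntWritable(count), mer); // placeholder logic
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SeqFileTypeDemo");
        job.setJarByClass(SeqFileTypeDemo.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);

        // Shuffle types: must match the Mapper's third and fourth generic parameters.
        job.setMapOutputKeyClass(BytesWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // Final output types: must match the Reducer's third and fourth generic
        // parameters; SequenceFileOutputFormat creates its writer from these two,
        // and SequenceFile$Writer.append() enforces them on every record.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}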
