如何在没有键、值对的情况下保存mapreduce的reducer输出?

g6baxovj  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(500)

我正在写一个mapreduce程序来处理dicom图像。这个mapreduce程序的目的是处理dicom图像,从中提取元数据,索引到solr,最后在reducer阶段将原始图像保存在hdfs中。我想在hdfs中保存相同的文件作为一个reducer输出
因此,我已经实现了大部分功能,但在reducer阶段,在hdfs中存储同一个文件时,它不起作用。
我已经用dicom图像浏览器测试了处理过的dicom文件,它说这个文件被选中了,而且处理过的dicom文件的大小略有增加。例如,原来的dicom大小是628kb,当reducer将此文件保存在hdfs中时,它的大小将更改为630kb。
我尝试了从这些链接解决方案,但没有一个给出预期的结果。
hadoopmapreduce如何在hdfs中只存储值
hadoop-如何收集没有值的文本输出
以下是将dicom文件作为单个文件读取(不拆分)的代码。

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }       
}

自定义recordreader

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {     
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();     
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            System.out.println("Inside nextKeyvalue");
            System.out.println(fileSplit.getLength());
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
                processed = true;
                return true;
            }
            return false;
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException 
    {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

}

mapper类mapper类根据我们的需要完美地工作。

public class MapClass{

    public static class Map extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{   

        @Override
        protected void map(NullWritable key, BytesWritable value,
                Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            value.setCapacity(value.getLength());
            InputStream in = new ByteArrayInputStream(value.getBytes());            
            ProcessDicom.metadata(in); // Process dicom image and extract metadata from it
            Text keyOut = getFileName(context);
            context.write(keyOut, value);

        }

        private Text getFileName(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
        {
            InputSplit spl = context.getInputSplit();
            Path filePath = ((FileSplit)spl).getPath();
            String fileName = filePath.getName();
            Text text = new Text(fileName);
            return text;
        }

        @Override
        protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
        }

    }

减速机类这是减速机类。公共类reduceclass{

public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, BytesWritable>{

        @Override
            protected void reduce(Text key, Iterable<BytesWritable> value,
                    Reducer<Text, BytesWritable, BytesWritable, BytesWritable>.Context context)
                    throws IOException, InterruptedException {

            Iterator<BytesWritable> itr = value.iterator();
            while(itr.hasNext())
            {
                BytesWritable wr = itr.next();
                wr.setCapacity(wr.getLength());
                context.write(new BytesWritable(key.copyBytes()), itr.next());
            }
        }
}

主要类别

public class DicomIndexer{

    public static void main(String[] argss) throws Exception{
        String args[] = {"file:///home/b3ds/storage/dd","hdfs://192.168.38.68:8020/output"};
        run(args);
    }

    public static void run(String[] args) throws Exception {

        //Initialize the Hadoop job and set the jar as well as the name of the Job
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(WordCount.class);
//      job.getConfiguration().set("mapreduce.output.basename", "hi");
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

    }

}

所以我完全不知道该怎么办。有些链接说这是不可能的,因为mapreduce在pair上工作,有些链接说使用nullwritable。到目前为止,我已经尝试过nullwritable,sequencefileoutputformat,但是没有一个有效。

jtoj6r0c

jtoj6r0c1#

两件事:
通过调用 itr.next() 两次,没办法。
正如您所确定的,您正在编写一个键和一个值,而您只想编写一个。而是使用 NullWritable 为了价值。你的减速机看起来像:

public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, NullWritable>{
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> value,
                          Reducer<Text, BytesWritable, BytesWritable, NullWritable>.Context context)
            throws IOException, InterruptedException {
        NullWritable nullWritable = NullWritable.get();
        Iterator<BytesWritable> itr = value.iterator();
        while(itr.hasNext())
        {
            BytesWritable wr = itr.next();
            wr.setCapacity(wr.getLength());
            context.write(wr, nullWritable);
        }
    }
}

相关问题