如何在hadoop的mapper和reducer中提供一个子类?

fivyi3re  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(225)

我有一个子(子)类,它是从父(父)类扩展而来的。我需要一种方法来为Map器的输入值提供一个常规类型,这样我就可以同时提供子级和父级作为有效值,如下所示:
公共静态类mymapper扩展了mapper<…,myparentclass,…>
我希望mychildclass(从myparentclass扩展而来)也有效。
但是,当我运行程序时,如果值是子类,我会得到一个异常:
Map值中的类型不匹配:应为myparentclass,收到mychildclass
如何使子类和父类都成为Map器的有效输入/输出值?
更新:

package hipi.examples.dumphib;

import hipi.image.FloatImage;
import hipi.image.ImageHeader;
import hipi.imagebundle.mapreduce.ImageBundleInputFormat;
import hipi.util.ByteUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class DumpHib extends Configured implements Tool {

  public static class DumpHibMapper extends Mapper<ImageHeader, FloatImage, IntWritable, Text> {

    @Override
    public void map(ImageHeader key, FloatImage value, Context context) throws IOException, InterruptedException  {

      int imageWidth = value.getWidth();
      int imageHeight = value.getHeight();

      String outputStr = null;

      if (key == null) {
    outputStr = "Failed to read image header.";
      } else if (value == null) {
    outputStr = "Failed to decode image data.";
      } else {
    String camera = key.getEXIFInformation("Model");
    String hexHash = ByteUtils.asHex(ByteUtils.FloatArraytoByteArray(value.getData()));
    outputStr = imageWidth + "x" + imageHeight + "\t(" + hexHash + ")\t  " + camera;
      }

      context.write(new IntWritable(1), new Text(outputStr));
    }

  }

  public static class DumpHibReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      for (Text value : values) {
    context.write(key, value);
      }
    }

  }

  public int run(String[] args) throws Exception {

    if (args.length < 2) {
      System.out.println("Usage: dumphib <input HIB> <output directory>");
      System.exit(0);
    }

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "dumphib");

    job.setJarByClass(DumpHib.class);
    job.setMapperClass(DumpHibMapper.class);
    job.setReducerClass(DumpHibReducer.class);

    job.setInputFormatClass(ImageBundleInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    String inputPath = args[0];
    String outputPath = args[1];

    removeDir(outputPath, conf);

    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    job.setNumReduceTasks(1);

    return job.waitForCompletion(true) ? 0 : 1;

  }

  private static void removeDir(String path, Configuration conf) throws IOException {
    Path output_path = new Path(path);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(output_path)) {
      fs.delete(output_path, true);
    }
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new DumpHib(), args);
    System.exit(res);
  }

}

floatimage是一个超类,我有一个扩展自它的childfloatimage类。当从recordreader返回childfloatimage时,它会引发上一个异常。

nkoocmlb

nkoocmlb1#

我遵循的解决方案是创建一个容器/ Package 器类,将所有必需的函数委托给origional对象,如下所示:

public class FloatImageContainer implements Writable, RawComparator<BinaryComparable> {

    private FloatImage floatImage;

    public FloatImage getFloatImage() {
        return floatImage;
    }

    public void setFloatImage(FloatImage floatImage) {
        this.floatImage = floatImage;
    }

    public FloatImageContainer() {
        this.floatImage = new FloatImage();
    }

    public FloatImageContainer(FloatImage floatImage) {
        this.floatImage = floatImage;
    }

    @Override
    public int compare(BinaryComparable o1, BinaryComparable o2) {
        // TODO Auto-generated method stub
        return floatImage.compare(o1, o2);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // TODO Auto-generated method stub
        return floatImage.compare(b1, s1, l1, b2, s2, l2);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        floatImage.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        floatImage.readFields(in);
    }

}

在Map绘制程序中:

public static class MyMapper extends Mapper<..., FloatImageContainer, ..., ...> {

在这种情况下,floatimage和childfloatimage都可以封装在floatimagecontainer中,这样就消除了hadoop中的固有问题,因为只有一个类直接使用floatimagecontainer,它不是任何类的父/子类。

xam8gpfp

xam8gpfp2#

背景

原因是类型擦除使得java无法(在运行时)检查 MyMapper 实际上扩展了正确的类型(根据 Mapper ).
java基本上编译:

List<String> list = new ArrayList<String>();
list.add("Hi");
String x = list.get(0);

进入之内

List list = new ArrayList();
list.add("Hi");
String x = (String) list.get(0);

这个例子的学分在这里。
所以你在输入 MyMapper java想看的地方 Mapper<A, B, C, D> 特定的 A , B , C 以及 D -在运行时不可能。所以我们必须在编译时强制检查。

解决方案

可以对所有自定义子类执行以下操作:

job.setMapperClass(DumpHibMapper.class);

使用 java.lang.Class#asSubclass 而是这样做:

job.setMapperClass(DumpHibMapper.class.asSubclass(Mapper.class));

相关问题