How to provide a subclass as a value type in Hadoop's Mapper and Reducer?

I have a child class that extends a parent class. I want a way to give the mapper's input value a generic type so that both the child and the parent are accepted as valid values, like this:

public static class MyMapper extends Mapper<..., MyParentClass, ..., ...>

I want MyChildClass, which extends MyParentClass, to be valid as well.

But when I run the program, if the value is the child class I get an exception:

Type mismatch in value from map: expected MyParentClass, received MyChildClass

How can I make both the child and the parent class valid input/output values to/from the mapper?

Update:

package hipi.examples.dumphib;

import hipi.image.FloatImage;
import hipi.image.ImageHeader;
import hipi.imagebundle.mapreduce.ImageBundleInputFormat;
import hipi.util.ByteUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class DumpHib extends Configured implements Tool {

  public static class DumpHibMapper extends Mapper<ImageHeader, FloatImage, IntWritable, Text> {

    @Override
    public void map(ImageHeader key, FloatImage value, Context context) throws IOException, InterruptedException {

      String outputStr;

      if (key == null) {
        outputStr = "Failed to read image header.";
      } else if (value == null) {
        outputStr = "Failed to decode image data.";
      } else {
        // Only touch the image once we know it was decoded successfully.
        int imageWidth = value.getWidth();
        int imageHeight = value.getHeight();
        String camera = key.getEXIFInformation("Model");
        String hexHash = ByteUtils.asHex(ByteUtils.FloatArraytoByteArray(value.getData()));
        outputStr = imageWidth + "x" + imageHeight + "\t(" + hexHash + ")\t  " + camera;
      }

      context.write(new IntWritable(1), new Text(outputStr));
    }

  }

  public static class DumpHibReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      for (Text value : values) {
        context.write(key, value);
      }
    }

  }

  public int run(String[] args) throws Exception {

    if (args.length < 2) {
      System.out.println("Usage: dumphib <input HIB> <output directory>");
      System.exit(0);
    }

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "dumphib");

    job.setJarByClass(DumpHib.class);
    job.setMapperClass(DumpHibMapper.class);
    job.setReducerClass(DumpHibReducer.class);

    job.setInputFormatClass(ImageBundleInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    String inputPath = args[0];
    String outputPath = args[1];

    removeDir(outputPath, conf);

    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    job.setNumReduceTasks(1);

    return job.waitForCompletion(true) ? 0 : 1;

  }

  private static void removeDir(String path, Configuration conf) throws IOException {
    Path output_path = new Path(path);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(output_path)) {
      fs.delete(output_path, true);
    }
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new DumpHib(), args);
    System.exit(res);
  }

}

FloatImage is the parent class, and I have a ChildFloatImage class that extends it. When a ChildFloatImage is returned from the RecordReader, it throws the exception above.

Background

The reason is type erasure: Java cannot check at runtime that your MyMapper actually extends the right type, in terms of the generic type parameters on Mapper.

Java essentially compiles this:

List<String> list = new ArrayList<String>();
list.add("Hi");
String x = list.get(0);

into this:

List list = new ArrayList();
list.add("Hi");
String x = (String) list.get(0);

Credit for this example goes here.
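The erased type parameters are simply not present at runtime; both lists below are just ArrayList once compiled. A minimal, standalone illustration (not part of the Hadoop job):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        // At runtime both are plain ArrayList; the element type is gone.
        System.out.println(strings.getClass() == ints.getClass()); // prints "true"
    }
}
```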

So when Java looks at your MyMapper, it would need to see a Mapper<A, B, C, D> with the specific A, B, C and D, which is impossible at runtime. So we have to enforce that check at compile time.
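Concretely, the runtime check behind the asker's exception is (as far as I can tell) an exact class comparison on the map output value, not an instanceof test, which is why a subclass is rejected even though it "is a" parent. A minimal sketch of the difference, using placeholder classes:

```java
public class ExactClassCheckDemo {
    static class ParentValue {}
    static class ChildValue extends ParentValue {}

    public static void main(String[] args) {
        ParentValue v = new ChildValue();
        // An instanceof test accepts the subclass...
        System.out.println(v instanceof ParentValue);           // prints "true"
        // ...but an exact class comparison, like the one applied to
        // map output values, rejects it.
        System.out.println(v.getClass() == ParentValue.class);  // prints "false"
    }
}
```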

Solution

For all of your custom subclasses, instead of doing this:

job.setMapperClass(DumpHibMapper.class);

use java.lang.Class#asSubclass and do this instead:

job.setMapperClass(DumpHibMapper.class.asSubclass(Mapper.class));
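For context, Class#asSubclass returns the same Class object with a checked type (Class<? extends Mapper> here), and it fails fast with a ClassCastException if the class is not actually a subclass, instead of a later type-mismatch error. A standalone sketch with placeholder classes:

```java
public class AsSubclassDemo {
    static class Parent {}
    static class Child extends Parent {}
    static class Unrelated {}

    public static void main(String[] args) {
        // Succeeds: Child really is a subclass of Parent.
        Class<? extends Parent> ok = Child.class.asSubclass(Parent.class);
        System.out.println(ok.getSimpleName()); // prints "Child"

        try {
            // Fails immediately with ClassCastException.
            Unrelated.class.asSubclass(Parent.class);
        } catch (ClassCastException e) {
            System.out.println("not a subclass");
        }
    }
}
```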

The solution I followed was to create a container/wrapper class that delegates all the required functions to the original object, like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;

public class FloatImageContainer implements Writable, RawComparator<BinaryComparable> {

    private FloatImage floatImage;

    public FloatImage getFloatImage() {
        return floatImage;
    }

    public void setFloatImage(FloatImage floatImage) {
        this.floatImage = floatImage;
    }

    public FloatImageContainer() {
        this.floatImage = new FloatImage();
    }

    public FloatImageContainer(FloatImage floatImage) {
        this.floatImage = floatImage;
    }

    @Override
    public int compare(BinaryComparable o1, BinaryComparable o2) {
        return floatImage.compare(o1, o2);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return floatImage.compare(b1, s1, l1, b2, s2, l2);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        floatImage.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        floatImage.readFields(in);
    }

}

And in the mapper:

public static class MyMapper extends Mapper<..., FloatImageContainer, ..., ...> {

In this case, both FloatImage and ChildFloatImage can be wrapped in a FloatImageContainer, and you sidestep the inheritance problem in Hadoop, because only one class is used directly, FloatImageContainer, which is neither a parent nor a child of anything else.
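The effect of the wrapper can be shown without Hadoop: the framework only ever sees the single concrete container class, so an exact class check passes regardless of which image variant is inside. A simplified sketch, where Img, ChildImg and Container are hypothetical stand-ins for FloatImage, ChildFloatImage and FloatImageContainer:

```java
public class WrapperDemo {
    static class Img { int width() { return 4; } }
    static class ChildImg extends Img { @Override int width() { return 8; } }

    // Stand-in for FloatImageContainer: always the same concrete class.
    static class Container {
        private final Img img;
        Container(Img img) { this.img = img; }
        Img get() { return img; }
    }

    public static void main(String[] args) {
        Container a = new Container(new Img());
        Container b = new Container(new ChildImg());
        // An exact class check passes no matter which variant is wrapped.
        System.out.println(a.getClass() == b.getClass()); // prints "true"
        // Delegation still reaches the child's behavior.
        System.out.println(b.get().width()); // prints "8"
    }
}
```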