How can I use a subclass as a value type in Hadoop's Mapper and Reducer?
I have a child class that extends a super (parent) class. I want a way to give the mapper's input value a general type, so that I can supply both the child and the parent as valid values, like this:
public static class MyMapper extends Mapper<..., MyParentClass, ..., ...>
I want MyChildClass, which extends MyParentClass, to be valid too.
But when I run the program, if the value is the child class I get an exception:
Type mismatch in value from map: expected MyParentClass, received MyChildClass
How can I make both the child and the parent class valid input/output values to/from the mapper?
Update:
package hipi.examples.dumphib;
import hipi.image.FloatImage;
import hipi.image.ImageHeader;
import hipi.imagebundle.mapreduce.ImageBundleInputFormat;
import hipi.util.ByteUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Iterator;
public class DumpHib extends Configured implements Tool {
  public static class DumpHibMapper extends Mapper<ImageHeader, FloatImage, IntWritable, Text> {

    @Override
    public void map(ImageHeader key, FloatImage value, Context context) throws IOException, InterruptedException {
      String outputStr = null;
      if (key == null) {
        outputStr = "Failed to read image header.";
      } else if (value == null) {
        outputStr = "Failed to decode image data.";
      } else {
        // Only dereference value once it is known to be non-null.
        int imageWidth = value.getWidth();
        int imageHeight = value.getHeight();
        String camera = key.getEXIFInformation("Model");
        String hexHash = ByteUtils.asHex(ByteUtils.FloatArraytoByteArray(value.getData()));
        outputStr = imageWidth + "x" + imageHeight + "\t(" + hexHash + ")\t " + camera;
      }
      context.write(new IntWritable(1), new Text(outputStr));
    }
  }
  public static class DumpHibReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      for (Text value : values) {
        context.write(key, value);
      }
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.out.println("Usage: dumphib <input HIB> <output directory>");
      System.exit(0);
    }

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "dumphib");
    job.setJarByClass(DumpHib.class);
    job.setMapperClass(DumpHibMapper.class);
    job.setReducerClass(DumpHibReducer.class);
    job.setInputFormatClass(ImageBundleInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    String inputPath = args[0];
    String outputPath = args[1];
    removeDir(outputPath, conf);
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setNumReduceTasks(1);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  private static void removeDir(String path, Configuration conf) throws IOException {
    Path output_path = new Path(path);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(output_path)) {
      fs.delete(output_path, true);
    }
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new DumpHib(), args);
    System.exit(res);
  }
}
FloatImage is the super class, and I have a ChildFloatImage class that extends it. When the ChildFloatImage is returned from the RecordReader, it throws the exception above.
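For illustration, a minimal sketch of what such a child class could look like (hypothetical; the real ChildFloatImage is not shown in this post):

// Hypothetical sketch only: the actual ChildFloatImage is not shown here.
// It inherits FloatImage's Writable serialization and adds its own behaviour.
public class ChildFloatImage extends FloatImage {
  // extra fields/methods specific to the child would go here
}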
Background
The reason is that type erasure makes it impossible for Java to check at run time that your MyMapper actually extends the correct types (in terms of the generic type parameters on Mapper).
Java essentially compiles:
List<String> list = new ArrayList<String>();
list.add("Hi");
String x = list.get(0);
into
List list = new ArrayList();
list.add("Hi");
String x = (String) list.get(0);
Credit for this example goes here.
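To make the erasure visible at run time, here is a small self-contained demo (my own illustration, not from the original answer):

import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
  public static void main(String[] args) {
    List<String> strings = new ArrayList<String>();
    List<Integer> numbers = new ArrayList<Integer>();
    // Prints "true": after compilation both are plain ArrayList,
    // so the generic type arguments cannot be inspected at run time.
    System.out.println(strings.getClass() == numbers.getClass());
  }
}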
So inside your MyMapper, Java wants to see a Mapper<A, B, C, D> with the specific A, B, C, and D - which is impossible at run time. We therefore have to enforce that check at compile time.
Solution
For all of your custom subclasses, instead of doing:
job.setMapperClass(DumpHibMapper.class);
use java.lang.Class#asSubclass and do this instead:
job.setMapperClass(DumpHibMapper.class.asSubclass(Mapper.class));
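As an aside (my own illustration, not from the original answer): asSubclass performs a checked cast on the Class object itself, so an incompatible class fails fast with a ClassCastException at job-configuration time rather than somewhere inside the running job.

// Throws ClassCastException right here if the class does not
// actually extend Mapper, instead of failing later at run time.
Class<? extends Mapper> mapperClass = DumpHibMapper.class.asSubclass(Mapper.class);
job.setMapperClass(mapperClass);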
The solution I followed was to create a container/wrapper class that delegates all the required functions to the original object, like this:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import hipi.image.FloatImage;

public class FloatImageContainer implements Writable, RawComparator<BinaryComparable> {

  private FloatImage floatImage;

  public FloatImage getFloatImage() {
    return floatImage;
  }

  public void setFloatImage(FloatImage floatImage) {
    this.floatImage = floatImage;
  }

  public FloatImageContainer() {
    this.floatImage = new FloatImage();
  }

  public FloatImageContainer(FloatImage floatImage) {
    this.floatImage = floatImage;
  }

  @Override
  public int compare(BinaryComparable o1, BinaryComparable o2) {
    // Delegate object comparison to the wrapped FloatImage.
    return floatImage.compare(o1, o2);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // Delegate raw-bytes comparison to the wrapped FloatImage.
    return floatImage.compare(b1, s1, l1, b2, s2, l2);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // Serialize by delegating to the wrapped FloatImage.
    floatImage.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // Deserialize by delegating to the wrapped FloatImage.
    floatImage.readFields(in);
  }
}
And in the mapper:
public static class MyMapper extends Mapper<..., FloatImageContainer, ..., ...> {
In this case both FloatImage and ChildFloatImage can be encapsulated in a FloatImageContainer, and you get rid of the inheritance problem in Hadoop, because only one class is used directly - FloatImageContainer - which is not the parent or child of anything.
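A short usage sketch (the identifiers come from the post, but the wiring is my own illustration): instances of either class in the hierarchy travel through the same container type, which is the only type Hadoop ever sees.

// Both a parent and a child instance fit in the same container,
// so Hadoop's exact-type check on values always passes.
FloatImageContainer a = new FloatImageContainer(new FloatImage());
FloatImageContainer b = new FloatImageContainer(new ChildFloatImage());
FloatImage img = b.getFloatImage(); // dynamic dispatch to the child still works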