MapReduce - 来自输入和数组的多个输出
MapReduce - Multiple outputs from an input and an array
目标
我想从一个输入文件和一个数组中得到多个输出文件,如 picture 所述。
我的想法
我考虑过为父 class Program
设置一个名为“check
”的静态属性。
public class Program
{
//Attribute check
private static String check = null;
public static class ProgramMapper extends Reducer<Object, Text, Text, Text>{
// mapping
}
public static class ProgramReducer extends Reducer<Object, Text, Text, TextArray>{
// reducing
}
public static void main(String[] args){
// main program
}
// ...
在main
方法中,我会在循环中将check
分配给"a"、"b"、"c":
public static void main(String[] args) throws Exception{
// Array of checkpoints
String[] arr = {"a", "c", "f"};
// Loop for assigning check
for(int j = 0; j<arr.length ; j++){
check = arr[j];
// job configuration
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "Program");
//...
for (int i = 0; i < otherArgs.length - 1; ++i){
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
/* here I define multiple outputs */
FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]+j));
job.waitForCompletion(true);
if (j == arr.length -1){
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
reducer
会检查密钥是否等于检查
public static class ProgramReducer extends Reducer<Object, Text, Text, TextArray>{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
ArrayList<Text> result = new ArrayList<Text>();
String key1 = key.toString();
// check if the key is equal to check
if ( key1.equals(check) ){
result.add(new Text("o"));
}else{
result.add(new Text("x"));
}
// other reducing code
}
}
问题
check
从未分配给 "a"、"b"、"c",所以我有 3 个输出文件都未选中。
请问我该如何解决这个问题?
您的 main
方法在客户端上是 运行,但映射器和缩减器在 Hadoop 节点上是 运行。要将参数传递给您的 mapreduce 作业,您可以使用 Configuration
对象。
在你的main
方法中设置值:
conf.set("check", check);
并在 reduce 中得到它:
check = context.getConfiguration().get("check");
您可以使用方法 Reducer.setup
在处理数据之前仅设置一次此值。
目标
我想从一个输入文件和一个数组中得到多个输出文件,如 picture 所述。
我的想法
我考虑过为父 class Program
设置一个名为“check
”的静态属性。
public class Program
{
//Attribute check
private static String check = null;
public static class ProgramMapper extends Reducer<Object, Text, Text, Text>{
// mapping
}
public static class ProgramReducer extends Reducer<Object, Text, Text, TextArray>{
// reducing
}
public static void main(String[] args){
// main program
}
// ...
在main
方法中,我会在循环中将check
分配给"a"、"b"、"c":
public static void main(String[] args) throws Exception{
// Array of checkpoints
String[] arr = {"a", "c", "f"};
// Loop for assigning check
for(int j = 0; j<arr.length ; j++){
check = arr[j];
// job configuration
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "Program");
//...
for (int i = 0; i < otherArgs.length - 1; ++i){
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
/* here I define multiple outputs */
FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]+j));
job.waitForCompletion(true);
if (j == arr.length -1){
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
reducer
会检查密钥是否等于检查
public static class ProgramReducer extends Reducer<Object, Text, Text, TextArray>{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
ArrayList<Text> result = new ArrayList<Text>();
String key1 = key.toString();
// check if the key is equal to check
if ( key1.equals(check) ){
result.add(new Text("o"));
}else{
result.add(new Text("x"));
}
// other reducing code
}
}
问题
check
从未分配给 "a"、"b"、"c",所以我有 3 个输出文件都未选中。
请问我该如何解决这个问题?
您的 main
方法在客户端上是 运行,但映射器和缩减器在 Hadoop 节点上是 运行。要将参数传递给您的 mapreduce 作业,您可以使用 Configuration
对象。
在你的main
方法中设置值:
conf.set("check", check);
并在 reduce 中得到它:
check = context.getConfiguration().get("check");
您可以使用方法 Reducer.setup
在处理数据之前仅设置一次此值。