在 MapReduce 中使用 Custom Partitioner 时清空 reduce 输出
Empty reduce output while using Custom Partitioner in MapReduce
问题陈述:
输入
Monami 45000 A
Tarun 34000 B
Riju 25000 C
Rita 42000 A
Mithun 40000 A
Archana 21000 C
Shovik 32000 B
我想在 Mapreduce 中使用 Custom Partitioner 将 A、B 和 C 级员工记录分隔在三个不同的输出文件中。
输出 1
Monami 45000 A
Rita 42000 A
Mithun 40000 A
输出 2
Tarun 34000 B
Shovik 32000 B
输出 3
Riju 25000 C
Archana 21000 C
地图代码:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
public class Map
extends Mapper<LongWritable,Text,Text,Text>
{
//private Text key1 = new Text();
//private Text value1 = new Text();
@Override
protected void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException
{
String line = value.toString();
String[] part = line.split("\t");
int len = part.length;
//System.out.println(len);
if (len == 3)
{
context.write(new Text(part[2]), new Text(part[0]+"\t"+part[1]));
//System.out.println(part[0]+part[1]+part[2]);
}
}
分区程序代码
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomPartitioner
extends Partitioner<Text,Text>
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
if(numReduceTasks==0)
return 0;
if(key.equals(new Text("A")))
return 0;
if(key.equals(new Text("B")))
return 1;
else
return 2;
}
}
减少代码
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class Reduce
extends Reducer<Text,Text,Text,Text>
{
@Override
protected void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException
{
Iterator<Text> itr = values.iterator();
while(itr.hasNext())
{
context.write(new Text(itr.next().getBytes()),new Text(key));
}
}
}
Driver Class
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MapReduceDriver
{
public static void main(String[] args) throws Exception
{
Job job = new Job();
job.setJarByClass(MapReduceDriver.class);
job.setJobName("Custom Partitioner");
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(Map.class);
job.setPartitionerClass(CustomPartitioner.class);
job.setReducerClass(Reduce.class);
job.setNumReduceTasks(3);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
代码运行没有任何错误,但三个 reduce 输出文件为空。此外,当作业运行时,它会将映射输出字节显示为零。因此我相信地图不会生成任何 key-value 对。但我找不到原因。你能帮我找出错误吗?
我还有一个困惑:在 Map class 中,当检查变量 len
是否 > 0 时,我得到 ArrayIndexOutOfBoundsException 但如果用 == 检查它运行正常,没有任何异常3. 为什么会抛出 > 0 的异常?
问题是您的输入数据(粘贴在此处)不是制表符分隔的,而是逗号分隔的。如果您替换此行,它应该可以正常工作:
String[] part = line.split("\t");
这一行:
String[] part = line.split(" ");
检查 len > 0
时出现异常的原因是您的字符串未拆分为任何子部分,因此 len
为 1。然后它满足 if 条件并尝试为零件的位置 2 执行一些不存在的操作。
在现有代码中,len
不是3,所以代码不会进入if块,因此不会抛出异常。
问题陈述:
输入
Monami 45000 A
Tarun 34000 B
Riju 25000 C
Rita 42000 A
Mithun 40000 A
Archana 21000 C
Shovik 32000 B
我想在 Mapreduce 中使用 Custom Partitioner 将 A、B 和 C 级员工记录分隔在三个不同的输出文件中。
输出 1
Monami 45000 A
Rita 42000 A
Mithun 40000 A
输出 2
Tarun 34000 B
Shovik 32000 B
输出 3
Riju 25000 C
Archana 21000 C
地图代码:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
public class Map
extends Mapper<LongWritable,Text,Text,Text>
{
//private Text key1 = new Text();
//private Text value1 = new Text();
@Override
protected void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException
{
String line = value.toString();
String[] part = line.split("\t");
int len = part.length;
//System.out.println(len);
if (len == 3)
{
context.write(new Text(part[2]), new Text(part[0]+"\t"+part[1]));
//System.out.println(part[0]+part[1]+part[2]);
}
}
分区程序代码
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomPartitioner
extends Partitioner<Text,Text>
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
if(numReduceTasks==0)
return 0;
if(key.equals(new Text("A")))
return 0;
if(key.equals(new Text("B")))
return 1;
else
return 2;
}
}
减少代码
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class Reduce
extends Reducer<Text,Text,Text,Text>
{
@Override
protected void reduce(Text key,Iterable<Text> values,Context context)
throws IOException,InterruptedException
{
Iterator<Text> itr = values.iterator();
while(itr.hasNext())
{
context.write(new Text(itr.next().getBytes()),new Text(key));
}
}
}
Driver Class
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MapReduceDriver
{
public static void main(String[] args) throws Exception
{
Job job = new Job();
job.setJarByClass(MapReduceDriver.class);
job.setJobName("Custom Partitioner");
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(Map.class);
job.setPartitionerClass(CustomPartitioner.class);
job.setReducerClass(Reduce.class);
job.setNumReduceTasks(3);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
代码运行没有任何错误,但三个 reduce 输出文件为空。此外,当作业运行时,它会将映射输出字节显示为零。因此我相信地图不会生成任何 key-value 对。但我找不到原因。你能帮我找出错误吗?
我还有一个困惑:在 Map class 中,当检查变量 len
是否 > 0 时,我得到 ArrayIndexOutOfBoundsException 但如果用 == 检查它运行正常,没有任何异常3. 为什么会抛出 > 0 的异常?
问题是您的输入数据(粘贴在此处)不是制表符分隔的,而是逗号分隔的。如果您替换此行,它应该可以正常工作:
String[] part = line.split("\t");
这一行:
String[] part = line.split(" ");
检查 len > 0
时出现异常的原因是您的字符串未拆分为任何子部分,因此 len
为 1。然后它满足 if 条件并尝试为零件的位置 2 执行一些不存在的操作。
在现有代码中,len
不是3,所以代码不会进入if块,因此不会抛出异常。