HashPartition in MapReduce
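Objective:
Implement HashPartition and check the number of reducers that are created automatically.
Any help and any sample code for this purpose is always appreciated.
What I did:
I ran a MapReduce program with hash partitioning implemented on a 250 MB CSV file.
But I still see that HDFS uses only 1 reducer for the aggregation. If I understand correctly, HDFS should automatically create the partitions and distribute the data evenly, and then n reducers should work on the n partitions that were created. But I don't see that happening. Can anyone help me achieve this with hash partitioning? I don't want to define the number of partitions.
Mapper code: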
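import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlightMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split on commas that are not inside double-quoted fields.
        String[] line = value.toString().split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
        String airlineid = line[7];
        //int tailno = Integer.parseInt(line[10].replace("\"", ""));
        String tailno = line[9].replace("\"", "");
        if (tailno.length() != 0) {
            //System.out.println(airlineid + " " + tailno + " " + tailno.length());
            context.write(new Text(airlineid), new Text(tailno));
        }
    }
}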
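Reducer code:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlightReducer extends Reducer<Text, Text, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Count the values emitted for this airline ID.
        int count = 0;
        for (Text value : values) {
            count++;
        }
        //context.write(key, new IntWritable(maxValue));
        context.write(key, new IntWritable(count));
    }
}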
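Partitioner code:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlightPartition extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        // Clear the sign bit so the result is non-negative, then bucket by reducer count.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}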
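Driver:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Flight {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Flight");
        job.setJarByClass(Flight.class);
        job.setMapperClass(FlightMapper.class);
        job.setReducerClass(FlightReducer.class);
        job.setPartitionerClass(FlightPartition.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // args[0] is the input path, args[1] the output path.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}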
Log:
15/11/09 06:14:14 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=7008211
FILE: Number of bytes written=14438683
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=211682444
HDFS: Number of bytes written=178
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Killed map tasks=2
Launched map tasks=5
Launched reduce tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=2235296
Total time spent by all reduces in occupied slots (ms)=606517
Total time spent by all map tasks (ms)=2235296
Total time spent by all reduce tasks (ms)=606517
Total vcore-seconds taken by all map tasks=2235296
Total vcore-seconds taken by all reduce tasks=606517
Total megabyte-seconds taken by all map tasks=2288943104
Total megabyte-seconds taken by all reduce tasks=621073408
Map-Reduce Framework
Map input records=470068
Map output records=467281
Map output bytes=6073643
Map output materialized bytes=7008223
Input split bytes=411
Combine input records=0
Combine output records=0
Reduce input groups=15
Reduce shuffle bytes=7008223
Reduce input records=467281
Reduce output records=15
Spilled Records=934562
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=3701
CPU time spent (ms)=277080
Physical memory (bytes) snapshot=590581760
Virtual memory (bytes) snapshot=3196801024
Total committed heap usage (bytes)=441397248
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=211682033
File Output Format Counters
Bytes Written=178
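A partitioner only decides which of the configured reducers receives each key; it does not change how many reducers run. Check the mapreduce.job.reduces property (it is listed in mapred-default.xml, where the default is 1) and set it to a value greater than 1, for example in mapred-site.xml, to get more reducers in the cluster. This property is ignored when mapreduce.jobtracker.address is "local".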
You can also override the default value of the property in Java with job.setNumReduceTasks(3).
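To see why the log shows a single reduce task, here is a minimal sketch that applies the same arithmetic as FlightPartition.getPartition outside Hadoop. It is only an illustration under assumptions: String.hashCode() stands in for Text.hashCode() (the two produce different values), and the airline IDs are made-up sample keys, not data from the job.

```java
import java.util.Map;
import java.util.TreeMap;

class PartitionSketch {
    // Same formula as FlightPartition.getPartition; String is an assumed stand-in for Text.
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Counts how many of the given keys land in each partition index.
    static Map<Integer, Integer> distribute(String[] keys, int numReduceTasks) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(getPartition(key, numReduceTasks), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical airline IDs used only for illustration.
        String[] keys = {"19386", "19393", "19790", "19805", "20355", "20366"};
        // With one reduce task, x % 1 is always 0, so every key goes to partition 0.
        System.out.println("1 reducer:  " + distribute(keys, 1)); // prints "1 reducer:  {0=6}"
        // With three reduce tasks the keys are spread over partitions 0..2.
        System.out.println("3 reducers: " + distribute(keys, 3));
    }
}
```

In the driver, the equivalent change would be to call job.setNumReduceTasks(3) before job.waitForCompletion(true); the partitioner then receives numReduceTasks = 3 instead of 1.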
See this article for the complete list of mapred-default.xml properties from Apache.
How many reduces? (from Apache)
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).
With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.
Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.
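As a rough worked example of that rule of thumb, the sketch below takes the multiplicand to be (number of nodes * maximum containers per node), as the Apache MapReduce tutorial states it. The cluster size of 4 nodes with 8 containers each is a hypothetical value chosen for illustration, and rounding down is my own choice, not something the quoted text prescribes.

```java
class ReducerCountSketch {
    // Rule of thumb from the quoted text: factor * (nodes * max containers per node), rounded down.
    static int suggestedReducers(double factor, int nodes, int maxContainersPerNode) {
        return (int) Math.floor(factor * (nodes * maxContainersPerNode));
    }

    public static void main(String[] args) {
        int nodes = 4;                // hypothetical cluster size
        int maxContainersPerNode = 8; // hypothetical per-node container limit
        // 0.95: all reduces can start in a single wave.
        System.out.println(suggestedReducers(0.95, nodes, maxContainersPerNode)); // prints 30
        // 1.75: faster nodes pick up a second wave of reduces.
        System.out.println(suggestedReducers(1.75, nodes, maxContainersPerNode)); // prints 56
    }
}
```

The result could then be passed to job.setNumReduceTasks(...), keeping in mind that with only 15 distinct keys in this job, any reducers beyond 15 would receive no input.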
How many maps?
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
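The 82,000 figure can be checked with a small calculation. This sketch assumes one map per block and binary units (1 TB = 1024^4 bytes, 1 MB = 1024^2 bytes); the real split count also depends on the InputFormat and on how the data is divided into files.

```java
class MapCountSketch {
    // One map per block: ceiling of totalBytes / blockBytes, in integer arithmetic.
    static long estimatedMaps(long totalBytes, long blockBytes) {
        return (totalBytes + blockBytes - 1) / blockBytes;
    }

    public static void main(String[] args) {
        long tenTb = 10L * 1024 * 1024 * 1024 * 1024; // 10 TB of input
        long block = 128L * 1024 * 1024;              // 128 MB block size
        System.out.println(estimatedMaps(tenTb, block)); // prints 81920, roughly 82,000
    }
}
```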