Hadoop:在运行时更改减速器的数量
Hadoop: Change number of reducers at runtime
假设以下场景:一组依赖作业,它们被发送到 hadoop。 Hadoop 执行第一个,然后执行依赖于第一个的第二个,依此类推。作业使用 JobControl
一次性提交(参见下面的代码)。
使用 Hadoop 2.x(在 Java 中),是否可以在运行时更改作业的 reducer 数量?更具体地说,如何在执行作业 1 后更改作业 2 中的减速器数量?
还有,有没有办法让hadoop通过估计map输出自动推断出reducer的数量?它总是需要 1,而且我找不到更改默认设置的方法(除非我自己明确设置数字)。
// 1. create JobControl
JobControl jc = new JobControl(name);
// 2. add all the controlled jobs to the job control
// note that this is done in one go by using a collection
jc.addJobCollection(jobs);
// 3. execute the jobcontrol in a Thread
Thread workflowThread = new Thread(jc, "Thread_" + name);
workflowThread.setDaemon(true); // will not avoid JVM to shutdown
// 4. we wait for it to complete
LOG.info("Waiting for thread to complete: " + workflowThread.getName());
while (!jc.allFinished()) {
Thread.sleep(REFRESH_WAIT);
}
你的第一个问题。是的,您可以在驱动程序中执行作业 1 后设置作业 2 的减速器数量:
Job job1 = new Job(conf, "job 1");
//your job setup here
//...
job1.submit();
job1.waitForCompletion(true);
int job2Reducers = ... //compute based on job1 results here
Job job2 = new Job(conf, "job 2");
job2.setNumReduceTasks(job2Reducers);
//your job2 setup here
//...
job2.submit();
job2.waitForCompletion(true);
第二个问题,据我所知,不,你不能让 Hadoop 根据你的映射器负载自动选择 reducer 的数量。
地图的数量通常由输入文件中的 DFS 块数量决定。虽然这会导致人们调整他们的 DFS 块大小来调整地图的数量。
所以我们可以使用与 map 相同的逻辑来设置 reducer 任务的数量。
为了使 reducer 动态化,我编写了逻辑来动态设置 reducer 任务的数量,以在运行时根据 map 任务的数量进行调整。
在Java代码中:
long defaultBlockSize = 0;
int NumOfReduce = 10; // default value you can give any number.
long inputFileLength = 0;
try {
FileSystem fileSystem = FileSystem.get(this.getConf()); // hdfs file system
inputFileLength = fileSystem.getContentSummary(
new Path(PROP_HDFS_INPUT_LOCATION)).getLength();// input files stored in hdfs location
defaultBlockSize = fileSystem.getDefaultBlockSize(new Path(
hdfsFilePath.concat("PROP_HDFS_INPUT_LOCATION")));// getting default block size
if (inputFileLength > 0 && defaultBlockSize > 0) {
NumOfReduce = (int) (((inputFileLength / defaultBlockSize) + 1) * 2);// calculating number of tasks
}
System.out.println("NumOfReduce : " + NumOfReduce);
} catch (Exception e) {
LOGGER.error(" Exception{} ", e);
}
job.setNumReduceTasks(NumOfReduce);
假设以下场景:一组依赖作业,它们被发送到 hadoop。 Hadoop 执行第一个,然后执行依赖于第一个的第二个,依此类推。作业使用 JobControl
一次性提交(参见下面的代码)。
使用 Hadoop 2.x(在 Java 中),是否可以在运行时更改作业的 reducer 数量?更具体地说,如何在执行作业 1 后更改作业 2 中的减速器数量?
还有,有没有办法让hadoop通过估计map输出自动推断出reducer的数量?它总是需要 1,而且我找不到更改默认设置的方法(除非我自己明确设置数字)。
// 1. create JobControl
JobControl jc = new JobControl(name);
// 2. add all the controlled jobs to the job control
// note that this is done in one go by using a collection
jc.addJobCollection(jobs);
// 3. execute the jobcontrol in a Thread
Thread workflowThread = new Thread(jc, "Thread_" + name);
workflowThread.setDaemon(true); // will not avoid JVM to shutdown
// 4. we wait for it to complete
LOG.info("Waiting for thread to complete: " + workflowThread.getName());
while (!jc.allFinished()) {
Thread.sleep(REFRESH_WAIT);
}
你的第一个问题。是的,您可以在驱动程序中执行作业 1 后设置作业 2 的减速器数量:
Job job1 = new Job(conf, "job 1");
//your job setup here
//...
job1.submit();
job1.waitForCompletion(true);
int job2Reducers = ... //compute based on job1 results here
Job job2 = new Job(conf, "job 2");
job2.setNumReduceTasks(job2Reducers);
//your job2 setup here
//...
job2.submit();
job2.waitForCompletion(true);
第二个问题,据我所知,不,你不能让 Hadoop 根据你的映射器负载自动选择 reducer 的数量。
地图的数量通常由输入文件中的 DFS 块数量决定。虽然这会导致人们调整他们的 DFS 块大小来调整地图的数量。
所以我们可以使用与 map 相同的逻辑来设置 reducer 任务的数量。 为了使 reducer 动态化,我编写了逻辑来动态设置 reducer 任务的数量,以在运行时根据 map 任务的数量进行调整。
在Java代码中:
long defaultBlockSize = 0;
int NumOfReduce = 10; // default value you can give any number.
long inputFileLength = 0;
try {
FileSystem fileSystem = FileSystem.get(this.getConf()); // hdfs file system
inputFileLength = fileSystem.getContentSummary(
new Path(PROP_HDFS_INPUT_LOCATION)).getLength();// input files stored in hdfs location
defaultBlockSize = fileSystem.getDefaultBlockSize(new Path(
hdfsFilePath.concat("PROP_HDFS_INPUT_LOCATION")));// getting default block size
if (inputFileLength > 0 && defaultBlockSize > 0) {
NumOfReduce = (int) (((inputFileLength / defaultBlockSize) + 1) * 2);// calculating number of tasks
}
System.out.println("NumOfReduce : " + NumOfReduce);
} catch (Exception e) {
LOGGER.error(" Exception{} ", e);
}
job.setNumReduceTasks(NumOfReduce);