Hadoop:在运行时更改减速器的数量

Hadoop: Change number of reducers at runtime

假设以下场景:一组依赖作业,它们被发送到 hadoop。 Hadoop 执行第一个,然后执行依赖于第一个的第二个,依此类推。作业使用 JobControl 一次性提交(参见下面的代码)。

使用 Hadoop 2.x(在 Java 中),是否可以在运行时更改作业的 reducer 数量?更具体地说,如何在执行作业 1 后更改作业 2 中的减速器数量?

还有,有没有办法让hadoop通过估计map输出自动推断出reducer的数量?它总是需要 1,而且我找不到更改默认设置的方法(除非我自己明确设置数字)。

// 1. create JobControl
JobControl jc = new JobControl(name);

// 2. add all the controlled jobs to the job control
// note that this is done in one go by using a collection
jc.addJobCollection(jobs);

// 3. execute the jobcontrol in a Thread
Thread workflowThread = new Thread(jc, "Thread_" + name);
workflowThread.setDaemon(true); // will not avoid JVM to shutdown

// 4. we wait for it to complete
LOG.info("Waiting for thread to complete: " + workflowThread.getName());
while (!jc.allFinished()) {
    Thread.sleep(REFRESH_WAIT);
}

你的第一个问题。是的,您可以在驱动程序中执行作业 1 后设置作业 2 的减速器数量:

Job job1 = new Job(conf, "job 1");
//your job setup here
//...
job1.submit();
job1.waitForCompletion(true);

int job2Reducers = ... //compute based on job1 results here

Job job2 = new Job(conf, "job 2");
job2.setNumReduceTasks(job2Reducers);
//your job2 setup here
//...
job2.submit();
job2.waitForCompletion(true);

第二个问题,据我所知,不,你不能让 Hadoop 根据你的映射器负载自动选择 reducer 的数量。

地图的数量通常由输入文件中的 DFS 块数量决定。虽然这会导致人们调整他们的 DFS 块大小来调整地图的数量。

所以我们可以使用与 map 相同的逻辑来设置 reducer 任务的数量。 为了使 reducer 动态化,我编写了逻辑来动态设置 reducer 任务的数量,以在运行时根据 map 任务的数量进行调整。

在Java代码中:

    long defaultBlockSize = 0;
    int NumOfReduce = 10; // default value you can give any number.
    long inputFileLength = 0;
    try {
        FileSystem fileSystem = FileSystem.get(this.getConf()); // hdfs file system
        inputFileLength = fileSystem.getContentSummary(
                new Path(PROP_HDFS_INPUT_LOCATION)).getLength();// input files stored in hdfs location

        defaultBlockSize = fileSystem.getDefaultBlockSize(new Path(
                hdfsFilePath.concat("PROP_HDFS_INPUT_LOCATION")));// getting default block size

        if (inputFileLength > 0 && defaultBlockSize > 0) {
            NumOfReduce = (int) (((inputFileLength / defaultBlockSize) + 1) * 2);// calculating number of tasks
        }
        System.out.println("NumOfReduce : " + NumOfReduce);
    } catch (Exception e) {
        LOGGER.error(" Exception{} ", e);
    }

    job.setNumReduceTasks(NumOfReduce);