Yarn AppMaster request for containers not working

I am running a local Yarn cluster with 8 vCores and 8 GB of total memory.

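The workflow is as follows:

  1. The YarnClient submits an application request that starts the AppMaster in a container.

  2. The AppMaster starts, creates the amRMClient and nmClient, and registers itself with the RM.

  3. It then creates 4 container requests for the workers via amRMClient.addContainerRequest.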

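Even though enough resources are available, the containers are never allocated (the callback's onContainersAllocated function is never called). I checked the nodemanager and resourcemanager logs, but I don't see any lines related to the container requests. I followed the Apache documentation closely and can't understand what I'm doing wrong.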

The AppMaster code, for reference:

@Override
public void run() {
    Map<String, String> envs = System.getenv();

    String containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.toString());
    if (containerIdString == null) {
        // container id should always be set in the env by the framework
        throw new IllegalArgumentException("ContainerId not set in the environment");
    }
    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();

    LOG.info("Starting AppMaster Client...");

    YarnAMRMCallbackHandler amHandler = new YarnAMRMCallbackHandler(allocatedYarnContainers);

    // TODO: get the heartbeat interval from config instead of hard-coding 1000 ms
    amClient = AMRMClientAsync.createAMRMClientAsync(1000, this);
    amClient.init(config);
    amClient.start();

    LOG.info("Starting AppMaster Client OK");

    //YarnNMCallbackHandler nmHandler = new YarnNMCallbackHandler();
    containerManager = NMClient.createNMClient();
    containerManager.init(config);
    containerManager.start();

    // Get port and URL information. TODO: get the tracking URL
    String appMasterHostname = NetUtils.getHostname();

    String appMasterTrackingUrl = "/progress";

    // Register self with ResourceManager. This will start heart-beating to the RM
    RegisterApplicationMasterResponse response = null;

    LOG.info("Register AppMaster on: " + appMasterHostname + "...");

    try {
        response = amClient.registerApplicationMaster(appMasterHostname, 0, appMasterTrackingUrl);
    } catch (YarnException | IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return;
    }

    LOG.info("Register AppMaster OK");

    // Dump out information about cluster capability as seen by the resource manager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);

    containerMemory = Integer.parseInt(config.get(YarnConfig.YARN_CONTAINER_MEMORY_MB));
    containerCores = Integer.parseInt(config.get(YarnConfig.YARN_CONTAINER_CPU_CORES));

    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
      LOG.info("Container memory specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerMemory + ", max="
          + maxMem);
      containerMemory = maxMem;
    }

    if (containerCores > maxVCores) {
      LOG.info("Container virtual cores specified above max threshold of  cluster."
        + " Using max value." + ", specified=" + containerCores + ", max=" + maxVCores);
      containerCores = maxVCores;
    }
    List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts();
    LOG.info("Received " + previousAMRunningContainers.size()
            + " previous AM's running containers on AM registration.");


    for (int i = 0; i < 4; ++i) {
        ContainerRequest containerAsk = setupContainerAskForRM();
        amClient.addContainerRequest(containerAsk); // NOTHING HAPPENS HERE...
        LOG.info("Available resources: " + amClient.getAvailableResources().toString());
    }

    while(completedYarnContainers != 4) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    LOG.info("Done with allocation!");

}

@Override
public void onContainersAllocated(List<Container> containers) {
    LOG.info("Got response from RM for container ask, allocatedCnt=" + containers.size());

    for (Container container : containers) {
        LOG.info("Allocated yarn container with id: " + container.getId());
        allocatedYarnContainers.push(container);

        // TODO: Launch the container in a thread
    }
}

@Override
public void onError(Throwable error) {
    LOG.error(error.getMessage());
}

@Override
public float getProgress() {
    return (float) completedYarnContainers / allocatedYarnContainers.size();
}

Here is the output of jps:

14594 NameNode
15269 DataNode
17975 Jps
14666 ResourceManager
14702 NodeManager

Here is the AppMaster log for the initialization and the 4 container requests:

23:47:09 YarnAppMaster - Starting AppMaster Client OK
23:47:09 YarnAppMaster - Register AppMaster on: andrei-mbp.local/192.168.1.4...
23:47:09 YarnAppMaster - Register AppMaster OK
23:47:09 YarnAppMaster - Max mem capabililty of resources in this cluster 2048
23:47:09 YarnAppMaster - Max vcores capabililty of resources in this cluster 2
23:47:09 YarnAppMaster - Received 0 previous AM's running containers on AM registration.
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Requested container ask: Capability[<memory:512, vCores:1>]Priority[0]
23:47:11 YarnAppMaster - Available resources: <memory:7680, vCores:0>
23:47:11 YarnAppMaster - Progress indicator should not be negative

Thanks in advance.

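Thanks to Alexandre Fonseca for pointing out that getProgress() returns NaN from a division by zero when it is called before the first allocation, which makes the allocate call quit immediately with an exception before anything is sent to the ResourceManager.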

Read more about it here.

I suspect the problem comes precisely from the negative progress:

23:47:11 YarnAppMaster - Progress indicator should not be negative

Note that since you are using AMRMClientAsync, requests are not sent immediately when you call addContainerRequest. There is a heartbeat function that runs periodically, and it is inside this function that allocate is called and the pending requests are actually made. The progress value used by this function starts at 0 but is updated with the value returned by your handler once a response to the allocate is obtained.
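The queue-then-heartbeat behaviour described above can be pictured with a small toy model. This is only a sketch under stated assumptions: ToyAsyncClient, its methods, and the string "asks" are all hypothetical names made up for illustration, and none of it is the Hadoop implementation. It shows only that adding a request queues it, and that the queued requests and the refreshed progress value move on the periodic heartbeat.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/** Toy model of an asynchronous client. Hypothetical; NOT the Hadoop code. */
final class ToyAsyncClient {
    private final Queue<String> pendingAsks = new ArrayDeque<>();
    private final List<String> sentAsks = new ArrayList<>();
    private final Supplier<Float> progressSource; // stands in for the handler's getProgress()
    private float progress = 0.0f;                // initial progress is 0

    ToyAsyncClient(Supplier<Float> progressSource) {
        this.progressSource = progressSource;
    }

    /** Only queues the ask; nothing is sent yet. */
    void addContainerRequest(String ask) {
        pendingAsks.add(ask);
    }

    /** One heartbeat: flush the pending asks, then refresh the progress value. */
    void heartbeat() {
        while (!pendingAsks.isEmpty()) {
            sentAsks.add(pendingAsks.remove());
        }
        progress = progressSource.get();
    }

    int pendingCount() { return pendingAsks.size(); }
    int sentCount() { return sentAsks.size(); }
    float lastProgress() { return progress; }

    public static void main(String[] args) {
        ToyAsyncClient client = new ToyAsyncClient(() -> 0.0f);
        for (int i = 0; i < 4; i++) {
            client.addContainerRequest("ask-" + i);
        }
        System.out.println("before heartbeat: pending=" + client.pendingCount()
                + " sent=" + client.sentCount());
        client.heartbeat();
        System.out.println("after heartbeat: pending=" + client.pendingCount()
                + " sent=" + client.sentCount());
    }
}
```

In this model, anything that makes heartbeat() throw would leave every later ask sitting in the queue, which matches the symptom in the question.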

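The first allocate is supposedly done right after the register, so the getProgress function is called at that point and the existing progress is updated. As it stands, your progress will be updated to NaN because, at this time, allocatedYarnContainers is empty and completedYarnContainers is also 0, so the progress you return is the result of 0/0, which is undefined. When the next allocate checks your progress value, the check fails because every ordered comparison involving NaN returns false. As a result, no later allocate call actually communicates with the ResourceManager, because each one quits at that first step with an exception.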
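The arithmetic can be checked in isolation. The sketch below reproduces the expression from the question's getProgress() with both counters at 0 and then applies a non-negativity test. The exact form of the check, progress >= 0, is an assumption inferred from the "Progress indicator should not be negative" log line; the class and method names are hypothetical.

```java
final class NaNProgressDemo {
    /** Same expression as the question's getProgress(): float divided by int. */
    static float brokenProgress(int completed, int allocated) {
        // The cast binds to 'completed' first, so this is floating-point division
        // and 0/0 yields NaN instead of throwing ArithmeticException.
        return (float) completed / allocated;
    }

    /** Assumed shape of the check behind the "should not be negative" message. */
    static boolean passesNonNegativeCheck(float progress) {
        return progress >= 0;
    }

    public static void main(String[] args) {
        float p = brokenProgress(0, 0);
        System.out.println("isNaN:  " + Float.isNaN(p));            // true
        System.out.println("p >= 0: " + passesNonNegativeCheck(p)); // false
        System.out.println("p < 0:  " + (p < 0));                   // false
    }
}
```

NaN is therefore not negative, but it still fails a "must be >= 0" test, which is why the log complains about a negative progress indicator.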

Try changing your progress function to the following:

@Override
public float getProgress() {
    return (float) allocatedYarnContainers.size() / 4.0f;
}
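If you would rather report completed work than allocations, a guarded variant is another option. This is a minimal sketch, not something from the original code: SafeProgress and its parameters are hypothetical, and it assumes the total number of expected containers (4 in the question) is known up front. It returns 0 while nothing is expected and clamps the result to the range [0, 1].

```java
final class SafeProgress {
    /**
     * Progress as completed / total, never NaN and always within [0, 1].
     * 'total' is the number of containers expected overall (assumed known).
     */
    static float progress(int completed, int total) {
        if (total <= 0) {
            return 0.0f; // nothing expected yet: report zero instead of dividing by zero
        }
        float p = (float) completed / total;
        return Math.max(0.0f, Math.min(1.0f, p));
    }

    public static void main(String[] args) {
        System.out.println(progress(0, 0)); // 0.0
        System.out.println(progress(2, 4)); // 0.5
        System.out.println(progress(5, 4)); // 1.0
    }
}
```

Inside the AppMaster, getProgress() could then delegate to something like SafeProgress.progress(completedYarnContainers, 4).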

(Note: copied from here to Stack Overflow for posterity)