How to wait for a step completion in AWS EMR cluster using Boto3

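Given a step ID, I want to wait for that AWS EMR step to finish. How can I do this? Is there a built-in function?

At the time of writing, the Boto3 waiters for EMR only allow waiting for cluster running and cluster termination events: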

EMR Waiters

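There is no built-in function in Boto3, but you can write your own waiter.

See: describe_step

Call describe_step with cluster_id and step_id. The response is a dictionary containing details about the step. The step state is nested under response['Step']['Status']['State']. If the state is not COMPLETED, wait a few seconds and try again, until it is COMPLETED or the waiting time exceeds your limit. It is also worth stopping early when the state is CANCELLED, FAILED, or INTERRUPTED, since a step in one of those states will never reach COMPLETED.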

'State': 'PENDING'|'CANCEL_PENDING'|'RUNNING'|'COMPLETED'|'CANCELLED'|'FAILED'|'INTERRUPTED'
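The polling loop described above could be sketched as follows. This is a minimal sketch, not a built-in Boto3 feature: the function name wait_for_step, the delay and max_wait parameters, and the injectable sleep argument are all assumptions made for illustration. Only the describe_step call and the state names come from the EMR API.

```python
import time

# Terminal non-success states, taken from the 'State' enumeration above.
FAILURE_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(emr_client, cluster_id, step_id,
                  delay=30, max_wait=3600, sleep=time.sleep):
    """Poll describe_step until the step completes, fails, or max_wait elapses.

    delay, max_wait, and sleep are hypothetical parameters for this sketch.
    """
    waited = 0
    while True:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state == "COMPLETED":
            return state
        if state in FAILURE_STATES:
            # The step can no longer reach COMPLETED, so stop polling.
            raise RuntimeError(f"Step {step_id} ended in state {state}")
        if waited >= max_wait:
            raise TimeoutError(f"Step {step_id} still {state} after {waited} seconds")
        sleep(delay)
        waited += delay
```

With a real client this would be called as wait_for_step(boto3.client("emr"), cluster_id, step_id). Passing sleep as a parameter only makes the loop easy to exercise without real delays.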

I came up with the following code (if you set max_attempts to 0 or less, it simply waits until there are no running or pending steps):

import json
import time


def wait_for_steps_completion(emr_client, emr_cluster_id, max_attempts=0):
    sleep_seconds = 30
    num_attempts = 0

    while True:
        # Ask only for steps that are still active.
        response = emr_client.list_steps(
            ClusterId=emr_cluster_id,
            StepStates=['PENDING', 'CANCEL_PENDING', 'RUNNING']
        )
        num_attempts += 1
        active_aws_emr_steps = response['Steps']

        if active_aws_emr_steps:
            # max_attempts <= 0 disables the limit, so the loop waits indefinitely.
            if 0 < max_attempts <= num_attempts:
                raise Exception(
                    'Max attempts exceeded while waiting for AWS EMR steps completion. Last response:\n'
                    + json.dumps(response, indent=3, default=str)
                )
            time.sleep(sleep_seconds)
        else:
            # No active steps remain on the cluster.
            return
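Note that the function above returns as soon as no step is active, whether the steps succeeded or not. A possible follow-up check, sketched here under that assumption, lists the steps that ended without completing. The helper name list_unsuccessful_steps is hypothetical; it uses the same list_steps call and follows the Marker field to read further pages.

```python
def list_unsuccessful_steps(emr_client, emr_cluster_id):
    """Return (step id, state) pairs for steps that ended without completing.

    Hypothetical helper for illustration; not part of Boto3.
    """
    kwargs = {
        "ClusterId": emr_cluster_id,
        "StepStates": ["CANCELLED", "FAILED", "INTERRUPTED"],
    }
    unsuccessful = []
    while True:
        response = emr_client.list_steps(**kwargs)
        for step in response["Steps"]:
            unsuccessful.append((step["Id"], step["Status"]["State"]))
        # list_steps returns a Marker when more results are available.
        marker = response.get("Marker")
        if not marker:
            return unsuccessful
        kwargs["Marker"] = marker
```

Calling this right after wait_for_steps_completion returns and raising if the list is non-empty would turn a silent failure into an explicit one.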

There is now a waiter available for the step completion event. It was added in a recent boto3 release.

http://boto3.readthedocs.io/en/latest/reference/services/emr.html#EMR.Waiter.StepComplete

Example code:

import boto3

client = boto3.client("emr")
waiter = client.get_waiter("step_complete")
waiter.wait(
    ClusterId='the-cluster-id',
    StepId='the-step-id',
    WaiterConfig={
        "Delay": 30,
        "MaxAttempts": 10
    }
)
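With the WaiterConfig above, the waiter gives up after roughly Delay × MaxAttempts = 30 × 10 = 300 seconds, which may be too short for a long-running step; when it gives up it raises botocore.exceptions.WaiterError. As a small sketch, the helper below (waiter_config_for_timeout is a hypothetical name, not part of Boto3) derives MaxAttempts from a desired overall timeout.

```python
import math


def waiter_config_for_timeout(timeout_seconds, delay_seconds=30):
    """Build a WaiterConfig dict whose total wait is roughly timeout_seconds.

    Hypothetical helper: the waiter polls up to MaxAttempts times with
    Delay seconds between polls, so the bound is approximate.
    """
    max_attempts = max(1, math.ceil(timeout_seconds / delay_seconds))
    return {"Delay": delay_seconds, "MaxAttempts": max_attempts}


# Five minutes reproduces the configuration shown above.
print(waiter_config_for_timeout(300))   # {'Delay': 30, 'MaxAttempts': 10}
# Two hours at the same polling interval.
print(waiter_config_for_timeout(7200))  # {'Delay': 30, 'MaxAttempts': 240}
```

The result can be passed directly as WaiterConfig=waiter_config_for_timeout(7200) in the waiter.wait call.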

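I wrote a generic status_poller function on GitHub as part of an interactive EMR demo.

The status_poller function loops and calls a function, printing '.' or the new status until the specified status is returned: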

import sys
import time


def status_poller(intro, done_status, func):
    """
    Polls a function for status, sleeping for 10 seconds between each query,
    until the specified status is returned.
    :param intro: An introductory sentence that informs the reader what we're
                  waiting for.
    :param done_status: The status we're waiting for. This function polls the status
                        function until it returns the specified status.
    :param func: The function to poll for status. This function must eventually
                 return the expected done_status or polling will continue indefinitely.
    """
    status = None
    print(intro)
    print("Current status: ", end='')
    while status != done_status:
        prev_status = status
        status = func()
        if prev_status == status:
            print('.', end='')
        else:
            print(status, end='')
        sys.stdout.flush()
        time.sleep(10)
    print()

To check whether a step has completed, you can call it like this:

status_poller(
    "Waiting for step to complete...",
    'COMPLETED',
    lambda: emr_basics.describe_step(cluster_id, step_id, emr_client)['Status']['State'])
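As its docstring says, status_poller keeps polling until done_status is returned, so a step that ends as FAILED or CANCELLED would leave it looping forever. One way around this, sketched here with a hypothetical fail_fast wrapper that is not part of the demo, is to wrap the status function so that a terminal failure state raises instead of being returned.

```python
def fail_fast(func, failure_statuses=("CANCELLED", "FAILED", "INTERRUPTED")):
    """Wrap a status function so terminal failure states raise an error.

    Hypothetical wrapper for illustration; func is any zero-argument
    callable that returns the current status string.
    """
    def wrapped():
        status = func()
        if status in failure_statuses:
            raise RuntimeError(f"Polling stopped: status is {status}")
        return status
    return wrapped


# Usage with the poller above (names as in the call shown earlier):
# status_poller(
#     "Waiting for step to complete...",
#     'COMPLETED',
#     fail_fast(lambda: emr_basics.describe_step(
#         cluster_id, step_id, emr_client)['Status']['State']))
```

The wrapper leaves status_poller itself unchanged, so the same approach works for any other status the poller is used to wait for.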