How to wait for a step completion in AWS EMR cluster using Boto3
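Given a step ID, I want to wait for that AWS EMR step to finish. How can I achieve this? Is there a built-in function?
At the time of writing, the Boto3 waiters for EMR only cover the cluster running and cluster terminated events.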
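Boto3 has no built-in function for this, but you can write your own waiter.
Call describe_step with the cluster_id and step_id. The response is a dictionary with details about the step. The state is found under response['Step']['Status']['State']. If the state is not COMPLETED, wait a few seconds and try again, until it is COMPLETED or the waiting time exceeds your limit. CANCELLED, FAILED and INTERRUPTED are also final states, so stop polling if you see one of them.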
'State': 'PENDING'|'CANCEL_PENDING'|'RUNNING'|'COMPLETED'|'CANCELLED'|'FAILED'|'INTERRUPTED'
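The polling loop described above could be sketched as follows. This is a minimal sketch, not part of Boto3: the helper name wait_for_step, its parameters, and the decision to raise on the failure states are assumptions made for illustration. It expects emr_client to be a Boto3 EMR client (or any object with a compatible describe_step method).

```python
import time

# States after which the step will never reach COMPLETED.
FAILURE_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(emr_client, cluster_id, step_id,
                  delay=30, max_attempts=60, sleep=time.sleep):
    """Poll describe_step until the step is COMPLETED (hypothetical helper)."""
    for attempt in range(max_attempts):
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state == "COMPLETED":
            return state
        if state in FAILURE_STATES:
            # No point in polling further: the step has ended unsuccessfully.
            raise RuntimeError(f"Step {step_id} ended in state {state}")
        if attempt < max_attempts - 1:
            sleep(delay)
    raise TimeoutError(
        f"Step {step_id} did not complete after {max_attempts} attempts"
    )
```

The sleep parameter is only there so the loop can be exercised without real delays; in normal use the default time.sleep is fine.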
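I came up with the following code. Note that it waits for all active steps on the cluster rather than for one specific step. If you set max_attempts to 0 or less, it simply waits until there are no running or pending steps: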
import json
import time

def wait_for_steps_completion(emr_client, emr_cluster_id, max_attempts=0):
    sleep_seconds = 30
    num_attempts = 0

    while True:
        # Ask only for steps that are still active.
        response = emr_client.list_steps(
            ClusterId=emr_cluster_id,
            StepStates=['PENDING', 'CANCEL_PENDING', 'RUNNING']
        )
        num_attempts += 1
        active_aws_emr_steps = response['Steps']

        if active_aws_emr_steps:
            # max_attempts <= 0 disables the limit.
            if 0 < max_attempts <= num_attempts:
                raise Exception(
                    'Max attempts exceeded while waiting for AWS EMR steps completion. Last response:\n'
                    + json.dumps(response, indent=3, default=str)
                )
            time.sleep(sleep_seconds)
        else:
            # No active steps left.
            return
There is now a waiter available for step completion events. It was added in a recent Boto3 release.
http://boto3.readthedocs.io/en/latest/reference/services/emr.html#EMR.Waiter.StepComplete
Example code:
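import boto3

client = boto3.client("emr")
waiter = client.get_waiter("step_complete")

# Polls every 30 seconds, at most 10 times (about 5 minutes in total).
# Raises botocore.exceptions.WaiterError if the step fails or the
# attempts run out.
waiter.wait(
    ClusterId='the-cluster-id',
    StepId='the-step-id',
    WaiterConfig={
        "Delay": 30,
        "MaxAttempts": 10
    }
)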
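I wrote a generic status_poller function as part of an interactive EMR demo on GitHub.
The status_poller function loops and calls a function, printing '.' or the new status until the specified status is returned: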
import sys
import time

def status_poller(intro, done_status, func):
    """
    Polls a function for status, sleeping for 10 seconds between each query,
    until the specified status is returned.

    :param intro: An introductory sentence that informs the reader what we're
                  waiting for.
    :param done_status: The status we're waiting for. This function polls the status
                        function until it returns the specified status.
    :param func: The function to poll for status. This function must eventually
                 return the expected done_status or polling will continue indefinitely.
    """
    status = None
    print(intro)
    print("Current status: ", end='')
    while status != done_status:
        prev_status = status
        status = func()
        if prev_status == status:
            print('.', end='')
        else:
            print(status, end='')
        sys.stdout.flush()
        time.sleep(10)
    print()
To check whether a step has completed, you can call it like this (emr_basics is a module from the same demo):
status_poller(
    "Waiting for step to complete...",
    'COMPLETED',
    lambda:
        emr_basics.describe_step(cluster_id, step_id, emr_client)['Status']['State'])
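As its docstring notes, status_poller keeps polling forever if the done status never arrives, which is what happens when a step ends in FAILED or CANCELLED. A bounded variant might look like the sketch below. The name poll_status and its failure_statuses, delay, timeout, sleep and clock parameters are hypothetical and not part of the demo.

```python
import time


def poll_status(func, done_status, failure_statuses=(), delay=10, timeout=3600,
                sleep=time.sleep, clock=time.monotonic):
    """Call func until it returns done_status, a failure status, or time runs out."""
    deadline = clock() + timeout
    while True:
        status = func()
        if status == done_status:
            return status
        if status in failure_statuses:
            # The status can no longer change to done_status.
            raise RuntimeError(f"Polling stopped on failure status {status}")
        if clock() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout} seconds")
        sleep(delay)
```

For an EMR step, failure_statuses could be ('CANCELLED', 'FAILED', 'INTERRUPTED'), with func being the same lambda passed to status_poller above.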