等到 AWS Glue 爬虫完成 运行
Wait until AWS Glue crawler has finished running
在文档中，我找不到任何检查爬虫运行状态的方法。我目前唯一的做法是不断轮询 AWS，检查文件/表是否已创建。
是否有更好的方法可以阻塞等待，直到爬虫完成运行？
您可以使用 boto3(或类似的)来完成。有 get_crawler 方法。您将在 "LastCrawl" 部分找到所需的信息
以下函数使用 boto3。它启动 AWS Glue 爬虫并等待其完成，同时记录状态进度。已使用 Python v3.8 和 boto3 v1.17.3 进行测试。
import logging
import time
import timeit
from typing import Any

import boto3
log = logging.getLogger(__name__)
def run_crawler(crawler: str, *, timeout_minutes: int = 120, retry_seconds: int = 5) -> None:
    """Run the specified AWS Glue crawler, blocking until it completes.

    Waits for the crawler to be READY before starting it (in case a previous
    run is still in progress), starts it, then waits for it to return to READY.

    Args:
        crawler: Name of the Glue crawler to run.
        timeout_minutes: Overall time budget shared by both waits.
        retry_seconds: Polling interval between ``get_crawler`` calls.

    Raises:
        TimeoutError: If the crawler has not returned to READY within the
            allocated time.
    """
    timeout_seconds = timeout_minutes * 60
    client = boto3.client("glue")
    start_time = timeit.default_timer()
    abort_time = start_time + timeout_seconds

    def wait_until_ready() -> None:
        # Poll until the crawler reports READY, logging only state changes.
        state_previous = None
        while True:
            response_get = client.get_crawler(Name=crawler)
            state = response_get["Crawler"]["State"]
            if state != state_previous:
                # Lazy %-style args: the message is formatted only if emitted.
                log.info("Crawler %s is %s.", crawler, state.lower())
                state_previous = state
            if state == "READY":  # Other known states: RUNNING, STOPPING
                return
            if timeit.default_timer() > abort_time:
                raise TimeoutError(f"Failed to crawl {crawler}. The allocated time of {timeout_minutes:,} minutes has elapsed.")
            time.sleep(retry_seconds)

    wait_until_ready()  # Ensure any previous run has finished before starting.
    response_start = client.start_crawler(Name=crawler)
    assert response_start["ResponseMetadata"]["HTTPStatusCode"] == 200
    log.info("Crawling %s.", crawler)
    wait_until_ready()
    log.info("Crawled %s.", crawler)
可选奖励:使用一些合理的默认值创建或更新 AWS Glue 爬虫的函数:
def ensure_crawler(**kwargs: Any) -> None:
    """Ensure that the specified AWS Glue crawler exists with the given configuration.

    At minimum the `Name` and `Targets` keyword arguments are required; any
    other ``create_crawler``/``update_crawler`` keyword argument overrides the
    defaults applied below.

    Raises:
        AssertionError: If `Name` or `Targets` is missing/falsy, or if the
            service response is not HTTP 200.
    """
    # NOTE: assert statements are stripped under `python -O`; kept here to
    # preserve the original exception behavior for callers.
    assert all(kwargs.get(k) for k in ("Name", "Targets"))
    # Defaults are merged first so explicit kwargs win on key collisions.
    defaults = {
        "Role": "AWSGlueRole",
        "DatabaseName": kwargs["Name"],
        "SchemaChangePolicy": {"UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "DELETE_FROM_DATABASE"},
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVERYTHING"},
        "LineageConfiguration": {"CrawlerLineageSettings": "DISABLE"},
    }
    kwargs = {**defaults, **kwargs}
    client = boto3.client("glue")
    name = kwargs["Name"]
    try:
        # EAFP: try to create, and fall back to update if it already exists.
        response = client.create_crawler(**kwargs)
        log.info("Created crawler %s.", name)
    except client.exceptions.AlreadyExistsException:
        response = client.update_crawler(**kwargs)
        log.info("Updated crawler %s.", name)
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
根据最新的 AWS 文档，您还可以使用自定义 waiter（等待器）来等待爬虫状态。下面的最小工作示例（MWE）展示了如何将其用于此目的：
from enum import Enum
from customwaiter import CustomWaiter
import logging
import boto3
import botocore
logger = logging.getLogger(__name__)
class CrawlerState(Enum):
    """Crawler states mapped to waiter outcome labels."""
    # NOTE(review): READY mapped to "failure" and STOPPING to "success" looks
    # inverted at first glance; presumably the waiter treats reaching STOPPING
    # as the success signal, per the author's stated assumption that the
    # crawler quickly transitions STOPPING -> READY — confirm against
    # CustomWaiter's acceptor semantics before relying on this.
    READY = "failure"
    STOPPING = "success"
    RUNNING = "running"
class CrawlerStateWaiter(CustomWaiter):
    """Waiter that polls ``get_crawler`` until the crawler reaches a terminal state."""

    def __init__(self, client):
        # "Crawler.State" is a JMESPath into the get_crawler response;
        # STOPPING/READY are the states that end the wait (see CrawlerState).
        # Arguments are positional — their meaning follows CustomWaiter's
        # constructor (name, operation, argument path, acceptors, client).
        super().__init__(
            "CrawlerState",
            "get_crawler",
            "Crawler.State",
            {"STOPPING": CrawlerState.STOPPING, "READY": CrawlerState.READY},
            client,
            max_tries=100,
        )

    def wait(self, Name):
        # Delegate to CustomWaiter._wait, forwarding the crawler name.
        self._wait(Name=Name)
if __name__ == "__main__":
    # Fix: the original snippet referenced an undefined name `CrawlerName`,
    # which raises NameError. Define the crawler name explicitly.
    crawler_name = "my-crawler"  # TODO: replace with your crawler's name
    glue_client = boto3.client("glue")
    response = glue_client.start_crawler(Name=crawler_name)
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
    waiter = CrawlerStateWaiter(glue_client)
    waiter.wait(Name=crawler_name)
    crawler_info = glue_client.get_crawler(Name=crawler_name)
    # Verify the most recent crawl succeeded; assumes LastCrawl is populated
    # immediately after the crawler stops — see the assumptions noted below.
    assert crawler_info["Crawler"]["LastCrawl"]["Status"] == "SUCCEEDED"
这里做了以下假设：
- 爬虫开始运行后，会很快从 STOPPING 变为 READY。
- 为了检查爬虫是否执行成功，我假设相关信息在爬虫完成后立即可用。
在文档中，我找不到任何检查爬虫运行状态的方法。我目前唯一的做法是不断轮询 AWS，检查文件/表是否已创建。
是否有更好的方法可以阻塞等待，直到爬虫完成运行？
您可以使用 boto3(或类似的)来完成。有 get_crawler 方法。您将在 "LastCrawl" 部分找到所需的信息
以下函数使用 boto3。它启动 AWS Glue 爬虫并等待其完成，同时记录状态进度。已使用 Python v3.8 和 boto3 v1.17.3 进行测试。
import logging
import time
import timeit
import boto3
log = logging.getLogger(__name__)
def run_crawler(crawler: str, *, timeout_minutes: int = 120, retry_seconds: int = 5) -> None:
    """Run the specified AWS Glue crawler, blocking until it completes.

    Waits for the crawler to be READY before starting it (in case a previous
    run is still in progress), starts it, then waits for it to return to READY.

    Args:
        crawler: Name of the Glue crawler to run.
        timeout_minutes: Overall time budget shared by both waits.
        retry_seconds: Polling interval between ``get_crawler`` calls.

    Raises:
        TimeoutError: If the crawler has not returned to READY within the
            allocated time.
    """
    timeout_seconds = timeout_minutes * 60
    client = boto3.client("glue")
    start_time = timeit.default_timer()
    abort_time = start_time + timeout_seconds

    def wait_until_ready() -> None:
        # Poll until the crawler reports READY, logging only state changes.
        state_previous = None
        while True:
            response_get = client.get_crawler(Name=crawler)
            state = response_get["Crawler"]["State"]
            if state != state_previous:
                # Lazy %-style args: the message is formatted only if emitted.
                log.info("Crawler %s is %s.", crawler, state.lower())
                state_previous = state
            if state == "READY":  # Other known states: RUNNING, STOPPING
                return
            if timeit.default_timer() > abort_time:
                raise TimeoutError(f"Failed to crawl {crawler}. The allocated time of {timeout_minutes:,} minutes has elapsed.")
            time.sleep(retry_seconds)

    wait_until_ready()  # Ensure any previous run has finished before starting.
    response_start = client.start_crawler(Name=crawler)
    assert response_start["ResponseMetadata"]["HTTPStatusCode"] == 200
    log.info("Crawling %s.", crawler)
    wait_until_ready()
    log.info("Crawled %s.", crawler)
可选奖励:使用一些合理的默认值创建或更新 AWS Glue 爬虫的函数:
def ensure_crawler(**kwargs: Any) -> None:
    """Ensure that the specified AWS Glue crawler exists with the given configuration.

    At minimum the `Name` and `Targets` keyword arguments are required; any
    other ``create_crawler``/``update_crawler`` keyword argument overrides the
    defaults applied below.

    Raises:
        AssertionError: If `Name` or `Targets` is missing/falsy, or if the
            service response is not HTTP 200.
    """
    # NOTE: assert statements are stripped under `python -O`; kept here to
    # preserve the original exception behavior for callers.
    assert all(kwargs.get(k) for k in ("Name", "Targets"))
    # Defaults are merged first so explicit kwargs win on key collisions.
    defaults = {
        "Role": "AWSGlueRole",
        "DatabaseName": kwargs["Name"],
        "SchemaChangePolicy": {"UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "DELETE_FROM_DATABASE"},
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVERYTHING"},
        "LineageConfiguration": {"CrawlerLineageSettings": "DISABLE"},
    }
    kwargs = {**defaults, **kwargs}
    client = boto3.client("glue")
    name = kwargs["Name"]
    try:
        # EAFP: try to create, and fall back to update if it already exists.
        response = client.create_crawler(**kwargs)
        log.info("Created crawler %s.", name)
    except client.exceptions.AlreadyExistsException:
        response = client.update_crawler(**kwargs)
        log.info("Updated crawler %s.", name)
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
根据最新的 AWS 文档，您还可以使用自定义 waiter（等待器）来等待爬虫状态。下面的最小工作示例（MWE）展示了如何将其用于此目的：
from enum import Enum
from customwaiter import CustomWaiter
import logging
import boto3
import botocore
logger = logging.getLogger(__name__)
class CrawlerState(Enum):
    """Crawler states mapped to waiter outcome labels."""
    # NOTE(review): READY mapped to "failure" and STOPPING to "success" looks
    # inverted at first glance; presumably the waiter treats reaching STOPPING
    # as the success signal, per the author's stated assumption that the
    # crawler quickly transitions STOPPING -> READY — confirm against
    # CustomWaiter's acceptor semantics before relying on this.
    READY = "failure"
    STOPPING = "success"
    RUNNING = "running"
class CrawlerStateWaiter(CustomWaiter):
    """Waiter that polls ``get_crawler`` until the crawler reaches a terminal state."""

    def __init__(self, client):
        # "Crawler.State" is a JMESPath into the get_crawler response;
        # STOPPING/READY are the states that end the wait (see CrawlerState).
        # Arguments are positional — their meaning follows CustomWaiter's
        # constructor (name, operation, argument path, acceptors, client).
        super().__init__(
            "CrawlerState",
            "get_crawler",
            "Crawler.State",
            {"STOPPING": CrawlerState.STOPPING, "READY": CrawlerState.READY},
            client,
            max_tries=100,
        )

    def wait(self, Name):
        # Delegate to CustomWaiter._wait, forwarding the crawler name.
        self._wait(Name=Name)
if __name__ == "__main__":
    # Fix: the original snippet referenced an undefined name `CrawlerName`,
    # which raises NameError. Define the crawler name explicitly.
    crawler_name = "my-crawler"  # TODO: replace with your crawler's name
    glue_client = boto3.client("glue")
    response = glue_client.start_crawler(Name=crawler_name)
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
    waiter = CrawlerStateWaiter(glue_client)
    waiter.wait(Name=crawler_name)
    crawler_info = glue_client.get_crawler(Name=crawler_name)
    # Verify the most recent crawl succeeded; assumes LastCrawl is populated
    # immediately after the crawler stops — see the assumptions noted below.
    assert crawler_info["Crawler"]["LastCrawl"]["Status"] == "SUCCEEDED"
这里做了以下假设：
- 爬虫开始运行后，会很快从 STOPPING 变为 READY。
- 为了检查爬虫是否执行成功，我假设相关信息在爬虫完成后立即可用。