等到 AWS Glue 爬虫完成 运行

Wait until AWS Glue crawler has finished running

在文档中,我找不到任何检查爬虫 运行 状态的方法。我目前这样做的唯一方法是不断检查 AWS 以检查 file/table 是否已创建。

是否有更好的方法来阻止直到爬虫完成其 运行?

您可以使用 boto3(或类似的)来完成。有 get_crawler 方法。您将在 "LastCrawl" 部分找到所需的信息

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_crawler

以下函数使用boto3。它启动 AWS Glue 爬网程序并等待其完成。它还会记录进度的状态。它使用 Python v3.8 和 boto3 v1.17.3.

进行了测试
import logging
import time
import timeit

import boto3

log = logging.getLogger(__name__)


def run_crawler(crawler: str, *, timeout_minutes: int = 120, retry_seconds: int = 5) -> None:
    """Run the specified AWS Glue crawler, waiting until completion."""
    # Ref: 
    timeout_seconds = timeout_minutes * 60
    client = boto3.client("glue")
    start_time = timeit.default_timer()
    abort_time = start_time + timeout_seconds

    def wait_until_ready() -> None:
        state_previous = None
        while True:
            response_get = client.get_crawler(Name=crawler)
            state = response_get["Crawler"]["State"]
            if state != state_previous:
                log.info(f"Crawler {crawler} is {state.lower()}.")
                state_previous = state
            if state == "READY":  # Other known states: RUNNING, STOPPING
                return
            if timeit.default_timer() > abort_time:
                raise TimeoutError(f"Failed to crawl {crawler}. The allocated time of {timeout_minutes:,} minutes has elapsed.")
            time.sleep(retry_seconds)

    wait_until_ready()
    response_start = client.start_crawler(Name=crawler)
    assert response_start["ResponseMetadata"]["HTTPStatusCode"] == 200
    log.info(f"Crawling {crawler}.")
    wait_until_ready()
    log.info(f"Crawled {crawler}.")

可选奖励:使用一些合理的默认值创建或更新 AWS Glue 爬虫的函数:

def ensure_crawler(**kwargs: Any) -> None:
    """Ensure that the specified AWS Glue crawler exists with the given configuration.

    At minimum the `Name` and `Targets` keyword arguments are required.
    """
    # Use defaults
    assert all(kwargs.get(k) for k in ("Name", "Targets"))
    defaults = {
        "Role": "AWSGlueRole",
        "DatabaseName": kwargs["Name"],
        "SchemaChangePolicy": {"UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "DELETE_FROM_DATABASE"},
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVERYTHING"},
        "LineageConfiguration": {"CrawlerLineageSettings": "DISABLE"},
    }
    kwargs = {**defaults, **kwargs}

    # Ensure crawler
    client = boto3.client("glue")
    name = kwargs["Name"]
    try:
        response = client.create_crawler(**kwargs)
        log.info(f"Created crawler {name}.")
    except client.exceptions.AlreadyExistsException:
        response = client.update_crawler(**kwargs)
        log.info(f"Updated crawler {name}.")
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

根据最新的 AWS 文档,您还可以使用 custom waiter 创建状态爬虫服务员。下面显示了如何将其用于此目的的 MWE:

from enum import Enum
from customwaiter import CustomWaiter
import logging
import boto3
import botocore

logger = logging.getLogger(__name__)

class CrawlerState(Enum):
    READY = "failure"
    STOPPING = "success"
    RUNNING = "running"
    
class CrawlerStateWaiter(CustomWaiter):
    def __init__(self, client):
        super().__init__(
            "CrawlerState",
            "get_crawler",
            "Crawler.State",
            {"STOPPING": CrawlerState.STOPPING, "READY": CrawlerState.READY},
            client,
            max_tries=100,
        )

    def wait(self, Name):
        self._wait(Name=Name)

if __name__ == "__main__":
    glue_client = boto3.client('glue')
    response = glue_client.start_crawler(
        Name=CrawlerName
    )
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
    waiter = CrawlerStateWaiter(glue_client)
    waiter.wait(Name=CrawlerName)
    crawler_info = glue_client.get_crawler(
        Name=CrawlerName
    )
    assert crawler_info["Crawler"]["LastCrawl"]["Status"] == "SUCCEEDED"

这里做了以下假设:

  1. 一旦爬虫是 运行,爬虫很快就会从 STOPPING 变为 READY
  2. 为了检查爬虫是否执行成功,我假设在完成此操作后立即可用。