Airflow Glue job

I would like to pass Glue arguments from Airflow instead of setting them in the script. I tried the following, but it doesn't work:

The error you shared indicates that you are running an older version of the Amazon provider.

To use this feature you need apache-airflow-providers-amazon>=2.3.0.
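To check which provider version is installed before relying on the new parameters, a small sketch like the following may help. It reads the installed version with the standard-library importlib.metadata and compares it with the 2.3.0 threshold stated above. The helper names are hypothetical, and the version parser is deliberately naive (it ignores pre-release suffixes such as "rc1"), so packaging.version.Version is a sturdier choice if that package is available.

```python
from importlib.metadata import PackageNotFoundError, version


def parse_release(ver: str) -> tuple:
    """Naively turn '2.3.0' into (2, 3, 0); keeps only leading digits per part."""
    parts = []
    for piece in ver.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break  # stop at suffixes like 'rc1' or 'b2'
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)  # pad '2.3' to (2, 3, 0)
    return tuple(parts)


def supports_glue_run_kwargs(ver: str) -> bool:
    """True if the provider version is at least 2.3.0."""
    return parse_release(ver) >= (2, 3, 0)


def installed_amazon_provider_version():
    """Return the installed provider version string, or None if absent."""
    try:
        return version("apache-airflow-providers-amazon")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    installed = installed_amazon_provider_version()
    if installed is None:
        print("apache-airflow-providers-amazon is not installed")
    elif supports_glue_run_kwargs(installed):
        print(f"{installed}: initialize_job accepts script_arguments and run_kwargs")
    else:
        print(f"{installed}: too old, upgrade or use a custom hook")
```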

Usage example:

from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

# Extra keyword arguments forwarded to boto3's start_job_run
some_run_kwargs = {"NumberOfWorkers": 5}
# Job arguments passed to the Glue script (keys are prefixed with "--")
some_script_arguments = {"--var1": "value"}

glue_job_hook = GlueJobHook(
    job_name="aws_test_glue_job",
    desc="This is test case job from Airflow",
    iam_role_name="my_test_role",
    script_location="s3://glue-examples/glue-scripts/sample_aws_glue_job.py",
    s3_bucket="my-includes",
    region_name="us-west-2",
)
# Starts a run of the job (creating the job first if needed)
glue_job_run = glue_job_hook.initialize_job(
    script_arguments=some_script_arguments,
    run_kwargs=some_run_kwargs,
)
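If the parameters start out as a plain dict in the DAG, a small helper can turn them into the form expected by script_arguments. This is a hypothetical helper, not part of the provider: it adds the "--" prefix used by Glue job parameters (as in "--var1" above) and converts values to strings, since the Arguments map of start_job_run is string-to-string.

```python
from typing import Any, Dict, Mapping


def to_glue_arguments(params: Mapping[str, Any]) -> Dict[str, str]:
    """Build a Glue-style Arguments dict from plain parameters (hypothetical helper)."""
    arguments: Dict[str, str] = {}
    for key, value in params.items():
        # Keep keys that already carry the "--" prefix, add it otherwise
        name = key if key.startswith("--") else f"--{key}"
        # Glue job arguments are strings, so stringify every value
        arguments[name] = str(value)
    return arguments


print(to_glue_arguments({"var1": "value", "batch_size": 100}))
# {'--var1': 'value', '--batch_size': '100'}
```

The result can then be passed as script_arguments=to_glue_arguments(...) in the call to initialize_job.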

If you are on apache-airflow-providers-amazon<2.3.0, you can create a custom hook by backporting the code added in the PR:

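from typing import Dict, Optional

from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook


class MyGlueJobHook(AwsGlueJobHook):
    """AwsGlueJobHook whose initialize_job also accepts run_kwargs."""

    def initialize_job(
        self,
        script_arguments: Optional[dict] = None,
        run_kwargs: Optional[dict] = None,
    ) -> Dict[str, str]:
        """
        Initializes connection with AWS Glue to run the job.

        :param script_arguments: arguments for the Glue script, e.g. {"--var1": "value"}
        :param run_kwargs: extra keyword arguments for start_job_run, e.g. {"NumberOfWorkers": 5}
        :return: the start_job_run response (includes JobRunId)
        """
        glue_client = self.get_conn()
        script_arguments = script_arguments or {}
        run_kwargs = run_kwargs or {}

        try:
            # Look up the job by name, creating it if it does not exist
            job_name = self.get_or_create_glue_job()
            job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
            return job_run
        except Exception as general_error:
            self.log.error("Failed to run aws glue job, error: %s", general_error)
            raise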

You can then use MyGlueJobHook the same way as in the usage example above.
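To see what initialize_job actually sends to AWS without an Airflow installation or AWS credentials, its forwarding logic can be exercised against a stand-in client. This is a minimal sketch: RecordingGlueClient and start_glue_run are hypothetical names, and the stand-in only mimics the shape of boto3's start_job_run call (JobName, Arguments, plus any extra keyword arguments); it never contacts Glue.

```python
from typing import Any, Dict, List, Optional


class RecordingGlueClient:
    """Hypothetical stand-in for the boto3 Glue client; records start_job_run calls."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def start_job_run(self, JobName: str, Arguments: Dict[str, str], **kwargs: Any) -> Dict[str, str]:
        # Store exactly what would have been sent to Glue
        self.calls.append({"JobName": JobName, "Arguments": Arguments, **kwargs})
        return {"JobRunId": f"jr_{len(self.calls)}"}


def start_glue_run(
    client: Any,
    job_name: str,
    script_arguments: Optional[dict] = None,
    run_kwargs: Optional[dict] = None,
) -> Dict[str, str]:
    # Same defaulting and forwarding as MyGlueJobHook.initialize_job
    script_arguments = script_arguments or {}
    run_kwargs = run_kwargs or {}
    return client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)


client = RecordingGlueClient()
run = start_glue_run(client, "aws_test_glue_job", {"--var1": "value"}, {"NumberOfWorkers": 5})
print(run)  # {'JobRunId': 'jr_1'}
print(client.calls[0])
```

The recorded call shows that run_kwargs entries such as NumberOfWorkers end up as top-level keyword arguments of start_job_run, while script_arguments travel inside Arguments.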