Airflow Glue job
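I'd like to pass the Glue arguments from Airflow instead of hard-coding them in the script. I'm trying the following, but it doesn't work: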
The error you shared indicates that you are running an old version of the Amazon provider.
To get this functionality you need apache-airflow-providers-amazon>=2.3.0.
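To confirm which provider version is actually installed in the Airflow environment, a small check like the one below can help. This is a minimal sketch: `parse_version`, `supports_run_kwargs` and `installed_provider_version` are hypothetical helpers, not part of Airflow, and the parser deliberately ignores pre-release suffixes such as `rc1`. If the `packaging` library is available, `packaging.version.Version` is a more robust way to compare versions.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import List, Optional, Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    # Keep only the leading numeric part of each component:
    # "2.3.0" -> (2, 3, 0), "2.3.0rc1" -> (2, 3, 0). Suffixes are ignored.
    parts: List[int] = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def supports_run_kwargs(installed: str, minimum: str = "2.3.0") -> bool:
    # Pad both tuples to the same length so "2.3" compares equal to "2.3.0".
    a, b = parse_version(installed), parse_version(minimum)
    width = max(len(a), len(b))
    a = a + (0,) * (width - len(a))
    b = b + (0,) * (width - len(b))
    return a >= b


def installed_provider_version() -> Optional[str]:
    # Returns None when the Amazon provider is not installed in this environment.
    try:
        return version("apache-airflow-providers-amazon")
    except PackageNotFoundError:
        return None


current = installed_provider_version()
if current is None:
    print("apache-airflow-providers-amazon is not installed")
else:
    print(current, "supports run_kwargs:", supports_run_kwargs(current))
```

Running this inside the same environment (or container) as the Airflow workers matters, since the scheduler and workers can have different provider versions installed.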
Usage example:
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

# Extra keyword arguments forwarded to the Glue start_job_run call
some_run_kwargs = {"NumberOfWorkers": 5}
# Arguments passed to the Glue script itself (keys are prefixed with "--")
some_script_arguments = {"--var1": "value"}

glue_job_hook = GlueJobHook(
    job_name='aws_test_glue_job',
    desc='This is test case job from Airflow',
    iam_role_name='my_test_role',
    script_location="s3://glue-examples/glue-scripts/sample_aws_glue_job.py",
    s3_bucket="my-includes",
    region_name="us-west-2",
)
glue_job_run = glue_job_hook.initialize_job(
    script_arguments=some_script_arguments,
    run_kwargs=some_run_kwargs,
)
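If the values come from Airflow (for example from `params` or `dag_run.conf`) as a plain dictionary, they have to be turned into the shape Glue expects before being passed as `script_arguments`: keys prefixed with `--` and values as strings. The helper below is a hypothetical sketch of that conversion, not an Airflow or Glue API.

```python
from typing import Any, Dict, Mapping


def to_glue_arguments(params: Mapping[str, Any]) -> Dict[str, str]:
    """Hypothetical helper: {"var1": 1} -> {"--var1": "1"}.

    Glue job arguments are "--name" keys mapped to string values.
    """
    args: Dict[str, str] = {}
    for key, value in params.items():
        # Add the "--" prefix only if the caller did not already include it.
        name = key if key.startswith("--") else f"--{key}"
        args[name] = str(value)
    return args


some_script_arguments = to_glue_arguments({"var1": "value", "batch_size": 100})
print(some_script_arguments)
```

Inside the Glue script, these arguments can then be read back with `getResolvedOptions(sys.argv, ["var1", "batch_size"])` from `awsglue.utils`, which returns them without the `--` prefix.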
If you are on apache-airflow-providers-amazon<2.3.0, you can create a custom hook by backporting the code added in the PR:
from typing import Dict, Optional

from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook


class MyGlueJobHook(AwsGlueJobHook):
    def initialize_job(
        self,
        script_arguments: Optional[dict] = None,
        run_kwargs: Optional[dict] = None,
    ) -> Dict[str, str]:
        """
        Initializes connection with AWS Glue
        to run job
        :return: the start_job_run response (contains the JobRunId)
        """
        glue_client = self.get_conn()
        script_arguments = script_arguments or {}
        run_kwargs = run_kwargs or {}
        try:
            job_name = self.get_or_create_glue_job()
            # run_kwargs (e.g. NumberOfWorkers) are forwarded as extra start_job_run parameters
            job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
            return job_run
        except Exception as general_error:
            self.log.error("Failed to run aws glue job, error: %s", general_error)
            raise
You can then use MyGlueJobHook the same way as GlueJobHook in the example above.
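To check what `initialize_job` actually sends to Glue without touching AWS, the same forwarding logic can be exercised against a stub client. This is a minimal sketch: `RecordingGlueClient` and `start_glue_run` are hypothetical names standing in for the boto3 Glue client and the body of `initialize_job`; they are not part of Airflow or boto3.

```python
from typing import Any, Dict, List, Optional


class RecordingGlueClient:
    """Hypothetical stand-in for the boto3 Glue client; records start_job_run calls."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def start_job_run(self, **kwargs: Any) -> Dict[str, str]:
        self.calls.append(kwargs)
        return {"JobRunId": "jr_example"}


def start_glue_run(
    client: Any,
    job_name: str,
    script_arguments: Optional[dict] = None,
    run_kwargs: Optional[dict] = None,
) -> Dict[str, str]:
    # Mirrors initialize_job: script arguments go into Arguments=,
    # and every key in run_kwargs becomes its own start_job_run parameter.
    script_arguments = script_arguments or {}
    run_kwargs = run_kwargs or {}
    return client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)


client = RecordingGlueClient()
run = start_glue_run(client, "aws_test_glue_job", {"--var1": "value"}, {"NumberOfWorkers": 5})
print(client.calls[0])
```

The point of the split is visible in the recorded call: `script_arguments` end up nested under `Arguments`, while `run_kwargs` such as `NumberOfWorkers` sit at the top level of the `start_job_run` request.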