如何在 DAG python 代码中使用 Airflow 模板参考
How I can use Airflow template reference in the DAG python code
我是 Airflow 世界的新手,我想了解一件事。例如,我有一个包含 2 个任务的 DAG。第一个任务是提交spark job,第二个任务是s3中等待文件的Sensor。
RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
DATE = datetime.strptime(RUN_DATE_ARG, DATE_FORMAT_PY) - timedelta(hours=1)
with DAG() as dag:
submit_spark_job = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=VIRTUAL_CLUSTER_ID,
execution_role_arn=JOB_ROLE_ARN,
release_label="emr-6.3.0-latest",
job_driver=JOB_DRIVER_ARG,
configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
name=f"spark-{RUN_DATE_ARG}",
retries=3
)
validate_s3_success_file = S3KeySensor(
task_id='check_for_success_file',
bucket_name="bucket-name",
bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
poke_interval=10,
timeout=60,
verify=False,
)
我有一个 RUN_DATE_ARG,默认情况下应该取自 datetime.utcnow()
,这是我应该为我的工作提供的火花 java 参数之一。
我想添加使用自定义日期参数提交作业的功能(通过气流 UI)。
当我尝试将其检索为 '{{ dag_run.conf["date"] | None}}'
时,它会替换为任务配置中的值 (bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
),但如果我执行以下操作,则不会替换为 DAG 的 python 代码:
date='{{ dag_run.conf["date"] | None}}'
if date is None:
RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
else:
RUN_DATE_ARG = date
我有什么方法可以将这个值用作代码变量吗?
您不能在运算符范围之外使用模板。
您应该在运算符模板化参数中使用 Jinja if 语句。以下只是大概的思路:
submit_spark_job = EmrContainerOperator(
task_id="start_job",
...
name="spark-{{ dag_run.conf["date"] if dag_run.conf["date"] is not None else jinja_utc_now }}",
)
您需要将 jinja_utc_now
替换为检索时间戳的代码,可能类似于 答案中显示的内容。
您还可以使用:
{% if something %}
code
{% else %}
another code
{% endif %}
从 Airflow 的角度来看,它获取参数并将其传递给 Jinja 引擎以进行模板化,因此这里的关键问题只是使用正确的 Jinja 语法。
我是 Airflow 世界的新手,我想了解一件事。例如,我有一个包含 2 个任务的 DAG。第一个任务是提交spark job,第二个任务是s3中等待文件的Sensor。
RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
DATE = datetime.strptime(RUN_DATE_ARG, DATE_FORMAT_PY) - timedelta(hours=1)
with DAG() as dag:
submit_spark_job = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=VIRTUAL_CLUSTER_ID,
execution_role_arn=JOB_ROLE_ARN,
release_label="emr-6.3.0-latest",
job_driver=JOB_DRIVER_ARG,
configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
name=f"spark-{RUN_DATE_ARG}",
retries=3
)
validate_s3_success_file = S3KeySensor(
task_id='check_for_success_file',
bucket_name="bucket-name",
bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
poke_interval=10,
timeout=60,
verify=False,
)
我有一个 RUN_DATE_ARG,默认情况下应该取自 datetime.utcnow()
,这是我应该为我的工作提供的火花 java 参数之一。
我想添加使用自定义日期参数提交作业的功能(通过气流 UI)。
当我尝试将其检索为 '{{ dag_run.conf["date"] | None}}'
时,它会替换为任务配置中的值 (bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
),但如果我执行以下操作,则不会替换为 DAG 的 python 代码:
date='{{ dag_run.conf["date"] | None}}'
if date is None:
RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
else:
RUN_DATE_ARG = date
我有什么方法可以将这个值用作代码变量吗?
您不能在运算符范围之外使用模板。
您应该在运算符模板化参数中使用 Jinja if 语句。以下只是大概的思路:
submit_spark_job = EmrContainerOperator(
task_id="start_job",
...
name="spark-{{ dag_run.conf["date"] if dag_run.conf["date"] is not None else jinja_utc_now }}",
)
您需要将 jinja_utc_now
替换为检索时间戳的代码,可能类似于
您还可以使用:
{% if something %}
code
{% else %}
another code
{% endif %}
从 Airflow 的角度来看,它获取参数并将其传递给 Jinja 引擎以进行模板化,因此这里的关键问题只是使用正确的 Jinja 语法。