如何在 DAG python 代码中使用 Airflow 模板参考

How I can use Airflow template reference in the DAG python code

我是 Airflow 世界的新手,我想了解一件事。例如,我有一个包含 2 个任务的 DAG。第一个任务是提交spark job,第二个任务是s3中等待文件的Sensor。

RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
DATE = datetime.strptime(RUN_DATE_ARG, DATE_FORMAT_PY) - timedelta(hours=1)
with DAG() as dag:


submit_spark_job = EmrContainerOperator(
    task_id="start_job",
    virtual_cluster_id=VIRTUAL_CLUSTER_ID,
    execution_role_arn=JOB_ROLE_ARN,
    release_label="emr-6.3.0-latest",
    job_driver=JOB_DRIVER_ARG,
    configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
    name=f"spark-{RUN_DATE_ARG}",
    retries=3
)

validate_s3_success_file = S3KeySensor(
    task_id='check_for_success_file',
    bucket_name="bucket-name",
    bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
    poke_interval=10,
    timeout=60,
    verify=False,
)

我有一个 RUN_DATE_ARG,默认情况下应该取自 datetime.utcnow(),这是我应该为我的工作提供的火花 java 参数之一。 我想添加使用自定义日期参数提交作业的功能(通过气流 UI)。 当我尝试将其检索为 '{{ dag_run.conf["date"] | None}}' 时,它会替换为任务配置中的值 (bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",),但如果我执行以下操作,则不会替换为 DAG 的 python 代码:

date='{{ dag_run.conf["date"] | None}}'
if date is None:
  RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
else: 
  RUN_DATE_ARG = date

我有什么方法可以将这个值用作代码变量吗?

您不能在运算符范围之外使用模板。

您应该在运算符模板化参数中使用 Jinja if 语句。以下只是大概的思路:

submit_spark_job = EmrContainerOperator(
    task_id="start_job",
    ...
    name="spark-{{ dag_run.conf["date"] if dag_run.conf["date"] is not None else jinja_utc_now }}",

)

您需要将 jinja_utc_now 替换为检索时间戳的代码,可能类似于 答案中显示的内容。

您还可以使用:

{% if something %}
   code
{% else %}
   another code
{% endif %}

从 Airflow 的角度来看,它获取参数并将其传递给 Jinja 引擎以进行模板化,因此这里的关键问题只是使用正确的 Jinja 语法。