Airflow 如何设置 dag_run.conf 的默认值

Airflow how to set default values for dag_run.conf

我正在尝试设置一个 Airflow DAG,它提供 dag_run.conf 中可用的默认值。当 运行 来自 webUI 的 DAG,使用“运行 w/ Config”选项时,这非常有效。但是,当 运行 在计划中时, dag_run.conf 字典不存在,任务将失败,例如

jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'

下面是一个示例作业。

是否可以使 dag_run.conf 始终包含此处由 params 定义的字典?

from airflow import DAG
from airflow.utils.dates import hours_ago
from airflow.operators.bash import BashOperator
from datetime import timedelta

def do_something(val1: str, val2: str) -> str:
    return f'echo vars are: "{val1}, {val2}"'

params = {
    'key1': 'def1',
    'key2': 'def2',        
}

default_args = {
    'retries': 0,
}

with DAG(
    'template_test',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    start_date=hours_ago(1),
    params = params,
) as dag:

    hello_t = BashOperator(
        task_id='example-command',
        bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'),
        config=params,
    )

我见过的最接近的是 ,但是它们利用了 Jinja 和 if/else——这需要定义这些默认参数两次。我只想定义一次。

您可以使用 DAG params 来实现您想要的:

params (dict) – a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.

您可以在 DAG 或任务级别定义 params,还可以从 带配置的触发器 DAG 部分的 UI 添加或修改它们。

示例 DAG:

default_args = {
    "owner": "airflow",
}

dag = DAG(
    dag_id="example_dag_params",
    default_args=default_args,
    schedule_interval="*/5 * * * *",
    start_date=days_ago(1),
    params={"param1": "first_param"},
    catchup=False,
)
with dag:

    bash_task = BashOperator(
        task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}"
    )

输出日志:

[2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01
[2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_dag_params
AIRFLOW_CTX_TASK_ID=bash_task
AIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00
[2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location: 
 /tmp
[2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo bash_task: first_param']
[2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output:
[2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param
[2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0

从日志中,请注意 dag_run 计划 并且参数仍然存在。

您可以在 .

中找到有关使用参数的更广泛的示例

希望对你有用!