在 Airflow 上的自定义 pythonoperator 中访问 dag_run.conf

Access dag_run.conf in a custom pythonoperator on Airflow

我在 Airflow 上扩展了现有的 PythonOperator,如下所示:

class myPythonOperator(PythonOperator):
    def __init__(self,**kwargs) -> None:
        self.name = kwargs.get("name", "name is not provided")      

    def execute(self, context,**kwargs):
        print(self.name)
        super(myPythonOperator, self).execute(context)

我的任务定义为:

def task1(**kwargs):
    name = kwargs.get("name", "name is not provided")
    print(name)

以及以下 DAG:

    myTask = myPythonOperator(
    task_id='myTask',
    python_callable = task1,
    op_kwargs={"name": "{{ dag_run.conf['name'] }}"},
    provide_context=True
)

触发 DAG 时,我从 Airflow web UI 提供了一个配置 JSON,即 {"name":"foo"}

但问题是 JSON 中指定的名称只能从 task1 访问,在 ececute() 中它总是打印 name is not provided

有人知道从运算符的 __init__() 函数访问此 dag_run.conf 的技巧吗?

如有任何帮助,我们将不胜感激。谢谢

从继承的 class 访问 dag.run_config 的方法是使用 Airflow 中的 template_field,可以找到 here