在 Airflow 上的自定义 pythonoperator 中访问 dag_run.conf
Access dag_run.conf in a custom pythonoperator on Airflow
我在 Airflow 上扩展了现有的 PythonOperator,如下所示:
class myPythonOperator(PythonOperator):
def __init__(self,**kwargs) -> None:
self.name = kwargs.get("name", "name is not provided")
def execute(self, context,**kwargs):
print(self.name)
super(myPythonOperator, self).execute(context)
我的任务定义为:
def task1(**kwargs):
name = kwargs.get("name", "name is not provided")
print(name)
以及以下 DAG:
myTask = myPythonOperator(
task_id='myTask',
python_callable = task1,
op_kwargs={"name": "{{ dag_run.conf['name'] }}"},
provide_context=True
)
触发 DAG 时,我从 Airflow web UI 提供了一个配置 JSON,即 {"name":"foo"}
但问题是 JSON 中指定的名称只能从 task1 访问,在 ececute()
中它总是打印 name is not provided
有人知道从运算符的 __init__()
函数访问此 dag_run.conf
的技巧吗?
如有任何帮助,我们将不胜感激。谢谢
从继承的 class 访问 dag.run_config 的方法是使用 Airflow 中的 template_field,可以找到 here
我在 Airflow 上扩展了现有的 PythonOperator,如下所示:
class myPythonOperator(PythonOperator):
def __init__(self,**kwargs) -> None:
self.name = kwargs.get("name", "name is not provided")
def execute(self, context,**kwargs):
print(self.name)
super(myPythonOperator, self).execute(context)
我的任务定义为:
def task1(**kwargs):
name = kwargs.get("name", "name is not provided")
print(name)
以及以下 DAG:
myTask = myPythonOperator(
task_id='myTask',
python_callable = task1,
op_kwargs={"name": "{{ dag_run.conf['name'] }}"},
provide_context=True
)
触发 DAG 时,我从 Airflow web UI 提供了一个配置 JSON,即 {"name":"foo"}
但问题是 JSON 中指定的名称只能从 task1 访问,在 ececute()
中它总是打印 name is not provided
有人知道从运算符的 __init__()
函数访问此 dag_run.conf
的技巧吗?
如有任何帮助,我们将不胜感激。谢谢
从继承的 class 访问 dag.run_config 的方法是使用 Airflow 中的 template_field,可以找到 here