调度气流任务组抛出AttributeError
Scheduling airflow TaskGroup throws AttributeError
所以我在 TaskGroup 中创建任务,并试图将它们添加到我的 dag 任务序列中,但它抛出了这个错误:
Broken DAG: [/Users/abc/projects/abc/airflow_dags/dag.py] Traceback (most recent call last):
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1234, in set_downstream
self._set_relatives(task_or_task_list, upstream=False)
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'NoneType' object has no attribute 'update_relative'
我正在创建这样的任务组和任务:
def get_task_group(dag, task_group):
t1 = DummyOperator(task_id='t1', dag=dag, task_group=task_group)
t2 = DummyOperator(task_id='t2', dag=dag, task_group=task_group)
t3 = DummyOperator(task_id='t3', dag=dag, task_group=task_group)
t4 = DummyOperator(task_id='t4', dag=dag, task_group=task_group)
t5 = DummyOperator(task_id='t5', dag=dag, task_group=task_group)
t_list = [t2, t3, t4]
t1.set_downstream(t_list)
t5.set_upstream(t_list)
with DAG('some_dag', default_args=args) as dag:
with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
run_model_task_group = get_task_group(dag, tg)
a1 = DummyOperator(task_id='a1', dag=dag)
a2 = DummyOperator(task_id='a2', dag=dag)
a3 = DummyOperator(task_id='a3', dag=dag)
a4 = DummyOperator(task_id='a4', dag=dag)
a1.set_downstream(a2)
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
a3.set_downstream(a4)
如果我删除任务组并通过删除行使任务组任务不排序
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
我可以看到 a1、a2、a3 和 a4 的顺序正确,我可以断开连接的 run_model_task_group
任务,但是一旦我将其添加到顺序中,我就会收到上述错误。
谁能指导我这里可能发生的事情?
请注意,我使用带有 dag
和 task_group
参数的函数来创建任务组任务,因为我也想为另一个 dag 创建相同的任务集。
Python Version: 3.8.8
Airflow Version: 2.0.1
AttributeError: 'NoneType' object has no attribute 'update_relative'
这是因为 run_model_task_group
它的 None
在 With
块的范围之外,这是预期的 Python 行为。
在不对您目前所做的做太多改动的情况下,您可以将 get_task_group()
重构为 return 一个 TaskGroup
对象,如下所示:
def get_task_group(dag, group_id):
with TaskGroup(group_id=group_id, dag=dag) as tg:
t1 = DummyOperator(task_id='t1', dag=dag)
t2 = DummyOperator(task_id='t2', dag=dag)
t3 = DummyOperator(task_id='t3', dag=dag)
t4 = DummyOperator(task_id='t4', dag=dag)
t5 = DummyOperator(task_id='t5', dag=dag)
t_list = [t2, t3, t4]
t1.set_downstream(t_list)
t5.set_upstream(t_list)
return tg
在 DAG 定义中简单地调用它:
run_model_task_group = get_task_group(dag, "run_model_tasks")
生成的图形视图如下所示:
DAG 定义:
with DAG('some_dag',
default_args=default_args,
start_date=days_ago(2),
schedule_interval='@once') as dag:
# with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
# run_model_task_group = get_task_group(dag, )
run_model_task_group = get_task_group(dag, "run_model_tasks")
a1 = DummyOperator(task_id='a1', dag=dag)
a2 = DummyOperator(task_id='a2', dag=dag)
a3 = DummyOperator(task_id='a3', dag=dag)
a4 = DummyOperator(task_id='a4', dag=dag)
a1.set_downstream(a2)
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
a3.set_downstream(a4)
最后,考虑使用按位运算符代替 set_downstream
和 set_upstream
,这是推荐的方式,而且也不那么冗长 source here。
如果这对你有用,请告诉我。
测试:气流版本:2.1.4,Python 3.8.10
所以我在 TaskGroup 中创建任务,并试图将它们添加到我的 dag 任务序列中,但它抛出了这个错误:
Broken DAG: [/Users/abc/projects/abc/airflow_dags/dag.py] Traceback (most recent call last):
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1234, in set_downstream
self._set_relatives(task_or_task_list, upstream=False)
File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'NoneType' object has no attribute 'update_relative'
我正在创建这样的任务组和任务:
def get_task_group(dag, task_group):
t1 = DummyOperator(task_id='t1', dag=dag, task_group=task_group)
t2 = DummyOperator(task_id='t2', dag=dag, task_group=task_group)
t3 = DummyOperator(task_id='t3', dag=dag, task_group=task_group)
t4 = DummyOperator(task_id='t4', dag=dag, task_group=task_group)
t5 = DummyOperator(task_id='t5', dag=dag, task_group=task_group)
t_list = [t2, t3, t4]
t1.set_downstream(t_list)
t5.set_upstream(t_list)
with DAG('some_dag', default_args=args) as dag:
with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
run_model_task_group = get_task_group(dag, tg)
a1 = DummyOperator(task_id='a1', dag=dag)
a2 = DummyOperator(task_id='a2', dag=dag)
a3 = DummyOperator(task_id='a3', dag=dag)
a4 = DummyOperator(task_id='a4', dag=dag)
a1.set_downstream(a2)
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
a3.set_downstream(a4)
如果我删除任务组并通过删除行使任务组任务不排序
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
我可以看到 a1、a2、a3 和 a4 的顺序正确,我可以断开连接的 run_model_task_group
任务,但是一旦我将其添加到顺序中,我就会收到上述错误。
谁能指导我这里可能发生的事情?
请注意,我使用带有 dag
和 task_group
参数的函数来创建任务组任务,因为我也想为另一个 dag 创建相同的任务集。
Python Version: 3.8.8
Airflow Version: 2.0.1
AttributeError: 'NoneType' object has no attribute 'update_relative'
这是因为 run_model_task_group
它的 None
在 With
块的范围之外,这是预期的 Python 行为。
在不对您目前所做的做太多改动的情况下,您可以将 get_task_group()
重构为 return 一个 TaskGroup
对象,如下所示:
def get_task_group(dag, group_id):
with TaskGroup(group_id=group_id, dag=dag) as tg:
t1 = DummyOperator(task_id='t1', dag=dag)
t2 = DummyOperator(task_id='t2', dag=dag)
t3 = DummyOperator(task_id='t3', dag=dag)
t4 = DummyOperator(task_id='t4', dag=dag)
t5 = DummyOperator(task_id='t5', dag=dag)
t_list = [t2, t3, t4]
t1.set_downstream(t_list)
t5.set_upstream(t_list)
return tg
在 DAG 定义中简单地调用它:
run_model_task_group = get_task_group(dag, "run_model_tasks")
生成的图形视图如下所示:
DAG 定义:
with DAG('some_dag',
default_args=default_args,
start_date=days_ago(2),
schedule_interval='@once') as dag:
# with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
# run_model_task_group = get_task_group(dag, )
run_model_task_group = get_task_group(dag, "run_model_tasks")
a1 = DummyOperator(task_id='a1', dag=dag)
a2 = DummyOperator(task_id='a2', dag=dag)
a3 = DummyOperator(task_id='a3', dag=dag)
a4 = DummyOperator(task_id='a4', dag=dag)
a1.set_downstream(a2)
a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)
a3.set_downstream(a4)
最后,考虑使用按位运算符代替 set_downstream
和 set_upstream
,这是推荐的方式,而且也不那么冗长 source here。
如果这对你有用,请告诉我。
测试:气流版本:2.1.4,Python 3.8.10