调度气流任务组抛出AttributeError

Scheduling airflow TaskGroup throws AttributeError

所以我在 TaskGroup 中创建任务,并试图将它们添加到我的 dag 任务序列中,但它抛出了这个错误:

Broken DAG: [/Users/abc/projects/abc/airflow_dags/dag.py] Traceback (most recent call last):
  File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1234, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False)
  File "/Users/abc/.pyenv/versions/3.8.12/envs/vmd-3.8.12/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'NoneType' object has no attribute 'update_relative'

我正在创建这样的任务组和任务:

def get_task_group(dag, task_group):
    t1 = DummyOperator(task_id='t1', dag=dag, task_group=task_group)
    t2 = DummyOperator(task_id='t2', dag=dag, task_group=task_group)
    t3 = DummyOperator(task_id='t3', dag=dag, task_group=task_group)
    t4 = DummyOperator(task_id='t4', dag=dag, task_group=task_group)
    t5 = DummyOperator(task_id='t5', dag=dag, task_group=task_group)
    t_list = [t2, t3, t4]
    t1.set_downstream(t_list)
    t5.set_upstream(t_list)


with DAG('some_dag', default_args=args) as dag:
    with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
      run_model_task_group = get_task_group(dag, tg)

    a1 = DummyOperator(task_id='a1', dag=dag)
    a2 = DummyOperator(task_id='a2', dag=dag)
    a3 = DummyOperator(task_id='a3', dag=dag)
    a4 = DummyOperator(task_id='a4', dag=dag)

    a1.set_downstream(a2)
    a2.set_downstream(run_model_task_group)
    a3.set_upstream(run_model_task_group)
    a3.set_downstream(a4)

如果我删除任务组并通过删除行使任务组任务不排序

a2.set_downstream(run_model_task_group)
a3.set_upstream(run_model_task_group)

我可以看到 a1、a2、a3 和 a4 的顺序正确,我可以断开连接的 run_model_task_group 任务,但是一旦我将其添加到顺序中,我就会收到上述错误。

谁能指导我这里可能发生的事情?

请注意,我使用带有 dagtask_group 参数的函数来创建任务组任务,因为我也想为另一个 dag 创建相同的任务集。

Python Version: 3.8.8
Airflow Version: 2.0.1

AttributeError: 'NoneType' object has no attribute 'update_relative'这是因为 run_model_task_group 它的 NoneWith 块的范围之外,这是预期的 Python 行为。

在不对您目前所做的做太多改动的情况下,您可以将 get_task_group() 重构为 return 一个 TaskGroup 对象,如下所示:

def get_task_group(dag, group_id):

    with TaskGroup(group_id=group_id, dag=dag) as tg:
        t1 = DummyOperator(task_id='t1', dag=dag)
        t2 = DummyOperator(task_id='t2', dag=dag)
        t3 = DummyOperator(task_id='t3', dag=dag)
        t4 = DummyOperator(task_id='t4', dag=dag)
        t5 = DummyOperator(task_id='t5', dag=dag)
        t_list = [t2, t3, t4]
        t1.set_downstream(t_list)
        t5.set_upstream(t_list)
    return tg

在 DAG 定义中简单地调用它:

run_model_task_group = get_task_group(dag, "run_model_tasks")

生成的图形视图如下所示:

DAG 定义:

with DAG('some_dag',
         default_args=default_args,
         start_date=days_ago(2),
         schedule_interval='@once') as dag:
    # with TaskGroup(group_id=f"run_model_tasks", dag=dag) as tg:
    #     run_model_task_group = get_task_group(dag, )
    run_model_task_group = get_task_group(dag, "run_model_tasks")
    a1 = DummyOperator(task_id='a1', dag=dag)
    a2 = DummyOperator(task_id='a2', dag=dag)
    a3 = DummyOperator(task_id='a3', dag=dag)
    a4 = DummyOperator(task_id='a4', dag=dag)

    a1.set_downstream(a2)
    a2.set_downstream(run_model_task_group)
    a3.set_upstream(run_model_task_group)
    a3.set_downstream(a4)

最后,考虑使用按位运算符代替 set_downstreamset_upstream,这是推荐的方式,而且也不那么冗长 source here

如果这对你有用,请告诉我。

测试:气流版本:2.1.4,Python 3.8.10