MWAA Airflow 2.2.2 'DAG' 对象没有属性 'update_relative'
MWAA Airflow 2.2.2 'DAG' object has no attribute 'update_relative'
所以我将 DAG 从 airflow 版本 1.12.15 升级到 2.2.2,并将 python 从 3.8 降级到 3.7(因为 MWAA 不支持 python 3.8)。 DAG 在之前的设置中运行良好,但在 MWAA 设置中显示此错误:
Broken DAG: [/usr/local/airflow/dags/google_analytics_import.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1474, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1412, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
这是似乎失败的内置函数:
def set_downstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
"""
Set a task or a task list to be directly downstream from the current
task. Required by TaskMixin.
"""
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
DAG 中有我们正在尝试 运行 的代码:
for report in reports:
dag << PythonOperator(
task_id=f"task_{report}",
python_callable=process,
op_kwargs={
"conn": "snowflake_production",
"table": report,
},
provide_context=True,
)
我认为从 Python 3.8 到 3.7 的过渡是造成这个问题的原因,但我不确定。
有人 运行 遇到过类似问题吗?
对于 Airflow>=2.0.0,不再支持使用移位 (bit-shift) 运算符将任务分配给 DAG。
正在尝试做:
dag = DAG("my_dag")
dummy = DummyOperator(task_id="dummy")
dag >> dummy
不会工作。
只能在运算符之间设置依赖关系。
您应该使用上下文管理器:
with DAG("my_dag") as dag:
dummy = DummyOperator(task_id="dummy")
它已经处理了运算符与 DAG 对象的关系。
如果您不想这样做,请在运算符构造函数中使用 dag 参数:DummyOperator(task_id="dummy", dag=dag)
所以我将 DAG 从 airflow 版本 1.12.15 升级到 2.2.2,并将 python 从 3.8 降级到 3.7(因为 MWAA 不支持 python 3.8)。 DAG 在之前的设置中运行良好,但在 MWAA 设置中显示此错误:
Broken DAG: [/usr/local/airflow/dags/google_analytics_import.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1474, in set_downstream
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1412, in _set_relatives
task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
这是似乎失败的内置函数:
def set_downstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
"""
Set a task or a task list to be directly downstream from the current
task. Required by TaskMixin.
"""
self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
DAG 中有我们正在尝试 运行 的代码:
for report in reports:
dag << PythonOperator(
task_id=f"task_{report}",
python_callable=process,
op_kwargs={
"conn": "snowflake_production",
"table": report,
},
provide_context=True,
)
我认为从 Python 3.8 到 3.7 的过渡是造成这个问题的原因,但我不确定。
有人 运行 遇到过类似问题吗?
对于 Airflow>=2.0.0,不再支持使用移位 (bit-shift) 运算符将任务分配给 DAG。
正在尝试做:
dag = DAG("my_dag")
dummy = DummyOperator(task_id="dummy")
dag >> dummy
不会工作。
只能在运算符之间设置依赖关系。
您应该使用上下文管理器:
with DAG("my_dag") as dag:
dummy = DummyOperator(task_id="dummy")
它已经处理了运算符与 DAG 对象的关系。
如果您不想这样做,请在运算符构造函数中使用 dag 参数:DummyOperator(task_id="dummy", dag=dag)