在 Airflow 2 [Cloud Composer] 中导入自定义插件

Importing custom plugins in Airflow 2 [Cloud Composer]

我有这样的目录结构:

airflow_dags
├── dags
│   └── hk  
│       └── hk_dag.py  
├── plugins
│   └── cse   
│       └── operators.py   
│           └── cse_to_bq.py   
└── test
   └── dags   
       └── dag_test.py  

在 Cloud Composer 创建的 GCS 存储桶中,有一个插件文件夹,我在其中上传 cse 文件夹。

如果我像这样导入插件,现在在我的 hk_dag.py 文件中:

from plugins.cse.operators.cse_to_bq import CSEToBQOperator

和 运行 我的单元测试,它通过了,但在云作曲家我收到 ModuleNotFoundError: No module named 'plugins' 错误消息。

如果我在 hk_dag.py 中导入这样的插件:

from cse.operators.cse_to_bq import CSEToBQOperator

我的单元测试因 ModuleNotFoundError: No module named 'cse' 而失败,但它在 Cloud Composer 中运行良好。

如何解决?

在 Airflow 2.0 中,要导入您的插件,您只需直接从 operators 模块导入即可。

在你的情况下,必须是这样的:

from operators.cse_to_bq import CSEToBQOperator

但在此之前,您必须将文件夹结构更改为:

airflow_dags
├── dags
│   └── hk  
│       └── hk_dag.py  
├── plugins
│   └── operators   
│       └── cse   
│           └── cse_to_bq.py 
└── test
   └── dags   
       └── dag_test.py