GCP Apache Airflow - 如何从私有存储库安装 Python 包并在 DAG 上导入?
GCP Apache Airflow - How to install Python package from a private repository and import on DAG?
我有一个私有存储库。这个存储库有我关于我的 DAG 的常用功能。 (例如:日期时间验证器、响应编码器函数)我想在我的 DAG 文件中导入这个存储库的函数,我使用 this link 来完成它。
我创建了 pip.conf
文件。这个文件的位置是:my-bucket-name/config/pip/pip.conf
我在这个文件中添加了我的私有 github 存储库,如下所示:
[global]
extra-index-url=https://<token>@github.com/my-private-github-repo.git
在此之后,我想在我的 dag 文件中导入这个存储库的函数(例如:from common-repo import *)但是我得到了 'module not found' 错误我的 DAG。 (不幸的是,在云作曲家日志中,我看不到任何显示私有 github 存储库已安装的日志。)
我搜索了很多,但找不到如何执行此操作。
您可以像这样在 PythonVirtualenvOperator 中将私有仓库添加到要求中:
from airflow import DAG
from airflow.decorators import task
@task.virtualenv(
task_id="virtualenv_python",
requirements=["https://<token>@github.com/my-private-github-repo.git"],
system_site_packages=False
)
def callable_from_virtualenv():
import your_private_module
..etc...
virtualenv_task = callable_from_virtualenv()
(示例来自 Airflow python operator example)
为了避免在源代码中硬编码令牌/凭证,您可以像这样使用 Airflow 变量:
from airflow.models import Variable
@task.virtualenv(
task_id="virtualenv_python",
requirements=[Variable.get("private_github_repo")],
system_site_packages=False
)
我有一个私有存储库。这个存储库有我关于我的 DAG 的常用功能。 (例如:日期时间验证器、响应编码器函数)我想在我的 DAG 文件中导入这个存储库的函数,我使用 this link 来完成它。
我创建了 pip.conf
文件。这个文件的位置是:my-bucket-name/config/pip/pip.conf
我在这个文件中添加了我的私有 github 存储库,如下所示:
[global]
extra-index-url=https://<token>@github.com/my-private-github-repo.git
在此之后,我想在我的 dag 文件中导入这个存储库的函数(例如:from common-repo import *)但是我得到了 'module not found' 错误我的 DAG。 (不幸的是,在云作曲家日志中,我看不到任何显示私有 github 存储库已安装的日志。)
我搜索了很多,但找不到如何执行此操作。
您可以像这样在 PythonVirtualenvOperator 中将私有仓库添加到要求中:
from airflow import DAG
from airflow.decorators import task
@task.virtualenv(
task_id="virtualenv_python",
requirements=["https://<token>@github.com/my-private-github-repo.git"],
system_site_packages=False
)
def callable_from_virtualenv():
import your_private_module
..etc...
virtualenv_task = callable_from_virtualenv()
(示例来自 Airflow python operator example)
为了避免在源代码中硬编码令牌/凭证,您可以像这样使用 Airflow 变量:
from airflow.models import Variable
@task.virtualenv(
task_id="virtualenv_python",
requirements=[Variable.get("private_github_repo")],
system_site_packages=False
)