Airflow is unable to import a custom Python package

I want to call a script from a custom Python project through Airflow.

My directory structure is:

/home/user/
      ├──airflow/
      │  ├──dags
      │      ├──.venv_airflow (virtual environment for airflow)
      │      ├──my_dag.py
      ├──my_project
         ├──.venv (virtual environment for my_project)
         ├──folderA
            ├──__init__.py
            ├──folderB
               ├──call_me.py (has a line "from my_project.folderA.folderB import import_me")
               ├──import_me.py

My DAG file looks like this:

from airflow import DAG
import datetime as dt
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'arpita',
    'start_date': dt.datetime(2019, 11, 20),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
    'depends_on_past': False,
    'email': ['example@abc.com'],
    'email_on_failure': True,
    'email_on_retry': True,
}

with DAG('sample',
         default_args=default_args,
         schedule_interval='30 * * * *',
         ) as dag:
    enter_project = BashOperator(task_id='enter_project',
                                 bash_command='cd /home/user/my_project',
                                 retries=2)
    setup_environment = BashOperator(task_id='setup_environment',
                                     bash_command='source /home/user/my_project/.venv/bin/activate',
                                     retries=2)
    call_script = BashOperator(task_id='call_script',
                               bash_command='python -m my_project.folderA.folderB.call_me',
                               retries=2)

enter_project >> setup_environment >> call_script

But I get this error:

[2019-11-22 11:56:49,311] {bash_operator.py:115} INFO - Running command: python -m my_project.folderA.folderB.call_me
[2019-11-22 11:56:49,315] {bash_operator.py:124} INFO - Output:
[2019-11-22 11:56:49,349] {bash_operator.py:128} INFO - /home/user/airflow/.venv/bin/python: Error while finding spec for 'my_project.folderA.folderB.call_me' (ImportError: No module named 'my_project')

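The project and the script work outside Airflow. Inside Airflow, the script imports other packages such as pandas and tensorflow, but not the custom package. I tried inserting the path with sys.path.insert, but that did not work. Thanks for reading :)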

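Your bash commands run in three separate BashOperators. Each BashOperator executes its command in its own bash subprocess, so the cd and the virtualenv activation from the first two tasks no longer apply by the time the third task runs. That is why the log shows Airflow's own interpreter (/home/user/airflow/.venv/bin/python) trying to find my_project. The three commands should run together in a single operator: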

call_script = BashOperator(
    task_id='call_script',
    bash_command='cd /home/user/my_project;'
                 'source /home/user/my_project/.venv/bin/activate;'
                 'python -m my_project.folderA.folderB.call_me',
    retries=2)
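
To see why splitting the commands across tasks fails, here is a minimal sketch that uses only the standard subprocess module rather than Airflow itself. The helper name run_bash and the temporary directories are assumptions made for illustration; the point is only that a cd in one bash process has no effect on the next one, which is roughly what happens between separate BashOperator tasks.

```python
import os
import shlex
import subprocess
import tempfile


def run_bash(command: str, cwd: str) -> str:
    """Run `command` in a fresh bash process started in `cwd`; return its stdout."""
    result = subprocess.run(
        ["bash", "-c", command],
        cwd=cwd,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


with tempfile.TemporaryDirectory() as start, tempfile.TemporaryDirectory() as target:
    # Resolve symlinks so the paths compare equal to the output of `pwd -P`.
    start, target = os.path.realpath(start), os.path.realpath(target)

    # Two separate processes: the cd in the first one is lost when it exits.
    run_bash(f"cd {shlex.quote(target)}", cwd=start)
    separate = run_bash("pwd -P", cwd=start)

    # One process: the cd is still in effect when pwd runs.
    combined = run_bash(f"cd {shlex.quote(target)} && pwd -P", cwd=start)

print("separate processes:", separate == start)
print("single process:", combined == target)
```

The same reasoning applies to source .venv/bin/activate: it only changes the environment of the shell that runs it, so it has to sit in the same bash_command as the python call.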