找不到 Airflow Composer 自定义模块 - PythonVirtualenvOperator

Airflow Composer custom module not found - PythonVirtualenvOperator

我在 GCP Composer 中设置了一个非常简单的 Airflow 实例。它有水桶和一切。我想使用 PythonVirtualenvOperator.

将每个 dag 设置为 运行 它自己的环境

里面的结构如下:

dags ->
------> code_snippets/
----------> print_name.py - has function called print_my_name() which prints a string into the terminal
------> test_dag.py

test_dag.py:

import datetime
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow import DAG


def main_func():
    import pandas as pd
    import datetime

    from code_snippets.print_name import print_my_name
    print_my_name()

    df = pd.DataFrame(data={
        'date': [str(datetime.datetime.now().date())]
    })

    print(df)

default_args = {
    'owner': 'test_dag',
    'start_date': datetime.datetime(2020, 7, 3, 5, 1, 00),
    'concurrency': 1,
    'retries': 0
}

dag = DAG('test_dag', description='Test DAGS with environment',
          schedule_interval='0 5 * * *',
          default_args=default_args, catchup=False)

test_the_dag = PythonVirtualenvOperator(
    task_id="test_dag",
    python_callable=main_func,
    python_version='3.8',
    requirements=["DateTime==4.3", "numpy==1.20.2", "pandas==1.2.4", "python-dateutil==2.8.1", "pytz==2021.1",
                  "six==1.15.0", "zope.interface==5.4.0"],
    system_site_packages=False,
    dag=dag,
)

test_the_dag

一切正常,直到我开始导入自定义模块 - 使用 init.py 没有帮助,它仍然给出相同的错误,在我的例子中是:

from code_snippets.print_name import print_my_name\nModuleNotFoundError: No module named \'code_snippets\'

我也有 Airflow 的本地实例,我遇到了同样的问题。我试过移动东西或将文件夹的路径添加到 PATH,在目录中添加 inits 甚至更改导入语句,但只要我正在导入自定义模块,错误就会一直存在。

system_site_packages=False 或 True 也没有影响

是否有解决该问题的方法或绕过它的方法,以便我可以利用我在 DAG 之外分离的自定义代码?

气流版本:1.10.14+作曲家 Python Airflow 版本设置为:3

airflow.operators.python.PythonVirtualenvOperator 的实现是 python_callable 预计不会引用外部名称。

可调用文件中使用的任何非标准库包都必须在 requirements.txt 文件中声明为外部依赖项。

如果您需要使用 code_snippets,请将其作为包发布到 pypi 或 VCS 存储库,并将其添加到 [=15= 的 requirements kwargs 的包列表中].