MWAA - 气流 - PythonVirtualenvOperator 需要 virtualenv

MWAA - Airflow - PythonVirtualenvOperator requires virtualenv

我正在使用 AWS 的 MWAA service (2.2.2) 到 运行 各种 DAG,其中大部分是使用标准 PythonOperator 类型实现的。我将 DAG 与任何共享需求一起捆绑到 S3 存储桶中,然后将 MWAA 指向相关对象和版本。到目前为止一切运行都很顺利。

我现在想使用 PythonVirtualenvOperator type, which AWS acknowledge is not supported out of the box. I am following their guide 实现 DAG,了解如何使用自定义插件修补行为,但继续收到来自 Airflow 的错误,在仪表板顶部以大红色显示写作:

DAG Import Errors (1) ... ... AirflowException: PythonVirtualenvOperator requires virtualenv, please install it.

我已经确认该插件确实被 Airflow 选中(我在管理屏幕中看到它被引用),为了避免疑义,我在他们的示例中使用了 AWS 提供的确切代码有向无环图。 AWS 关于此的文档非常简单,我还没有偶然发现任何社区对此的讨论。

根据 AWS 的文档,我们希望插件在启动时 运行 在处理任何 DAG 之前。插件本身似乎有效地重写了 venv 命令以使用 pip 安装的版本,而不是安装在机器上的版本,但是我一直在努力验证事情是否按照我期望的顺序发生。非常感谢任何有关调试实例行为的指示。

有没有人遇到过类似的问题? MWAA 文档中是否存在需要解决的漏洞?我是否漏掉了一些非常明显的东西?

可能相关,但我确实在调度程序的日志中看到了这个警告,这可能表明为什么 MWAA 正在努力解决依赖关系?

WARNING: The script virtualenv is installed in '/usr/local/airflow/.local/bin' which is not on PATH.

Airflow 使用 shutil.which 寻找 virtualenv。通过 requirements.txt 安装的 virtualenv 不在路径上。将 virtualenv 的路径添加到 PATH 可以解决这个问题。 这里的文档是错误的https://docs.aws.amazon.com/mwaa/latest/userguide/samples-virtualenv.html

import os
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv 
from typing import List
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
    cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if system_site_packages:
        cmd.append('--system-site-packages')
    if python_bin is not None:
        cmd.append(f'--python={python_bin}')
    return cmd
airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
#This is the added path code
os.environ["PATH"] = f"/usr/local/airflow/.local/bin:{os.environ['PATH']}"
class VirtualPythonPlugin(AirflowPlugin):                
    name = 'virtual_python_plugin'