当 运行 Google 的 Cloud Compose 时,气流 dag 依赖项对 dag 不可用

Airflow dag dependencies not available to dags when running Google's Cloud Compose

A​​irflow 允许您将 dag 依赖的依赖项(外部 python 代码添加到 dag 代码中)放在 dag 文件夹中。这意味着那些外部 python 代码中的任何 components/members 或 类 都可用于 dag 代码。

然而,在执行此操作时(在云组合环境的 GCS dag 文件夹中),依赖项的组件对 dag 不可用。 Airflow Web UI 中显示类似于以下内容的错误:Broken DAG: [/home/airflow/gcs/dags/....py] No module named tester.其中 tester 是 dags 文件夹中的一个单独的 python 文件。

当使用 Google 的 SDK(运行 实际的 Airflow 命令)测试这些任务时,任务 运行 很好,但它似乎在 Kubernettes 的某个地方创建了这些容器作业,它确实似乎也没有接管依赖项。

我知道 Cloud Compose 处于测试阶段,但我想知道我是否做错了什么。

来自关于配置 Airflow 的官方文档:

The first time you run Airflow, it will create a file called airflow.cfg in your $AIRFLOW_HOME directory (~/airflow by default). This file contains Airflow’s configuration and you can edit it to change any of the settings

在这个文件中设置第一个设置

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /home/airflow/gcs/dags

Airflow 的基本路径。

您是否正在寻找如何安装 Python 依赖项? https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies

此外,位于您的 GCS 存储桶中的 DAGs 文件夹(gcloud beta composer environments describe [environment] 获取此存储桶;gs://{composer-bucket}/dags)应映射到您的 /home/airflow/gcs/dags =18=]。您是否尝试通过 SSH 连接到节点来找到它?

您应该将模块放在包含 __init__.py 文件的单独文件夹中(Airflow 不喜欢其顶级 DAGs 目录中的 __init__.py 文件)。

例如,如果您有以下目录结构:

dags/
    my_dag.py
    my_deps/
        __init__.py
        dep_a.py
        dep_b.py

你可以在my_dag.py中写from my_deps import dep_a, dep_b

我遇到了同样的问题,并在邮件列表中帮助解决了它。作为参考,请参阅此处的线程:https://groups.google.com/forum/#!topic/cloud-composer-discuss/wTI7Pbwc6ZY。有一个 link 到一个方便的 Github 要点,其中也有一些评论。

为了编写您自己的依赖项并将其导入到您的 DAG 中,您需要按照此处所述压缩您的 dag 及其依赖项:https://airflow.apache.org/concepts.html?highlight=zip#packaged-dags

您可以将该 zip 文件直接上传到您的 Cloud Composer GCS 存储桶,Airflow 会提取它。

确保您的依赖项是包,而不是模块,位于 dags 目录的顶层。

from foo_dep.foo_dep import my_utility_function 将在这里工作:

foo_dag.py
foo_dep/__init__.py
foo_dep/foo_dep.py

from foo_dep import my_utility_function 似乎它应该与以下 dags 目录结构一起工作(并且将在本地工作),但它在 Airflow 中不起作用:

foo_dag.py
foo_dep.py