Google Cloud Composer "seems to be missing" 中的 Airflow DAG 似乎是因为调用了 Google Cloud Storage

Airflow DAG in Google Cloud Composer "seems to be missing" seemingly because of call to Google Cloud Storage

以前的问题

此问题已在 here, here, and here 之前报告过,但是,我怀疑这可能是因为调用了 google 云存储。

Premise/Problem

以下代码放置在 Google Cloud Composer 实例的 DAG 文件夹中。

以下代码块基于字符串列表“动态”生成 DAG,将成功运行,生成 2 个 DAG,名称为:“new_dummy_ohhel " 和 "new_dummy_hello"。这些 DAG 是可访问的并且可以工作。

import datetime as dt

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from google.cloud.storage import Client as Client_Storage


list_strings = ["ohhello", "hellothere"]
# ^ Variable of importance

for string_val in list_strings:

    name_dag = f"new_dummy_{string_val[:5]}"
    # Just get the first 5 characters (this is important later)

    dag = DAG(
        name_dag,
        default_args={
            "owner": "airflow",
            "start_date": dt.datetime.now() - dt.timedelta(days=1),
            # ^ yesterday
            "depends_on_past": False,
            "email": [""],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": dt.timedelta(minutes=5),
        },
        schedule_interval=None,
    )

    with dag:
        op_dummy_start = DummyOperator(task_id="new-dummy-task-start")
        op_dummy_end = DummyOperator(task_id="new-dummy-task-end")

        op_dummy_start >> op_dummy_end

    globals()[name_dag] = dag

现在,有问题且奇怪的是,当对 Google Cloud Storage 的简单调用替换字符串列表时,DAG 仍会创建,但在尝试访问它们。如果将变量 list_strings 替换为以下内容,则会出现此错误。

list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]

假设我在存储桶“some_bucket_name”中有文件“something.json”和“omgwhy.json”,那么将创建两个不可访问且不可执行的 DAG:“new_dummy_somet”和“new_dummy_omgwh”。 (只获得了前五 (5) 个字母,因此未包含 .。)这表明调用存储成功,但 DAG 仍然“似乎丢失”。

即使代码像下面这样立即覆盖了那个列表,DAG“似乎丢失”的错误仍然会出现(注意DAGs将是“new_dummy_ohhel”和“new_dummy_hello"):

list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
list_strings = ["ohhello", "hellothere"]

tl;dr & 假设

当对Google云存储进行任何调用(包括成功的和未使用的)时,文件中创建的所有DAG都会成功出现,但会提示DAG "<whatever dag name here>" seems to be missing.我傻眼了至于为什么成功调用会导致这个问题。

尝试过的解决方案

Cloud Composer 中的托管 Web 服务器在与您环境的主要工作计算机(使用在环境创建期间指定的服务帐户)不同的服务帐户下运行。这意味着如果您打算从环境的主存储桶以外的存储桶中读取数据,则需要使用类似的 ACL,否则 Web 服务器将无法从存储桶中读取数据。

在您的情况下,Airflow 调度程序可能可以读取存储桶(使用环境的服务帐户),但 Web 服务器不能。调度程序将为 DAG 创建一个条目,但如果 Web 服务器无法在不遇到异常的情况下解析定义文件,您将收到“DAG x is missing”。

如果以上内容适合您,您可以通过调整 Cloud Storage 存储桶 ACL 或启用 DAG serialization 来解决此问题。序列化消除了 Web 服务器对 parse/execute 定义文件的需要,并将其全部留给调度程序,因此它也可以解决您的问题。