DAG runs successfully, but isn't available in the Airflow webserver UI / isn't clickable in Google Cloud Composer

Below is the Airflow DAG code. It runs perfectly both when Airflow is hosted locally and when it is hosted on Cloud Composer. However, the DAG itself isn't clickable in the Composer UI. I found a similar question and tried the accepted answer linked there; my issue is similar.

import airflow
from airflow import DAG

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator

from datetime import datetime, timedelta
import sys

# copy this package to the dag directory in the GCP Composer bucket
from schemas.schemaValidator import loadSchema
from schemas.schemaValidator import sparkArgListToMap

# change these paths to point to the GCP Composer data directory

## cluster config
clusterConfig = loadSchema("somePath/jobConfig/cluster.yaml", "cluster")

## per-job yaml config
autoLoanCsvToParquetConfig = loadSchema("somePath/jobConfig/job.yaml", "job")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=3)
}

dag = DAG('usr_job', default_args=default_args, schedule_interval=None)

t1 = DummyOperator(task_id="start", dag=dag)

t2 = DataprocClusterCreateOperator(
    task_id="CreateCluster",
    cluster_name=clusterConfig["cluster"]["cluster_name"],
    project_id=clusterConfig["project_id"],
    num_workers=clusterConfig["cluster"]["worker_config"]["num_instances"],
    image_version=clusterConfig["cluster"]["dataproc_img"],
    master_machine_type=clusterConfig["cluster"]["worker_config"]["machine_type"],
    worker_machine_type=clusterConfig["cluster"]["worker_config"]["machine_type"],
    zone=clusterConfig["region"],
    dag=dag
)

t3 = DataProcSparkOperator(
    task_id="csvToParquet",
    main_class=autoLoanCsvToParquetConfig["job"]["main_class"],
    arguments=autoLoanCsvToParquetConfig["job"]["args"],
    cluster_name=clusterConfig["cluster"]["cluster_name"],
    dataproc_spark_jars=autoLoanCsvToParquetConfig["job"]["jarPath"],
    dataproc_spark_properties=sparkArgListToMap(autoLoanCsvToParquetConfig["spark_params"]),
    dag=dag
)

t4 = DataprocClusterDeleteOperator(
    task_id="deleteCluster",
    cluster_name=clusterConfig["cluster"]["cluster_name"],
    project_id=clusterConfig["project_id"],
    dag=dag
)

t5 = DummyOperator(task_id="stop", dag=dag)

t1 >> t2 >> t3 >> t4 >> t5
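
For reference, loadSchema and sparkArgListToMap come from my own schemas package. Roughly, the loader reads a YAML file into a dict at module import time, along these lines (a simplified sketch, not the exact code):

import yaml

def loadSchema(path, rootKey):
    # Read the YAML config. Note this runs at DAG parse time, so the file
    # must exist on every node that parses the DAG file.
    with open(path) as f:
        config = yaml.safe_load(f)
    if rootKey not in config:
        raise ValueError("missing top-level key '%s' in %s" % (rootKey, path))
    return config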

The UI gives this error - "This DAG isn't available in the webserver DAG bag object. It shows up in this list because the scheduler marked it as active in the metadata database."

However, when I trigger the DAG manually on Composer, I can see from the log files that it ran successfully.

The problem turned out to be the path provided for picking up the configuration files. I was giving a path to the data folder in GCS. Per the Google documentation, only the dags folder is synced to all nodes, not the data folder.
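
Concretely, one way to fix it is to ship the YAML files inside the dags folder and resolve their paths relative to the DAG file itself, so they exist everywhere the DAG is parsed (an illustrative sketch; the jobConfig location is hypothetical):

import os

# Build config paths relative to this DAG file. The dags/ folder is the one
# GCS folder Composer syncs to the scheduler, workers, and webserver alike.
DAG_DIR = os.path.dirname(os.path.abspath(__file__))

clusterConfig = loadSchema(os.path.join(DAG_DIR, "jobConfig", "cluster.yaml"), "cluster")
autoLoanCsvToParquetConfig = loadSchema(os.path.join(DAG_DIR, "jobConfig", "job.yaml"), "job")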

Needless to say, this was a problem hit at DAG parse time, which is why the DAG didn't show up properly in the UI. More interestingly, these debug messages weren't exposed in Composer versions 1.5 and earlier; now they are available to end users to help with debugging. Thanks anyway to everyone who helped.
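
As a side note, parse-time failures like this can be caught before uploading by importing the DAG file locally, the same way the scheduler and webserver do (a hypothetical pre-upload check; adjust the path to your DAG file):

import importlib.util

# Importing the DAG module reproduces what the scheduler/webserver do at
# parse time; a missing config file raises here instead of silently
# breaking the UI.
spec = importlib.util.spec_from_file_location("dag_check", "dags/usr_job.py")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
print("parsed OK:", module.dag.dag_id)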