ZenML DAG doesn't show up in the Airflow UI

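I'm trying out ZenML, which says it can convert my .py pipelines into Airflow DAGs. I followed every step here: https://docs.zenml.io/guides/low-level-api/chapter-7, and all of them succeeded.

My pipeline runs fine locally, so why can't I see the DAG it created in the Airflow UI? The UI is completely empty...

The problem seems to be that ZenML copies my .py pipeline, written the ZenML way, and expects it to run in Airflow... and in my case that doesn't work. Does anyone know how to get ZenML to run my code successfully through Airflow?

Here is my ZenML .py code: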

import pandas as pd
import numpy as np
import os

import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score

from zenml.pipelines import pipeline
from zenml.steps import step
from zenml.steps.step_output import Output
from zenml.steps.base_step_config import BaseStepConfig

class pipeline_config(BaseStepConfig):
    """
    Params used in the pipeline
    """
    label: str = 'species'

@step
def split_data(config: pipeline_config) -> Output(
    X=pd.DataFrame, y=pd.DataFrame
):
    path_to_csv = os.path.join('~/airflow/data', 'leaf.csv')
    df = pd.read_csv(path_to_csv)
    label = config.label

    y = df[[label]]
    X = df.drop(label, axis=1)

    return X, y


@step
def train_evaltor(
    config: pipeline_config,
    X: pd.DataFrame,
    y: pd.DataFrame
) -> float:
    y = y[config.label]

    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
    lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
    metrics_lst = []

    for train_idx, val_idx in folds.split(X, y):
        X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]

        lgbm.fit(X_train, y_train)
        y_pred = lgbm.predict(X_val)

        cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
        metrics_lst.append(cv_balanced_accuracy)

    avg_performance = np.mean(metrics_lst)
    print(f"Avg Performance: {avg_performance}")

    return avg_performance


@pipeline
def super_mini_pipeline(
    data_spliter,
    train_evaltor
):
    X, y = data_spliter()
    train_evaltor(X=X, y=y)


# run the pipeline
pipeline = super_mini_pipeline(data_spliter=split_data(),
                                train_evaltor=train_evaltor())
pipeline.run()

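OK, I got it working!

The reason is that when safe mode is on (which it is by default), Airflow only picks up a DAG file if that file contains both the strings DAG and airflow. This is Airflow-specific logic that can be found in the Airflow codebase.

So all I did was change the last few lines: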

# run the pipeline airflow
pipeline = super_mini_pipeline(data_spliter=split_data(),
                                train_evaltor=train_evaltor())
DAG = pipeline.run()
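To see why these lines matter, here is a minimal sketch of the safe-mode pre-check described above. It is a simplified model for illustration, not Airflow's actual code: the function name might_contain_dag is hypothetical here, and the exact matching rules (for example case sensitivity) may differ between Airflow versions. Note that the original file only contains "airflow" via the CSV path and never contains "DAG", while the new comment and assignment supply both.

```python
import tempfile
from pathlib import Path

# Marker strings named in the airflow.cfg comment quoted in this answer.
REQUIRED_MARKERS = (b"DAG", b"airflow")


def might_contain_dag(file_path: str) -> bool:
    """Hypothetical, simplified model of Airflow's safe-mode pre-check.

    Returns True only if the raw file bytes contain every marker string.
    In safe mode, files failing this check are skipped without being imported.
    """
    content = Path(file_path).read_bytes()
    return all(marker in content for marker in REQUIRED_MARKERS)


if __name__ == "__main__":
    # Simplified excerpt of the original file: "airflow" appears, "DAG" does not.
    before = "path = '~/airflow/data/leaf.csv'\npipeline.run()\n"
    # Simplified excerpt of the modified file: both markers appear.
    after = "# run the pipeline airflow\nDAG = pipeline.run()\n"
    with tempfile.TemporaryDirectory() as tmp:
        for name, source in (("before.py", before), ("after.py", after)):
            path = Path(tmp) / name
            path.write_text(source)
            print(name, might_contain_dag(str(path)))
```

Running it prints False for the "before" excerpt and True for the "after" one, which matches the behaviour observed in the UI.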

Alternatively, you can turn safe mode off in the airflow.cfg file (the option lives in the [core] section):

$HOME/.config/zenml/airflow_root/<UUID>/airflow.cfg

# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``.
dag_discovery_safe_mode = False
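Airflow also lets an environment variable named AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE override the value in airflow.cfg, following its AIRFLOW__{SECTION}__{KEY} naming convention. The sketch below illustrates that lookup order with the standard library's configparser. It is an approximation for illustration, not Airflow's configuration loader, and the helper safe_mode_enabled is hypothetical.

```python
import configparser
import os
from typing import Mapping, Optional

SECTION, KEY = "core", "dag_discovery_safe_mode"
# Airflow's env-var naming convention: AIRFLOW__{SECTION}__{KEY}
ENV_VAR = f"AIRFLOW__{SECTION.upper()}__{KEY.upper()}"

_BOOLEANS = {"1": True, "true": True, "yes": True, "on": True,
             "0": False, "false": False, "no": False, "off": False}


def safe_mode_enabled(cfg_text: str, environ: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical helper: resolve the effective safe-mode setting.

    Lookup order sketched here: environment variable, then the [core]
    section of airflow.cfg, then the default (True, safe mode on).
    """
    env = os.environ if environ is None else environ
    if ENV_VAR in env:
        return _BOOLEANS[env[ENV_VAR].strip().lower()]
    # interpolation=None so '%' characters in a real airflow.cfg are read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.getboolean(SECTION, KEY, fallback=True)


if __name__ == "__main__":
    cfg = "[core]\ndag_discovery_safe_mode = False\n"
    print(safe_mode_enabled(cfg, environ={}))                 # False: file setting
    print(safe_mode_enabled("[core]\n", environ={}))          # True: default
    print(safe_mode_enabled(cfg, environ={ENV_VAR: "True"}))  # True: env var wins
```

Remember that the airflow.cfg to edit is the one under ZenML's airflow_root directory shown above, since that is the instance ZenML starts.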

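Edit: There may be another reason as well: Airflow's DAG discovery also relies on the DAG object being a global (i.e. present in the module's globals()), so we may need DAG = pipeline.run() to capture it. Either way, it works!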
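The globals() point can be illustrated with a toy model of discovery: import the file as a module, then keep only the DAG objects bound to module-level names. This is a sketch of the idea, not Airflow's DagBag code. FakeDAG, fake_run and collect_top_level_dags are hypothetical stand-ins for an Airflow DAG, for ZenML's pipeline.run() (assumed, as this answer suggests, to return the generated DAG), and for Airflow's collection step.

```python
import types
from typing import List


class FakeDAG:
    """Hypothetical stand-in for an Airflow DAG object."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id


def fake_run() -> FakeDAG:
    """Hypothetical stand-in for pipeline.run() returning the generated DAG."""
    return FakeDAG("super_mini_pipeline")


def collect_top_level_dags(source: str) -> List[str]:
    """Run `source` as a throwaway module and return ids of DAGs in its globals.

    Toy model of discovery: only objects bound to module-level names are seen.
    """
    module = types.ModuleType("dag_file")
    module.__dict__["fake_run"] = fake_run  # make the stand-in visible to the source
    exec(source, module.__dict__)
    return [obj.dag_id for obj in module.__dict__.values() if isinstance(obj, FakeDAG)]


if __name__ == "__main__":
    print(collect_top_level_dags("fake_run()\n"))        # []: DAG created, then discarded
    print(collect_top_level_dags("DAG = fake_run()\n"))  # ['super_mini_pipeline']
```

In this model the name DAG itself is not special; any module-level assignment would keep the object reachable. Using the name DAG simply also satisfies the safe-mode string check.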