ZenML Dag 没有出现在 Airflow UI
ZenML Dag didn't show up in Airflow UI
我正在试用 ZenML,它说它可以将我的 .py 管道转换为气流 DAG。
我按照这里的每一步操作:https://docs.zenml.io/guides/low-level-api/chapter-7,都成功了
我的管道 运行 在本地很好,但是为什么看不到在 airflow UI 上创建的这个 DAG? UI 完全是空的....
问题似乎是,ZenML 将复制我以 ZenML 方式编写的 .py 管道,并期望它可以 运行 在气流中...在我的情况下,这是行不通的。有谁知道如何让 ZenML 运行 我的代码成功通过气流?
这是我的 ZenML .py 代码:
import pandas as pd
import numpy as np
import os
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score
from zenml.pipelines import pipeline
from zenml.steps import step
from zenml.steps.step_output import Output
from zenml.steps.base_step_config import BaseStepConfig
class pipeline_config(BaseStepConfig):
"""
Params used in the pipeline
"""
label: str = 'species'
@step
def split_data(config: pipeline_config) -> Output(
X=pd.DataFrame, y=pd.DataFrame
):
path_to_csv = os.path.join('~/airflow/data', 'leaf.csv')
df = pd.read_csv(path_to_csv)
label = config.label
y = df[[label]]
X = df.drop(label, axis=1)
return X, y
@step
def train_evaltor(
config: pipeline_config,
X: pd.DataFrame,
y: pd.DataFrame
) -> float:
y = y[config.label]
folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
metrics_lst = []
for train_idx, val_idx in folds.split(X, y):
X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]
lgbm.fit(X_train, y_train)
y_pred = lgbm.predict(X_val)
cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
metrics_lst.append(cv_balanced_accuracy)
avg_performance = np.mean(metrics_lst)
print(f"Avg Performance: {avg_performance}")
return avg_performance
@pipeline
def super_mini_pipeline(
data_spliter,
train_evaltor
):
X, y = data_spliter()
train_evaltor(X=X, y=y)
# run the pipeline
pipeline = super_mini_pipeline(data_spliter=split_data(),
train_evaltor=train_evaltor())
pipeline.run()
好的,成功了!见下图:
原因是 airflow
和 dag
如果 safe_mode
打开(默认情况下),则必须在气流中。这是气流特定的逻辑可以在气流代码库中。
所以我所做的就是更改最后几行:
# run the pipeline airflow
pipeline = super_mini_pipeline(data_spliter=split_data(),
train_evaltor=train_evaltor())
DAG = pipeline.run()
您还可以更改 airflow.cfg
文件并关闭安全模式:
在$HOME/.config/zenml/airflow_root/<UUID>/airflow.cfg
# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``.
dag_discovery_safe_mode = False
编辑:
可能还有另一个原因:Airflow DAG 发现还依赖于作为 globals() 的 DAG,因此我们可能需要使用 DAG = pipeline.run()
来捕获它。所以无论如何,它都有效!
我正在试用 ZenML,它说它可以将我的 .py 管道转换为气流 DAG。 我按照这里的每一步操作:https://docs.zenml.io/guides/low-level-api/chapter-7,都成功了
我的管道 运行 在本地很好,但是为什么看不到在 airflow UI 上创建的这个 DAG? UI 完全是空的....
问题似乎是,ZenML 将复制我以 ZenML 方式编写的 .py 管道,并期望它可以 运行 在气流中...在我的情况下,这是行不通的。有谁知道如何让 ZenML 运行 我的代码成功通过气流?
这是我的 ZenML .py 代码:
import pandas as pd
import numpy as np
import os
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score
from zenml.pipelines import pipeline
from zenml.steps import step
from zenml.steps.step_output import Output
from zenml.steps.base_step_config import BaseStepConfig
class pipeline_config(BaseStepConfig):
"""
Params used in the pipeline
"""
label: str = 'species'
@step
def split_data(config: pipeline_config) -> Output(
X=pd.DataFrame, y=pd.DataFrame
):
path_to_csv = os.path.join('~/airflow/data', 'leaf.csv')
df = pd.read_csv(path_to_csv)
label = config.label
y = df[[label]]
X = df.drop(label, axis=1)
return X, y
@step
def train_evaltor(
config: pipeline_config,
X: pd.DataFrame,
y: pd.DataFrame
) -> float:
y = y[config.label]
folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
metrics_lst = []
for train_idx, val_idx in folds.split(X, y):
X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]
lgbm.fit(X_train, y_train)
y_pred = lgbm.predict(X_val)
cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
metrics_lst.append(cv_balanced_accuracy)
avg_performance = np.mean(metrics_lst)
print(f"Avg Performance: {avg_performance}")
return avg_performance
@pipeline
def super_mini_pipeline(
data_spliter,
train_evaltor
):
X, y = data_spliter()
train_evaltor(X=X, y=y)
# run the pipeline
pipeline = super_mini_pipeline(data_spliter=split_data(),
train_evaltor=train_evaltor())
pipeline.run()
好的,成功了!见下图:
原因是 airflow
和 dag
如果 safe_mode
打开(默认情况下),则必须在气流中。这是气流特定的逻辑可以在气流代码库中。
所以我所做的就是更改最后几行:
# run the pipeline airflow
pipeline = super_mini_pipeline(data_spliter=split_data(),
train_evaltor=train_evaltor())
DAG = pipeline.run()
您还可以更改 airflow.cfg
文件并关闭安全模式:
在$HOME/.config/zenml/airflow_root/<UUID>/airflow.cfg
# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``.
dag_discovery_safe_mode = False
编辑:
可能还有另一个原因:Airflow DAG 发现还依赖于作为 globals() 的 DAG,因此我们可能需要使用 DAG = pipeline.run()
来捕获它。所以无论如何,它都有效!