如何将 pandas 数据帧传递给气流任务

How to pass pandas dataframe to airflow tasks

我正在学习如何使用气流构建机器学习管道。

但是没有找到方法将一个任务生成的pandas数据帧传递给另一个任务...似乎需要将数据转换为JSON格式或将数据保存在每个任务中的数据库?

最后,我不得不将所有内容都放在 1 个任务中...是否有在 airflow 任务之间传递数据帧的方法?

这是我的代码:

from datetime import datetime
import pandas as pd
import numpy as np
import os

import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score

from airflow.decorators import dag, task
from airflow.operators.python_operator import PythonOperator


@dag(dag_id='super_mini_pipeline', schedule_interval=None, 
 start_date=datetime(2021, 11, 5), catchup=False, tags=['ml_pipeline'])
def baseline_pipeline():

    def all_in_one(label):
        path_to_csv = os.path.join('~/airflow/data','leaf.csv') 
        df = pd.read_csv(path_to_csv)

        y = df[label]
        X = df.drop(label, axis=1)

        folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
        lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
        metrics_lst = []

        for train_idx, val_idx in folds.split(X, y):
            X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
            X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]
        
            lgbm.fit(X_train, y_train)
            y_pred = lgbm.predict(X_val)
        
            cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
            metrics_lst.append(cv_balanced_accuracy)
    
        avg_performance = np.mean(metrics_lst)

        print(f"Avg Performance: {avg_performance}")


    all_in_one_task = PythonOperator(task_id='all_in_one_task', python_callable=all_in_one, op_kwargs={'label':'species'})
    all_in_one_task 


# dag invocation
pipeline_dag = baseline_pipeline()

虽然它被用于许多 ETL 任务,但 Airflow 并不是此类操作的正确选择,它适用于工作流而非数据流。但是有很多方法可以在不在任务之间传递整个数据帧的情况下做到这一点。

您可以使用 xcom.push 和 xcom.pull 传递有关数据的信息:

一个。将第一个任务的结果保存在某处(json、csv 等)

b。传递给 xcom.push 有关已保存文件的信息。例如。文件名、路径.

c。使用 xcom.pull 从其他任务读取此文件名并执行所需的操作。

或者:

上面的一切都使用了一些数据库tables:

一个。在 task_1 中,您可以从 table_1 中的某个数据框中下载数据,对其进行处理并保存在另一个 table_2 中(df.to_sql())。

b。使用 xcom.push.

传递 table 的名称

c。从另一个任务中使用 xcom.pull 获取 table_2 并使用 df.read_sql().

读取它

有关如何使用 xcom 的信息,您可以从气流示例中获得。 示例:https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial_etl_dag.py

恕我直言,还有很多其他更好的方法,我只是写了我尝试过的。

完全同意@Talgat 的观点,Airflow 并不是为此而构建的。它侧重于任务依赖性而不是数据依赖性。

也许您可以查看像 ZenML to solve this problem? It has a guide 这样的以数据为中心的流水线解决方案,其中包含跨流水线步骤传递 Pandas 数据帧的示例。您还可以利用跨步骤的数据缓存和其他功能,使其更适合您正在做的事情。

最重要的是,ZenML 管道也是 deploy-able as an Airflow DAG。因此,与其专注于自己编写工件逻辑的持久化,不如让 ZenML 来处理它。

免责声明:我是 ZenML 的核心贡献者之一,所以这无疑是有偏见的。仍然认为它可能对 OP 有帮助!