在 Airflow 中传递 Python 可调用列表

Passing a list on a Python Callable in Airflow

我有一个 dag,它使用 CSV 文件列表,然后将它们创建到数据框中以供导入。

#CREATING CSV FILES
def csv_filess():
    print("Creating CSV files....")
    csv_files = []
    for file in os.listdir(dataset_dir):
        if file.endswith('.csv'):
            csv_files.append(file)
    print("***Step 3: CSV files created***")
    return csv_files

#CREATING DATAFRAME
def create_df(csv_files):      
    print("Creating dataframe....")     
    df = {}
    for file in csv_files:
        try:
            df[file] = pd.read_csv(data_path+file)
        except UnicodeDecodeError:
            df[file] = pd.read_csv(dataset_dir+file, encoding="ISO-8859-1")
    
    print("***Step 4: CSV files created in df!***")
    return df


t3 = PythonOperator(
    task_id='create_csv',
    python_callable=csv_filess, provide_context=True,
    dag=dag)

t4 = PythonOperator(
    task_id='create_df',
    python_callable=create_df,
    op_args = t3.output,
    provide_context=True,
    dag=dag)

但是我得到一个错误:

create_df() takes 1 positional argument but 4 were given

我认为是因为我必须先这样说?:

csv_files = csv_filess()

但是如何在 Airflow 任务上定义它?

从 PythonOperator 返回值会自动将输出存储为键为“return_value”的 XCom。因此,您将从任务 create_csv 中获得一个带有键 return_value 和值 ["file1.csv", "file2.csv", ...] 的 XCom。您可以在 Admin -> XComs 下检查 Airflow 中的所有 XComs,或者通过单击任务 -> Instance Details -> XCom 来检查每个任务。

在您的 create_df 任务中,您随后使用 t3.output 传递 create_csv 的输出。这是对之前创建的 XCom 的引用。当给 op_args 一个列表时,Airflow 会自动解压该列表。所以你必须接受带有 * 的多个参数才能做到这一点:

def create_df(*csv_files):
    ...

两个注意事项:

您可能有兴趣探索 Airflow's TaskFlow API,这将减少样板代码。您的代码将如下所示:

from airflow.decorators import task

with DAG(...) as dag:

    @task
    def csv_filess():
        ...

    @task
    def create_df(csv_files):
        ...

    create_df(csv_filess())

(注意这里create_df不需要解包。

最后请注意,PythonOperators 的返回值会自动存储为 XCom(默认情况下存储在 Airflow Metastore 中)。如果配置了 intended/custom XCom 后端,那很好,但是在返回 Pandas DataFrames 时我有点担心,因为这些数据帧可能非常大。