气流任务取决于另一个任务结果?

airflow task depend on another task result?

我有2个任务

第一个任务下载一些数据集到folder_1

第二个任务清理 folder_1

上的每个文件

CRUDE_NEW_DATASET_LOCAL是本地路径

download_crude_new_dataset = BashOperator(
        task_id = "download_crude_new_dataset",
        bash_command = bash.download_crude_new_dataset(),
        dag=dag
)

cleaning_crude_new_dataset = []
crude_new_dataset = glob(bash.CRUDE_NEW_DATASET_LOCAL+"/*",recursive=True)
for p in crude_new_dataset :
    path = p.replace('\','/')
    if os.path.isfile(path):
        cleaning_crude_new_dataset.append(
            BashOperator(
                task_id = "cleaning_crude_new_dataset-"+bash._path_leaf_(path),
                bash_command = bash.cleaning_dataset(path),
                dag=dag
            )
    ) 

download_crude_new_dataset >> cleaning_crude_new_dataset

我触发气流 dag 时的问题,folder_1 仍然是空的。并且使 cleaning_crude_new_dataset (任务数组)为空。

感谢帮助

这样试试:

download_crude_new_dataset = BashOperator(
        task_id= "download_crude_new_dataset",
        bash_command= bash.download_crude_new_dataset(),
        dag= dag
)

#cleaning_crude_new_dataset = []

crude_new_dataset = glob( bash.CRUDE_NEW_DATASET_LOCAL + "/*", recursive= True )
for p in crude_new_dataset :
    path = p.replace( '\', '/' )
    if os.path.isfile( path ):
        temp_task = BashOperator(
            task_id= "cleaning_crude_new_dataset-" + bash._path_leaf_( path ),
            bash_command= bash.cleaning_dataset( path ),
            dag= dag
        )
        #cleaning_crude_new_dataset.append( temp_task )
        download_crude_new_dataset.set_downstream( temp_task )

这个问题已经解决了

将单个 Dag 中的任务拆分为多个 Dag 并触发其他 dags (dag1 >> dag2 >> dag3 ... )

如果需要,将 dag_dir_list_interval 从 300 更改为较小的数字