气流任务取决于另一个任务结果?
airflow task depend on another task result?
我有2个任务
第一个任务下载一些数据集到folder_1
第二个任务清理 folder_1
上的每个文件
CRUDE_NEW_DATASET_LOCAL
是本地路径
download_crude_new_dataset = BashOperator(
task_id = "download_crude_new_dataset",
bash_command = bash.download_crude_new_dataset(),
dag=dag
)
cleaning_crude_new_dataset = []
crude_new_dataset = glob(bash.CRUDE_NEW_DATASET_LOCAL+"/*",recursive=True)
for p in crude_new_dataset :
path = p.replace('\','/')
if os.path.isfile(path):
cleaning_crude_new_dataset.append(
BashOperator(
task_id = "cleaning_crude_new_dataset-"+bash._path_leaf_(path),
bash_command = bash.cleaning_dataset(path),
dag=dag
)
)
download_crude_new_dataset >> cleaning_crude_new_dataset
我触发气流 dag 时的问题,folder_1
仍然是空的。并且使 cleaning_crude_new_dataset
(任务数组)为空。
感谢帮助
这样试试:
download_crude_new_dataset = BashOperator(
task_id= "download_crude_new_dataset",
bash_command= bash.download_crude_new_dataset(),
dag= dag
)
#cleaning_crude_new_dataset = []
crude_new_dataset = glob( bash.CRUDE_NEW_DATASET_LOCAL + "/*", recursive= True )
for p in crude_new_dataset :
path = p.replace( '\', '/' )
if os.path.isfile( path ):
temp_task = BashOperator(
task_id= "cleaning_crude_new_dataset-" + bash._path_leaf_( path ),
bash_command= bash.cleaning_dataset( path ),
dag= dag
)
#cleaning_crude_new_dataset.append( temp_task )
download_crude_new_dataset.set_downstream( temp_task )
这个问题已经解决了
将单个 Dag 中的任务拆分为多个 Dag
并触发其他 dags (dag1
>> dag2
>> dag3
... )
如果需要,将 dag_dir_list_interval
从 300 更改为较小的数字
我有2个任务
第一个任务下载一些数据集到folder_1
第二个任务清理 folder_1
CRUDE_NEW_DATASET_LOCAL
是本地路径
download_crude_new_dataset = BashOperator(
task_id = "download_crude_new_dataset",
bash_command = bash.download_crude_new_dataset(),
dag=dag
)
cleaning_crude_new_dataset = []
crude_new_dataset = glob(bash.CRUDE_NEW_DATASET_LOCAL+"/*",recursive=True)
for p in crude_new_dataset :
path = p.replace('\','/')
if os.path.isfile(path):
cleaning_crude_new_dataset.append(
BashOperator(
task_id = "cleaning_crude_new_dataset-"+bash._path_leaf_(path),
bash_command = bash.cleaning_dataset(path),
dag=dag
)
)
download_crude_new_dataset >> cleaning_crude_new_dataset
我触发气流 dag 时的问题,folder_1
仍然是空的。并且使 cleaning_crude_new_dataset
(任务数组)为空。
感谢帮助
这样试试:
download_crude_new_dataset = BashOperator(
task_id= "download_crude_new_dataset",
bash_command= bash.download_crude_new_dataset(),
dag= dag
)
#cleaning_crude_new_dataset = []
crude_new_dataset = glob( bash.CRUDE_NEW_DATASET_LOCAL + "/*", recursive= True )
for p in crude_new_dataset :
path = p.replace( '\', '/' )
if os.path.isfile( path ):
temp_task = BashOperator(
task_id= "cleaning_crude_new_dataset-" + bash._path_leaf_( path ),
bash_command= bash.cleaning_dataset( path ),
dag= dag
)
#cleaning_crude_new_dataset.append( temp_task )
download_crude_new_dataset.set_downstream( temp_task )
这个问题已经解决了
将单个 Dag 中的任务拆分为多个 Dag
并触发其他 dags (dag1
>> dag2
>> dag3
... )
如果需要,将 dag_dir_list_interval
从 300 更改为较小的数字