Airflow启动多个并发通用任务
Airflow start multiple concurrent generic tasks
尝试在 cloud composer 上同时获取几个任务:
arr = {}
for i in xrange(3):
print("i: " + str(i))
command_formatted = command_template.format(str(i))
create_training_instance = bash_operator.BashOperator(
task_id='create_training_instance',
bash_command=command_formatted)
arr[i] = create_training_instance
start_training.set_downstream(arr[i])
出现以下错误:
Broken DAG: [/home/airflow/gcs/dags/scale_simple.py] Dependency
, create_training_instance already
registered
您还需要参数化您的任务 ID,例如,
task_id='create_training_instance' --> 'create_traiing_instance-{}'.format(i)
对于单个任务,task_id
应该始终是唯一的。因此,您可以将 create_training_instance_{}.format(i)
用作 task_id
.
尝试在 cloud composer 上同时获取几个任务:
arr = {}
for i in xrange(3):
print("i: " + str(i))
command_formatted = command_template.format(str(i))
create_training_instance = bash_operator.BashOperator(
task_id='create_training_instance',
bash_command=command_formatted)
arr[i] = create_training_instance
start_training.set_downstream(arr[i])
出现以下错误:
Broken DAG: [/home/airflow/gcs/dags/scale_simple.py] Dependency , create_training_instance already registered
您还需要参数化您的任务 ID,例如, task_id='create_training_instance' --> 'create_traiing_instance-{}'.format(i)
对于单个任务,task_id
应该始终是唯一的。因此,您可以将 create_training_instance_{}.format(i)
用作 task_id
.