Airflow启动多个并发通用任务

Airflow start multiple concurrent generic tasks

尝试在 cloud composer 上同时获取几个任务:

arr = {}
for i in xrange(3):
    print("i: " + str(i))
    command_formatted = command_template.format(str(i))
    create_training_instance = bash_operator.BashOperator(
        task_id='create_training_instance',
        bash_command=command_formatted)
    arr[i] = create_training_instance
    start_training.set_downstream(arr[i])  

出现以下错误:

Broken DAG: [/home/airflow/gcs/dags/scale_simple.py] Dependency , create_training_instance already registered

您还需要参数化您的任务 ID,例如, task_id='create_training_instance' --> 'create_traiing_instance-{}'.format(i)

对于单个任务,task_id 应该始终是唯一的。因此,您可以将 create_training_instance_{}.format(i) 用作 task_id.