动态创建任务列表

Dynamically create list of tasks

我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。未在下面的示例中显示,但重要的是要注意列表中的某些项目取决于其他任务,因此我使用 set_upstream 来强制执行依赖关系。

- airflow_home
  \- dags
    \- workflow.py

workflow.py

def get_task_list():
    # ... query dynamodb ...

def run_task(task):
    # ... do stuff ...

dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
    t = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )

问题是 workflow.py 一遍又一遍地变得 运行(每次任务 运行s?)我的 get_task_list() 方法受到 AWS 和抛出异常。

我认为这是因为每当 run_task() 被调用时,它 运行 将 workflow.py 中的所有全局变量设置为 workflow.py 所以我尝试将 run_task() 移动到一个单独的模块中像这样:

- airflow_home
  \- dags
    \- workflow.py
    \- mypackage
      \- __init__
      \- task.py

但这并没有改变任何东西。我什至尝试将 get_task_list() 放入用工厂函数包装的 SubDagOperator 中,它的行为方式仍然相同。

我的问题与这些问题有关吗?

此外,为什么 workflow.py 如此频繁地得到 运行,为什么 get_task_list() 抛出的错误会导致单个任务在任务方法未引用时失败 workflow.py 并且不依赖于它?

最重要的是,并行处理列表并强制执行列表中项目之间的任何依赖关系的最佳方法是什么?

根据您提到的问题,当 dag 运行ning 时,airflow 不支持创建任务。

因此,气流会在开始 运行 之前定期生成完整的 DAG 定义。理想情况下,此类生成的周期应与该 DAG 的调度间隔相同。

BUT 可能是每次airflow检查dag变化的时候,也在生成完整的dag,导致请求过多。该时间使用 airflow.cfg.

中的配置 min_file_process_interval 和 dag_dir_list_interval 进行控制

关于任务失败,它们失败是因为 dag 创建本身失败并且 airflow 无法启动它们。