如何在 Airflow 中动态创建子标签

How to dynamically create subdags in Airflow

我有一个主要的 dag,它检索一个文件并将该文件中的数据拆分为单独的 csv 文件。 对于这些 csv 文件的每个文件,我还有另一组任务必须完成。例如(上传到 GCS,插入到 BigQuery) 如何根据文件数量为每个文件动态生成一个 SubDag? SubDag 将定义上传到 GCS、插入到 BigQuery、删除 csv 文件等任务)

所以现在,这就是它的样子

main_dag = DAG(....)
download_operator = SFTPOperator(dag = main_dag, ...)  # downloads file
transform_operator = PythonOperator(dag = main_dag, ...) # Splits data and writes csv files

def subdag_factory(): # Will return a subdag with tasks for uploading to GCS, inserting to BigQuery.
    ...
    ...

如何为 transform_operator 中生成的每个文件调用 subdag_factory?

Airflow 以两种不同的方式处理 DAG。

  1. 一种方法是在一个 python 文件中定义动态 DAG,然后将其放入 dags_folder。它基于外部源(其他目录中的配置文件,SQL、noSQL 等)生成动态 DAG。对 DAG 结构的更改较少 - 更好(实际上适用于所有情况)。例如,我们的 DAG 文件为每个记录(或文件)生成 dags,它也会生成 dag_id。每个airflow scheduler的heartbeat这段代码遍历列表并生成相应的DAG。优点 :) 不是太多,只需更改一个代码文件。缺点很多,这与 Airflow 的工作方式有关。对于每个新的 DAG(dag_id),airflow 都会将步骤写入数据库,因此当步骤数或步骤名称更改时,它可能会破坏 Web 服务器。当您从列表中删除 DAG 时,它就变成了孤儿院,您无法从 Web 界面访问它并且无法控制 DAG,您看不到步骤,无法重新启动等等。如果您有 DAG 的静态列表,并且 ID 不会更改,但偶尔会执行此方法,则此方法是可以接受的。

  2. 所以在某些时候我想出了另一个解决方案。您有静态 DAG(它们仍然是动态的,脚本生成它们,但它们的结构和 ID 不会改变)。因此,不是像在目录中那样遍历列表并生成 DAG 的脚本。你做了两个静态 DAG,一个定期监视目录 (*/10 ****),另一个由第一个触发。因此,当出现新的 file/files 时,第一个 DAG 会触发第二个带有 arg conf 的 DAG。必须为目录中的每个文件执行下一个代码。

         session = settings.Session()
         dr = DagRun(
                     dag_id=dag_to_be_triggered,
                     run_id=uuid_run_id,
                     conf={'file_path': path_to_the_file},
                     execution_date=datetime.now(),
                     start_date=datetime.now(),
                     external_trigger=True)
         logging.info("Creating DagRun {}".format(dr))
         session.add(dr)
         session.commit()
         session.close()
     

触发的 DAG 可以接收配置参数并完成特定文件的所有必需任务。要访问 conf 参数,请使用:

    def work_with_the_file(**context):
        path_to_file = context['dag_run'].conf['file_path'] \
            if 'file_path' in context['dag_run'].conf else None

        if not path_to_file:
            raise Exception('path_to_file must be provided')

具有 Airflow 的所有灵活性和功能

缺点是监视器 DAG 可能是垃圾邮件。

我尝试按如下方式动态创建 subdags

# create and return and DAG
def create_subdag(dag_parent, dag_id_child_prefix, db_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix + db_name)
    default_args_copy = default_args.copy()

    # dag
    dag = DAG(dag_id=dag_id_child,
              default_args=default_args_copy,
              schedule_interval='@once')

    # operators
    tid_check = 'check2_db_' + db_name
    py_op_check = PythonOperator(task_id=tid_check, dag=dag,
                                 python_callable=check_sync_enabled,
                                 op_args=[db_name])

    tid_spark = 'spark2_submit_' + db_name
    py_op_spark = PythonOperator(task_id=tid_spark, dag=dag,
                                 python_callable=spark_submit,
                                 op_args=[db_name])

    py_op_check >> py_op_spark
    return dag

# wrap DAG into SubDagOperator
def create_subdag_operator(dag_parent, db_name):
    tid_subdag = 'subdag_' + db_name
    subdag = create_subdag(dag_parent, tid_prefix_subdag, db_name)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

# create SubDagOperator for each db in db_names
def create_all_subdag_operators(dag_parent, db_names):
    subdags = [create_subdag_operator(dag_parent, db_name) for db_name in db_names]
    # chain subdag-operators together
    airflow.utils.helpers.chain(*subdags)
    return subdags


# (top-level) DAG & operators
dag = DAG(dag_id=dag_id_parent,
          default_args=default_args,
          schedule_interval=None)

subdag_ops = create_subdag_operators(dag, db_names)

请注意,为其创建 subdag 的输入列表,此处 db_names,可以在 python 文件中静态声明,也可以从外部源读取。

结果 DAG 看起来像这样

潜入 SubDAG(s)