Airflow 任务能否在运行时动态生成 DAG?

Can an Airflow task dynamically generate a DAG at runtime?

我有一个不规则上传的上传文件夹。对于每个上传的文件,我想生成一个特定于该文件的 DAG。

我的第一个想法是使用 FileSensor 来执行此操作,该文件传感器监视上传文件夹,并以新文件的存在为条件,触发创建单独 DAG 的任务。概念上:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)

在我最初的实现中,CreateDAGTask 是一个 PythonOperator,它创建了 DAG 全局变量,方法是将它们放置在全局命名空间 (see this SO answer) 中,如下所示:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime, timedelta
from pathlib import Path

UPLOAD_LOCATION = "/opt/files/uploaded"

# Dynamic DAG generation task code, for the Sensor_DAG below
def generate_dags_for_files(location=UPLOAD_LOCATION, **kwargs):
    dags = []
    for filepath in Path(location).glob('*'):
        dag_name = f"process_{filepath.name}"
        dag = DAG(dag_name, schedule_interval="@once", default_args={
            "depends_on_past": True,
            "start_date": datetime(2020, 7, 15),
            "retries": 1,
            "retry_delay": timedelta(hours=12)
        }, catchup=False)
        dag_task = DummyOperator(dag=dag, task_id=f"start_{dag_name}")

        dags.append(dag)

        # Try to place the DAG into globals(), which doesn't work
        globals()[dag_name] = dag

    return dags

主 DAG 然后通过 PythonOperator:

调用此逻辑
# File-sensing DAG
default_args = {
    "depends_on_past" : False,
    "start_date"      : datetime(2020, 7, 16),
    "retries"         : 1,
    "retry_delay"     : timedelta(hours=5),
}
with DAG("Sensor_DAG", default_args=default_args,
         schedule_interval= "50 * * * *", catchup=False, ) as sensor_dag:

    start_task  = DummyOperator(task_id="start")
    stop_task   = DummyOperator(task_id="stop")
    sensor_task = FileSensor(task_id="my_file_sensor_task",
                             poke_interval=60,
                             filepath=UPLOAD_LOCATION)
    process_creator_task = PythonOperator(
        task_id="process_creator",
        python_callable=generate_dags_for_files,
    )
    start_task >> sensor_task >> process_creator_task >> stop_task

但这不起作用,因为在 process_creator_task 运行时,Airflow 已经解析了全局变量。解析时间后的新全局变量无关紧要。

临时解决方案

Airflow dynamic DAG and task Ids, I can achieve what I'm trying to do by omitting the FileSensor task altogether and just letting Airflow generate the per-file task at each scheduler heartbeat, replacing the Sensor_DAG with just executing generate_dags_for_files: Update: Nevermind -- while this does create a DAG in the dashboard, actual execution runs into the "DAG seems to be missing" 个问题:

generate_dags_for_files()

这确实意味着我无法再使用 FileSensorpoke_interval 参数来调节文件夹轮询的频率;相反,Airflow 会在每次收集 DAG 时轮询该文件夹。

这是最好的模式吗?

其他相关的 Whosebug 话题

简而言之:如果任务写入 DagBag 读取的位置,是的,但最好避免需要这样做的模式。 您在任务中想要 custom-create 的任何 DAG 都应该是静态的、大量参数化的 conditionally-triggered DAG。 ,而我感谢他对这个问题的评论的指导。

也就是说,这里有一些方法可以完成问题的要求,无论这个想法有多糟糕,ham-handedness:

  • 如果您从变量 (like so) 动态生成 DAG,请修改变量。
  • 如果您从配置文件列表动态生成 DAG,请将新的配置文件添加到您从中提取配置文件的位置,以便在下一个 DAG 集合中生成新的 DAG。
  • 使用 Jinja 模板之类的东西在 dags/ 文件夹中写入一个新的 Python 文件。

要在任务运行后保留对任务的访问权限,您必须保持新的 DAG 定义稳定并在未来的仪表板更新/DagBag 集合中可访问。否则,