运行时的 Airflow 动态任务

Airflow dynamic tasks at runtime

关于 'dynamic tasks' 的其他问题似乎涉及在计划或设计时动态构建 DAG。我对在执行期间向 DAG 动态添加任务很感兴趣。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)

这种天真的实现似乎不起作用 - 虚拟任务从未出现在 UI 中。

在执行期间向 DAG 添加新运算符的正确方法是什么?可能吗?

关于您的代码示例,您从未调用在 DAG 中注册您的任务的函数。

要有一种动态任务,您可以有一个运算符根据某些状态执行不同的操作,或者您可以有一些可以根据状态跳过的运算符,使用 ShortCircuitOperator。

无法在执行过程中修改 DAG(无需更多工作)。

dag = DAG(... 由调度程序在循环中选取。它将包含任务实例 'python_operator'。该任务实例被安排在 dag 运行 中,并由 worker 或 executor 执行。由于 Airflow 数据库中的 DAG 模型仅由调度程序更新,因此这些添加的虚拟任务不会持久保存到 DAG 中,也不会调度到 运行。当工人离开时,他们将被遗忘。除非你从调度程序中复制所有关于持久化和更新模型的代码……但是下次调度程序访问 DAG 文件进行解析时,这将被撤消,这可能每分钟发生一次,每秒一次或更快,具体取决于其他多少有待解析的 DAG 文件。

Airflow 实际上希望每个 DAG 在 运行 秒之间大致保持相同的布局。它还想 reload/parse DAG 文件不断。因此,尽管您可以制作一个 DAG 文件,在每个 运行 上根据一些外部数据动态确定任务(最好缓存在文件或 pyc 模块中,而不是像数据库查找那样的网络 I/O,您将放慢 all DAG 的整个调度循环)这不是一个好计划,因为您的图形和树视图会变得混乱,并且您的调度程序解析将因该查找而更加繁重。

您可以使每个任务都可调用 运行…

def make_tasks(context):
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    provides_context=true,

但这是顺序的,你必须弄清楚如何使用 python 使它们并行(使用 futures?),如果有任何异常引发整个任务失败。它还绑定到一个执行者或工作人员,因此不使用气流的任务分配(kubernetes、mesos、celery)。

另一种处理方法是添加固定数量的任务(最大数量),并使用可调用对象将不需要的任务短路或使用 xcom 为每个任务推送参数,改变他们在 运行 时间的行为,但没有改变 DAG。

我感谢大家在这里所做的所有工作,因为我面临着创建动态结构化 DAG 的同样挑战。我犯了足够多的错误,以至于没有使用违背其设计的软件。如果我不能在 UI 上检查整个 运行 并放大和缩小,基本上使用气流功能,这是我无论如何使用它的主要原因。我可以在一个函数中编写多处理代码并完成它。

综上所述,我的解决方案是使用诸如 redis 锁定之类的资源管理器,并使用 DAG 向该资源管理器写入有关 运行 如何 运行 等的数据;并让另一个 DAG 或 运行 以特定时间间隔轮询资源管理器,在 运行 之前锁定它们并在完成时删除它们。这样至少我按预期使用气流,即使它的规格不完全符合我的需求。我将问题分解为更多可定义的块。这些解决方案是有创意的,但它们违背了设计,并且没有经过开发人员的测试。具体来说是要有固定的结构化工作流程。除非我重写核心气流代码并测试自己,否则我不能围绕未经测试和违反设计的代码进行工作。我知道我的解决方案会带来锁定等方面的复杂性,但至少我知道它的界限。