运行 气流中特定时间的任务

Running tasks in a specific time in airflow

我在 airflow dag 中有 3 个任务。

这三个任务有时间依赖性

任务 1 - 早上 8 点

任务 - 凌晨 2 - 10 点

任务 -3 - 凌晨 12 点

我没有找到任何相关文档。它告诉我只设置上游或下游作业。有人可以帮忙吗

我正在使用 Google CLoud Composer

好吧,Airflow 结构是为了 schedule_interval 设置在 DAG 级别。这意味着您可以设置整个 DAG 开始执行的时间,但您不能真正为每个 task 指定不同的执行时间。

如果您有三个相互不依赖的独立任务,解决方案是创建三个不同的 DAG,并在这三个不同的时间安排它们。


如果相反 task_2task_3 的时间依赖性不是那么重要,但你只关心执行 一个接一个 你可以,确实,设置任务之间的依赖关系,以便 task_2 运行s 总是在 task_1 完成之后,task_3 运行s 总是在 task_2 完成之后完成的。要设置依赖关系,您可以使用非常方便的语法(假设您的任务已分配给变量 task_1task_2task_3):

task_1 >> task_2 >> task_3

您可以参考Airflow official documentation了解更多信息。


TL;DR: 您不能将单个任务安排在不同的特定时间 运行,因为您唯一可以设置的时间是整个 DAG 运行 一个。

Airflow Documentation:

https://airflow.apache.org/docs/stable/concepts.html#bitshift-composition

在 Airflow 1.8 之后你还可以使用 bitshift 组合。

不要为你的任务设定时间,而是使用这种方法:

op1 = DummyTask(...)
op2 = DummyTask(...)

op1 >> op2 # same as: op1.set_downstream(op2)

上面的赋值意味着Airflow只会在op1成功完成后才执行op2

您可以使用时间传感器。添加一个时间传感器作为sub-task,不断轮询时间点是否已经过去。虽然不那么优雅,但它确实有效。

我们通常会写一个这样的帮助函数:

def wait_till(hour: int, minute: int, second: int, dag):
    """get a DateTimeSensor runs till hour: minute: second for default timezone

    Parameters
    ----------
    hour : int
        hour in the day
    minute : int
        minute
    second : int
        second
    dag : [type]
        dag
    """
    target_time_str = f'next_execution_date.in_tz("Asia/Shanghai").replace(hour={hour}, minute={minute}, second={second})'
    task_id_str = f"wait_till_{hour:02d}{minute:02d}{second:02d}"
    return DateTimeSensor(
        task_id=task_id_str, target_time="{{ " + target_time_str + " }}", dag=dag, poke_interval=5
    )

然后将其用作像

这样的 dag 中的计时器任务
wait_till(8,0,0) >> your_task

我们还尝试将单个任务放在单独的 dag 中,然后使用外部任务传感器对任务依赖性进行建模。这种方法给我们带来的问题是我们经常需要重新安排任务。重新安排 airflow 中的 dag 会丢失所有日志,这对我们来说是不能接受的。