运行 气流中特定时间的任务
Running tasks in a specific time in airflow
我在 airflow dag 中有 3 个任务。
这三个任务有时间依赖性
任务 1 - 早上 8 点
任务 - 凌晨 2 - 10 点
任务 -3 - 凌晨 12 点
我没有找到任何相关文档。它告诉我只设置上游或下游作业。有人可以帮忙吗
我正在使用 Google CLoud Composer
好吧,Airflow 结构是为了 schedule_interval
设置在 DAG 级别。这意味着您可以设置整个 DAG 开始执行的时间,但您不能真正为每个 task 指定不同的执行时间。
如果您有三个相互不依赖的独立任务,解决方案是创建三个不同的 DAG,并在这三个不同的时间安排它们。
如果相反 task_2
和 task_3
的时间依赖性不是那么重要,但你只关心执行 一个接一个 你可以,确实,设置任务之间的依赖关系,以便 task_2
运行s 总是在 task_1
完成之后,task_3
运行s 总是在 task_2
完成之后完成的。要设置依赖关系,您可以使用非常方便的语法(假设您的任务已分配给变量 task_1
、task_2
、task_3
):
task_1 >> task_2 >> task_3
您可以参考Airflow official documentation了解更多信息。
TL;DR:
您不能将单个任务安排在不同的特定时间 运行,因为您唯一可以设置的时间是整个 DAG 运行 一个。
Airflow Documentation:
https://airflow.apache.org/docs/stable/concepts.html#bitshift-composition
在 Airflow 1.8 之后你还可以使用 bitshift 组合。
不要为你的任务设定时间,而是使用这种方法:
op1 = DummyTask(...)
op2 = DummyTask(...)
op1 >> op2 # same as: op1.set_downstream(op2)
上面的赋值意味着Airflow只会在op1
成功完成后才执行op2
。
您可以使用时间传感器。添加一个时间传感器作为sub-task,不断轮询时间点是否已经过去。虽然不那么优雅,但它确实有效。
我们通常会写一个这样的帮助函数:
def wait_till(hour: int, minute: int, second: int, dag):
"""get a DateTimeSensor runs till hour: minute: second for default timezone
Parameters
----------
hour : int
hour in the day
minute : int
minute
second : int
second
dag : [type]
dag
"""
target_time_str = f'next_execution_date.in_tz("Asia/Shanghai").replace(hour={hour}, minute={minute}, second={second})'
task_id_str = f"wait_till_{hour:02d}{minute:02d}{second:02d}"
return DateTimeSensor(
task_id=task_id_str, target_time="{{ " + target_time_str + " }}", dag=dag, poke_interval=5
)
然后将其用作像
这样的 dag 中的计时器任务
wait_till(8,0,0) >> your_task
我们还尝试将单个任务放在单独的 dag 中,然后使用外部任务传感器对任务依赖性进行建模。这种方法给我们带来的问题是我们经常需要重新安排任务。重新安排 airflow 中的 dag 会丢失所有日志,这对我们来说是不能接受的。
我在 airflow dag 中有 3 个任务。
这三个任务有时间依赖性
任务 1 - 早上 8 点
任务 - 凌晨 2 - 10 点
任务 -3 - 凌晨 12 点
我没有找到任何相关文档。它告诉我只设置上游或下游作业。有人可以帮忙吗
我正在使用 Google CLoud Composer
好吧,Airflow 结构是为了 schedule_interval
设置在 DAG 级别。这意味着您可以设置整个 DAG 开始执行的时间,但您不能真正为每个 task 指定不同的执行时间。
如果您有三个相互不依赖的独立任务,解决方案是创建三个不同的 DAG,并在这三个不同的时间安排它们。
如果相反 task_2
和 task_3
的时间依赖性不是那么重要,但你只关心执行 一个接一个 你可以,确实,设置任务之间的依赖关系,以便 task_2
运行s 总是在 task_1
完成之后,task_3
运行s 总是在 task_2
完成之后完成的。要设置依赖关系,您可以使用非常方便的语法(假设您的任务已分配给变量 task_1
、task_2
、task_3
):
task_1 >> task_2 >> task_3
您可以参考Airflow official documentation了解更多信息。
TL;DR: 您不能将单个任务安排在不同的特定时间 运行,因为您唯一可以设置的时间是整个 DAG 运行 一个。
Airflow Documentation:
https://airflow.apache.org/docs/stable/concepts.html#bitshift-composition
在 Airflow 1.8 之后你还可以使用 bitshift 组合。
不要为你的任务设定时间,而是使用这种方法:
op1 = DummyTask(...)
op2 = DummyTask(...)
op1 >> op2 # same as: op1.set_downstream(op2)
上面的赋值意味着Airflow只会在op1
成功完成后才执行op2
。
您可以使用时间传感器。添加一个时间传感器作为sub-task,不断轮询时间点是否已经过去。虽然不那么优雅,但它确实有效。
我们通常会写一个这样的帮助函数:
def wait_till(hour: int, minute: int, second: int, dag):
"""get a DateTimeSensor runs till hour: minute: second for default timezone
Parameters
----------
hour : int
hour in the day
minute : int
minute
second : int
second
dag : [type]
dag
"""
target_time_str = f'next_execution_date.in_tz("Asia/Shanghai").replace(hour={hour}, minute={minute}, second={second})'
task_id_str = f"wait_till_{hour:02d}{minute:02d}{second:02d}"
return DateTimeSensor(
task_id=task_id_str, target_time="{{ " + target_time_str + " }}", dag=dag, poke_interval=5
)
然后将其用作像
这样的 dag 中的计时器任务wait_till(8,0,0) >> your_task
我们还尝试将单个任务放在单独的 dag 中,然后使用外部任务传感器对任务依赖性进行建模。这种方法给我们带来的问题是我们经常需要重新安排任务。重新安排 airflow 中的 dag 会丢失所有日志,这对我们来说是不能接受的。