气流 | 运行 任务按自己的时间表进行

Airflow | Run task on it's own schedule

有没有人知道如何安排一个特定的任务在它自己的独立时间触发,而不考虑 schedule_interval 下的设置?

我有一个每小时运行一次的 DAG,它至少执行 7 个不同的任务。我想在中间添加第 8 个任务,以便它在凌晨 1 点运行。背后的想法是我希望这个任务能够完全验证我一整天得到的每小时输出。

更多详情:

I have a DAG that is getting a response from an API request on the hourly basis. These metrics will update as the day goes by, but at 1AM the following day, I want to run a task that grabs all the metrics from the previous day to formally "close" and will also give me reassurance that the previous day metrics are accurate. It'll be ideal to have it in the same DAG since I'm using the other tasks to populate a DB with the extracted data.

task_1 hourly
task_2 hourly
task_3 hourly
task_4 daily

谢谢!

您可以使用 Python 计划来执行此操作:只需在凌晨 1 点执行后取消此作业。

import schedule
import time

def job_that_executes_once():
    # Do some work that only needs to happen once...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

查看文档 here

还有另一种方法。将仅包含年、月和日的 日期对象 添加到该函数中。默认为空对象,如果当前日期与日期对象不同,则将当前日期放入该日期对象中。如果当前日期与日期对象相同,则忽略此 运行。并且每次调用那个任务的时候,只要检查这个条件是否满足即可。

ShortCircuitOperator 可用于实现此用例。

创建一个函数来检索当前时间并检查执行“task_4”所需的时间,在本例中为凌晨 1 点。如果时间是凌晨 1 点,我们说 return True,如果不是,则说 False。将此 ShortCircuitOperator 任务放在 DAG 中的“task_4”之前。只有函数returns True才会执行“task_4”;否则将跳过“task_4”并且 DAG 完成。

task_1 >> task_2 >> task_3 >> short_circuit_task >> task_4