如何正确处理 Apache Airflow 中的夏令时?

How to properly handle Daylight Savings Time in Apache Airflow?

在 airflow 中,一切都应该是 UTC(不受 DST 影响)。

但是,我们的工作流程会根据受 DST 影响的时区交付内容。

示例场景:

有没有办法安排 dag,以便它们在时间更改后 运行 在正确的时间?

不在我的脑海中:

如果您的机器支持时区,请将 DAG 设置为 运行 美国东部时间上午 8 点 美国东部时间上午 8 点(UTC)。类似于 0 11,12 * * *。第一个任务是 ShortCircuit 操作员。然后使用类似 pytz 的东西来本地化当前时间。如果它在您要求的时间内,请继续(即:运行 DAG)。否则,return 错误。每天 2 个额外任务的开销很小,但只要您的机器不超载,延迟应该是最小的。

马虎的例子:

from datetime import datetime
from pytz import utc, timezone

# ...

def is8AM(**kwargs):
    ti = kwargs["ti"]
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the exec date:
    # curtime = utc.localize(ti.execution_date)
    eastern = timezone('US/Eastern') # From docs, check your local names
    loc_dt = curtime.astimezone(eastern)
    if loc_dt.hour == 8:
        return True
    return False

start_task = ShortCircuitOperator(
                task_id='check_for_8AM',
                python_callable=is8AM,
                provide_context=True,
                dag=dag
            )

希望对您有所帮助

编辑:运行次错误,减去而不是加。此外,由于 运行 的启动方式,如果您希望它们在 运行 8.

我们使用了@apathyman 解决方案,但我们没有使用 ShortCircuit,而是使用了 PythonOperator,如果它不是我们想要的时间,它就会失败,并且重试 timedelta 为 1 小时。 这样我们每天只有 1 运行 而不是 2.

并且仅在第一个小时

将计划间隔设置为 运行

基本上就是这样(大部分代码取自上述答案,感谢@apathyman):

from datetime import datetime
from datetime import timedelta
from pytz import utc, timezone


def is8AM(**kwargs):
    ti = kwargs["ti"]
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the exec date:
    # curtime = utc.localize(ti.execution_date)
    eastern = timezone('US/Eastern') # From docs, check your local names
    loc_dt = curtime.astimezone(eastern)
    if loc_dt.hour == 8:
        return True
    exit("Not the time yet, wait 1 hour")

start_task = PythonOperator(
            task_id='check_for_8AM',
            python_callable=is8AM,
            provide_context=True,
            retries=1,
            retry_delay=timedelta(hours=1),
            dag=dag
        )

我相信我们只需要一个 PythonOperator 来处理这种情况。

如果 DAG 需要在 DST TZ 中 运行(例如:America/New_York、Europe/London、Australia/Sydney),那么以下是我可以采取的解决方法想想:

  1. 将 DAG 时间表转换为 UTC TZ。
    因为 TZ 有夏令时,所以我们需要选择更大的偏移量 在进行转换时。例如:
    • 使用America/New_York TZ:我们必须使用偏移量-4。所以时间表 */10 11-13 * * 1-5 将转换为 */10 15-17 * * 1-5
    • 与Europe/London:我们必须使用偏移量+1。所以时间表 35 */4 * * * 将转换为 35 3-23/4 * * *
    • 与Australia/Sydney:我们必须使用偏移量+11。所以时间表 15 8,9,12,18 * * * 将转换为 15 21,22,1,7 * * *
  2. 使用PythonOperator在所有主要任务之前做一个任务。此任务将检查当前时间是否在指定 TZ 的 DST 中。如果是,则任务将在 1 小时后休眠。 这样我们就可以处理DST TZ的情况了。

    def is_DST(zonename):
        tz = pytz.timezone(zonename)
        now = pytz.utc.localize(datetime.utcnow())
        return now.astimezone(tz).dst() != timedelta(0)
    
    
    def WQ_DST_handler(TZ, **kwargs):
        if is_DST(TZ):
            print('Currently is daily saving time (DST) in {0}, will process to next task now'.format(TZ))
        else:
            print('Currently is not daily saving time (DST) in {0}, will sleep 1 hour...'.format(TZ))
            time.sleep(60 * 60)
    
    
    DST_handler = PythonOperator(
        task_id='DST_handler',
        python_callable=WQ_DST_handler,
        op_kwargs={'TZ': TZ_of_dag},
        dag=dag
    )
    
    DST_handler >> main_tasks
    

此解决方法有一个缺点:对于任何需要在 DST TZ 中 运行 的 DAG,我们必须创建 1 个进一步的任务(上例中的 DST_handler),并且此任务仍然需要也发送到工作节点执行(尽管它几乎只是一个睡眠命令)。

airflow 在版本 1 上时被问到这个问题。8.x。

从 airflow 1.10 开始,此功能现已内置。

https://airflow.apache.org/timezone.html

airflow.cfg中设置时区,应该正确处理dst。