Apache Airflow 1.10+ 调度程序是否在特定时间支持 运行 不同 DST 感知时区中的 2 个 DAG?

Does Apache Airflow 1.10+ scheduler support running 2 DAGs in different DST aware time-zones at specific times?

Apache 气流 1.10+ introduced native support for DST aware timezones

这让我想到(也许是错误的)应该可以在同一个 Airflow 调度器上创建 2 个 DAG,它们的调度方式如下:

不需要引入任务,"sleep"直到要求的开始时间。该文档明确排除了 DST 感知调度的 cron 调度程序,但仅解释了如何在该时区每天将 DAG 设置为 运行,默认情况下为午夜。

关于此主题的先前问题只考虑使用 or are based on ,它没有引入对 DST 感知时区的本机支持。

在 "airflow.cfg" 中,我将 default_timezone 更新为系统时区。然后我尝试像这样安排 DAG:

DAG('NZ_SOD',
    description='New Zealand Start of Day',
    start_date=datetime(2018, 12, 11, 06, 00, tzinfo=pendulum.timezone('Pacific/Auckland')),
    catchup=False)

并且:

DAG('NAM_EOD',
    description='North Americas End of Day',
    start_date=datetime(2018, 12, 11, 21, 00, tzinfo=pendulum.timezone('America/New_York')),
    catchup=False)

但似乎传递给 start_date 的日期时间对象的 "Time" 部分在 Apache Airflow 中未明确考虑并产生意外行为。

Airflow 是否有任何内置选项来产生所需的行为,或者我是否在尝试使用错误的工具来完成这项工作?

先几下:

  • 不要指定带有前导 0 的日期时间,例如 06 am,因为如果您匆忙将其编辑为上午 9 点,您会发现那不是有效的八进制数,并且整个 DAG 文件将停止解析.
  • 你不妨用摆锤表示法:start_date=pendulum.datetime(2018, 12, 11, 6, 0, tz='Pacific/Auckland'),

是的,Airflow 中的时区有点混乱。 The docs 说 cron 计划总是在那个时区的偏移量中。这并不像它应该的那样清楚,因为偏移量会有所不同。假设您像这样设置默认配置时区:

[core]
default_timezone = America/New_York

有一个start_date喜欢:

start_date = datetime(2018, 12, 11, 6, 0),

你得到 offset-18000 或 -5h 的 UTC。

start_date = datetime(2018, 4, 11, 6, 0),

你得到 offset,UTC 为 -14400 或 -4h。

第二个要点中的偏移量为 46800 或 13h,而 4 月在奥克兰则是 43200 或 12h。如果我没记错的话,这些将应用于 DAG 的 schedule_interval

文档似乎说的是您的 schedule_interval crontab 字符串将永远在相同的偏移量中解释。因此,如果您于 12 月在纽约市开始,0 5 * * * 将在凌晨 5 点或 6 点到达 运行,如果您于 4 月在纽约市开始,则为凌晨 5 点或 4 点。呃。我认为这是对的。我也对此感到困惑。

将默认值保留为 utc 并不能避免这种情况。不,如果您使用 start_date 并选择具有不同 utc 偏移量的区域,则不会。

现在……第二期,时间。开始日期曾经是最早的有效开始时间间隔。一天中的某个时间很棒,但时间表默认为 timedelta(days=1)。我 认为 @daily,这也意味着 0 0 * * *,并为您提供有趣的结果,例如从 12 月 11 日早上 6 点开始的开始日期,您的第一个完整午夜 -至午夜间隔将在 12 月 13 日午夜结束,因此第一个 运行 在 12 月 12 日午夜日期作为 execution_date 传递。但我希望将 timedelta 应用于 start_date 后,它会在 12 月 12 日早上 6 点开始,而昨天与 execution_date 的时间相同。但是我还没有看到它以这种方式工作,这确实让我认为它可能只使用 datetimedate 部分用于 start_date 某处。

如文档所述,传入 exeucution_date(以及所有宏日期)的时间将采用 UTC(因此 start_date 时区偏移中的午夜或早上 6 点,转换为 UTC)。至少它们附有 tz,因此您可以在必要时对它们使用 convert

答案是肯定的,cron 计划支持在 DST 感知时区中使用 DAG 运行。

但是有很多警告,所以我不得不假设 Airflow 的维护者没有将此作为受支持的用例。首先,在撰写本文时,documentation 明确地 是错误的 ,它指出:

Cron schedules

In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place.

我写了这段有点老套的代码,让你看看在不需要 运行ning Airflow 实例的情况下时间表如何工作(注意你已经安装了 Penulum 1.x 并使用如果您 运行 或编辑此代码,请更正 documentation):

import pendulum
from airflow import DAG
from datetime import timedelta


# Set-up DAG
test_dag = DAG(
    dag_id='foo',
    start_date=pendulum.datetime(year=2019, month=4, day=4, tz='Pacific/Auckland'),
    schedule_interval='00 03 * * *',
    catchup=False
)

# Check initial schedule
execution_date = test_dag.start_date
for _ in range(7):
    next_execution_date = test_dag.following_schedule(execution_date)
    if next_execution_date <= execution_date:
        execution_date = test_dag.following_schedule(execution_date + timedelta(hours=2))
    else:
        execution_date = next_execution_date
    print('Execution Date:', execution_date)

这给了我们新西兰经历 DST 的 7 天时间:

Execution Date: 2019-04-03 14:00:00+00:00
Execution Date: 2019-04-04 14:00:00+00:00
Execution Date: 2019-04-05 14:00:00+00:00
Execution Date: 2019-04-06 14:00:00+00:00
Execution Date: 2019-04-07 15:00:00+00:00
Execution Date: 2019-04-08 15:00:00+00:00
Execution Date: 2019-04-09 15:00:00+00:00

我们可以看到使用 cron 计划观察到 DST,如果您进一步编辑我的代码以删除 cron 计划,您可以看到 观察到 DST。

但请注意,即使 cron 计划遵守夏令时,您仍然可能会遇到 1 天前的错误,并且在夏令时更改当天,因为 Airflow 提供的是之前的日期而不是当前日期(例如星期日日历,但在 Airflow 中,执行日期是星期六)。在我看来,follow_schedule 逻辑中没有考虑到这一点。

最后,@dlamblin 指出 Airflow 通过模板化字符串或 provide_context=True 为作业提供的变量,如果 DAG 的本地执行日期是 Python 可调用文件将是错误的与 UTC 执行日期不同。这可以在 TaskInstance.get_template_context which uses self.execution_date without modifying it to be in local time. And we can see in TaskInstance.__init__ 中观察到 self.execution_date 被转换为 UTC。

我处理这个问题的方法是按照@dlamblin 的建议并使用 Pendulum 的 convert 方法派生一个我称为 local_cal_date 的变量。编辑此代码以满足您的特定需求(我实际上在我所有 Python 可调用对象的包装器中使用它,以便它们都收到 local_cal_date):

import datetime

def foo(*args, dag, execution_date, **kwargs):
    # Derive local execution datetime from dag and execution_date that
    # airflow passes to python callables where provide_context is set to True
    airflow_timezone = dag.timezone
    local_execution_datetime = airflow_timezone.convert(execution_date)

    # I then add 1 day to make it the calendar day
    # and not the execution date which Airflow provides
    local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)

更新: 对于模板化字符串,我发现最好的方法是创建自定义运算符,在呈现模板之前将自定义变量注入到上下文中。我发现使用自定义宏的问题是它们 ,这意味着您必须做大量额外的工作才能以有用的方式呈现它们。所以在自定义运算符模块中,我有些类似于此代码:

# Standard Library
import datetime

# Third Party Libraries
import airflow.operators.email_operator
import airflow.operators.python_operator
import airflow.operators.bash_operator


class CustomTemplateVarsMixin:
    def render_template(self, attr, content, context):
        # Do Calculations
        airflow_execution_datetime = context['execution_date']
        airflow_timezone = context['dag'].timezone
        local_execution_datetime = airflow_timezone.convert(airflow_execution_datetime)
        local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)

        # Add to contexts
        context['local_cal_datetime'] = local_cal_datetime

        # Run normal Method
        return super().render_template(self, attr, content, context)


class BashOperator(CustomTemplateVarsMixin, airflow.operators.bash_operator.BashOperator):
    pass


class EmailOperator(CustomTemplateVarsMixin, airflow.operators.email_operator.EmailOperator):
    pass


class PythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.PythonOperator):
    pass


class BranchPythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.BranchPythonOperator):
    pass