Apache Airflow 1.10+ 调度程序是否在特定时间支持 运行 不同 DST 感知时区中的 2 个 DAG?
Does Apache Airflow 1.10+ scheduler support running 2 DAGs in different DST aware time-zones at specific times?
Apache 气流 1.10+ introduced native support for DST aware timezones。
这让我想到(也许是错误的)应该可以在同一个 Airflow 调度器上创建 2 个 DAG,它们的调度方式如下:
- 每天 06:00
Pacific/Auckland
时间开始
- 每天 21:00
America/New_York
时间开始
不需要引入任务,"sleep"直到要求的开始时间。该文档明确排除了 DST 感知调度的 cron 调度程序,但仅解释了如何在该时区每天将 DAG 设置为 运行,默认情况下为午夜。
关于此主题的先前问题只考虑使用 or are based on ,它没有引入对 DST 感知时区的本机支持。
在 "airflow.cfg" 中,我将 default_timezone
更新为系统时区。然后我尝试像这样安排 DAG:
DAG('NZ_SOD',
description='New Zealand Start of Day',
start_date=datetime(2018, 12, 11, 06, 00, tzinfo=pendulum.timezone('Pacific/Auckland')),
catchup=False)
并且:
DAG('NAM_EOD',
description='North Americas End of Day',
start_date=datetime(2018, 12, 11, 21, 00, tzinfo=pendulum.timezone('America/New_York')),
catchup=False)
但似乎传递给 start_date
的日期时间对象的 "Time" 部分在 Apache Airflow 中未明确考虑并产生意外行为。
Airflow 是否有任何内置选项来产生所需的行为,或者我是否在尝试使用错误的工具来完成这项工作?
先几下:
- 不要指定带有前导 0 的日期时间,例如 06 am,因为如果您匆忙将其编辑为上午 9 点,您会发现那不是有效的八进制数,并且整个 DAG 文件将停止解析.
- 你不妨用摆锤表示法:
start_date=pendulum.datetime(2018, 12, 11, 6, 0, tz='Pacific/Auckland')
,
是的,Airflow 中的时区有点混乱。 The docs 说 cron 计划总是在那个时区的偏移量中。这并不像它应该的那样清楚,因为偏移量会有所不同。假设您像这样设置默认配置时区:
[core]
default_timezone = America/New_York
有一个start_date
喜欢:
start_date = datetime(2018, 12, 11, 6, 0),
你得到 offset
和 -18000
或 -5h 的 UTC。
start_date = datetime(2018, 4, 11, 6, 0),
你得到 offset
,UTC 为 -14400
或 -4h。
第二个要点中的偏移量为 46800
或 13h,而 4 月在奥克兰则是 43200
或 12h。如果我没记错的话,这些将应用于 DAG 的 schedule_interval
。
文档似乎说的是您的 schedule_interval
crontab 字符串将永远在相同的偏移量中解释。因此,如果您于 12 月在纽约市开始,0 5 * * *
将在凌晨 5 点或 6 点到达 运行,如果您于 4 月在纽约市开始,则为凌晨 5 点或 4 点。呃。我认为这是对的。我也对此感到困惑。
将默认值保留为 utc 并不能避免这种情况。不,如果您使用 start_date
并选择具有不同 utc 偏移量的区域,则不会。
现在……第二期,时间。开始日期曾经是最早的有效开始时间间隔。一天中的某个时间很棒,但时间表默认为 timedelta(days=1)
。我 认为 是 @daily
,这也意味着 0 0 * * *
,并为您提供有趣的结果,例如从 12 月 11 日早上 6 点开始的开始日期,您的第一个完整午夜 -至午夜间隔将在 12 月 13 日午夜结束,因此第一个 运行 在 12 月 12 日午夜日期作为 execution_date
传递。但我希望将 timedelta
应用于 start_date
后,它会在 12 月 12 日早上 6 点开始,而昨天与 execution_date
的时间相同。但是我还没有看到它以这种方式工作,这确实让我认为它可能只使用 datetime
的 date
部分用于 start_date
某处。
如文档所述,传入 exeucution_date
(以及所有宏日期)的时间将采用 UTC(因此 start_date
时区偏移中的午夜或早上 6 点,转换为 UTC)。至少它们附有 tz,因此您可以在必要时对它们使用 convert
。
答案是肯定的,cron 计划支持在 DST 感知时区中使用 DAG 运行。
但是有很多警告,所以我不得不假设 Airflow 的维护者没有将此作为受支持的用例。首先,在撰写本文时,documentation 明确地 是错误的 ,它指出:
Cron schedules
In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place.
我写了这段有点老套的代码,让你看看在不需要 运行ning Airflow 实例的情况下时间表如何工作(注意你已经安装了 Penulum 1.x 并使用如果您 运行 或编辑此代码,请更正 documentation):
import pendulum
from airflow import DAG
from datetime import timedelta
# Set-up DAG
test_dag = DAG(
dag_id='foo',
start_date=pendulum.datetime(year=2019, month=4, day=4, tz='Pacific/Auckland'),
schedule_interval='00 03 * * *',
catchup=False
)
# Check initial schedule
execution_date = test_dag.start_date
for _ in range(7):
next_execution_date = test_dag.following_schedule(execution_date)
if next_execution_date <= execution_date:
execution_date = test_dag.following_schedule(execution_date + timedelta(hours=2))
else:
execution_date = next_execution_date
print('Execution Date:', execution_date)
这给了我们新西兰经历 DST 的 7 天时间:
Execution Date: 2019-04-03 14:00:00+00:00
Execution Date: 2019-04-04 14:00:00+00:00
Execution Date: 2019-04-05 14:00:00+00:00
Execution Date: 2019-04-06 14:00:00+00:00
Execution Date: 2019-04-07 15:00:00+00:00
Execution Date: 2019-04-08 15:00:00+00:00
Execution Date: 2019-04-09 15:00:00+00:00
我们可以看到使用 cron 计划观察到 DST,如果您进一步编辑我的代码以删除 cron 计划,您可以看到 未 观察到 DST。
但请注意,即使 cron 计划遵守夏令时,您仍然可能会遇到 1 天前的错误,并且在夏令时更改当天,因为 Airflow 提供的是之前的日期而不是当前日期(例如星期日日历,但在 Airflow 中,执行日期是星期六)。在我看来,follow_schedule
逻辑中没有考虑到这一点。
最后,@dlamblin 指出 Airflow 通过模板化字符串或 provide_context=True
为作业提供的变量,如果 DAG 的本地执行日期是 Python 可调用文件将是错误的与 UTC 执行日期不同。这可以在 TaskInstance.get_template_context which uses self.execution_date
without modifying it to be in local time. And we can see in TaskInstance.__init__ 中观察到 self.execution_date
被转换为 UTC。
我处理这个问题的方法是按照@dlamblin 的建议并使用 Pendulum 的 convert
方法派生一个我称为 local_cal_date
的变量。编辑此代码以满足您的特定需求(我实际上在我所有 Python 可调用对象的包装器中使用它,以便它们都收到 local_cal_date
):
import datetime
def foo(*args, dag, execution_date, **kwargs):
# Derive local execution datetime from dag and execution_date that
# airflow passes to python callables where provide_context is set to True
airflow_timezone = dag.timezone
local_execution_datetime = airflow_timezone.convert(execution_date)
# I then add 1 day to make it the calendar day
# and not the execution date which Airflow provides
local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)
更新: 对于模板化字符串,我发现最好的方法是创建自定义运算符,在呈现模板之前将自定义变量注入到上下文中。我发现使用自定义宏的问题是它们 ,这意味着您必须做大量额外的工作才能以有用的方式呈现它们。所以在自定义运算符模块中,我有些类似于此代码:
# Standard Library
import datetime
# Third Party Libraries
import airflow.operators.email_operator
import airflow.operators.python_operator
import airflow.operators.bash_operator
class CustomTemplateVarsMixin:
def render_template(self, attr, content, context):
# Do Calculations
airflow_execution_datetime = context['execution_date']
airflow_timezone = context['dag'].timezone
local_execution_datetime = airflow_timezone.convert(airflow_execution_datetime)
local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)
# Add to contexts
context['local_cal_datetime'] = local_cal_datetime
# Run normal Method
return super().render_template(self, attr, content, context)
class BashOperator(CustomTemplateVarsMixin, airflow.operators.bash_operator.BashOperator):
pass
class EmailOperator(CustomTemplateVarsMixin, airflow.operators.email_operator.EmailOperator):
pass
class PythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.PythonOperator):
pass
class BranchPythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.BranchPythonOperator):
pass
Apache 气流 1.10+ introduced native support for DST aware timezones。
这让我想到(也许是错误的)应该可以在同一个 Airflow 调度器上创建 2 个 DAG,它们的调度方式如下:
- 每天 06:00
Pacific/Auckland
时间开始 - 每天 21:00
America/New_York
时间开始
不需要引入任务,"sleep"直到要求的开始时间。该文档明确排除了 DST 感知调度的 cron 调度程序,但仅解释了如何在该时区每天将 DAG 设置为 运行,默认情况下为午夜。
关于此主题的先前问题只考虑使用
在 "airflow.cfg" 中,我将 default_timezone
更新为系统时区。然后我尝试像这样安排 DAG:
DAG('NZ_SOD',
description='New Zealand Start of Day',
start_date=datetime(2018, 12, 11, 06, 00, tzinfo=pendulum.timezone('Pacific/Auckland')),
catchup=False)
并且:
DAG('NAM_EOD',
description='North Americas End of Day',
start_date=datetime(2018, 12, 11, 21, 00, tzinfo=pendulum.timezone('America/New_York')),
catchup=False)
但似乎传递给 start_date
的日期时间对象的 "Time" 部分在 Apache Airflow 中未明确考虑并产生意外行为。
Airflow 是否有任何内置选项来产生所需的行为,或者我是否在尝试使用错误的工具来完成这项工作?
先几下:
- 不要指定带有前导 0 的日期时间,例如 06 am,因为如果您匆忙将其编辑为上午 9 点,您会发现那不是有效的八进制数,并且整个 DAG 文件将停止解析.
- 你不妨用摆锤表示法:
start_date=pendulum.datetime(2018, 12, 11, 6, 0, tz='Pacific/Auckland')
,
是的,Airflow 中的时区有点混乱。 The docs 说 cron 计划总是在那个时区的偏移量中。这并不像它应该的那样清楚,因为偏移量会有所不同。假设您像这样设置默认配置时区:
[core]
default_timezone = America/New_York
有一个start_date
喜欢:
start_date = datetime(2018, 12, 11, 6, 0),
你得到 offset
和 -18000
或 -5h 的 UTC。
start_date = datetime(2018, 4, 11, 6, 0),
你得到 offset
,UTC 为 -14400
或 -4h。
第二个要点中的偏移量为 46800
或 13h,而 4 月在奥克兰则是 43200
或 12h。如果我没记错的话,这些将应用于 DAG 的 schedule_interval
。
文档似乎说的是您的 schedule_interval
crontab 字符串将永远在相同的偏移量中解释。因此,如果您于 12 月在纽约市开始,0 5 * * *
将在凌晨 5 点或 6 点到达 运行,如果您于 4 月在纽约市开始,则为凌晨 5 点或 4 点。呃。我认为这是对的。我也对此感到困惑。
将默认值保留为 utc 并不能避免这种情况。不,如果您使用 start_date
并选择具有不同 utc 偏移量的区域,则不会。
现在……第二期,时间。开始日期曾经是最早的有效开始时间间隔。一天中的某个时间很棒,但时间表默认为 timedelta(days=1)
。我 认为 是 @daily
,这也意味着 0 0 * * *
,并为您提供有趣的结果,例如从 12 月 11 日早上 6 点开始的开始日期,您的第一个完整午夜 -至午夜间隔将在 12 月 13 日午夜结束,因此第一个 运行 在 12 月 12 日午夜日期作为 execution_date
传递。但我希望将 timedelta
应用于 start_date
后,它会在 12 月 12 日早上 6 点开始,而昨天与 execution_date
的时间相同。但是我还没有看到它以这种方式工作,这确实让我认为它可能只使用 datetime
的 date
部分用于 start_date
某处。
如文档所述,传入 exeucution_date
(以及所有宏日期)的时间将采用 UTC(因此 start_date
时区偏移中的午夜或早上 6 点,转换为 UTC)。至少它们附有 tz,因此您可以在必要时对它们使用 convert
。
答案是肯定的,cron 计划支持在 DST 感知时区中使用 DAG 运行。
但是有很多警告,所以我不得不假设 Airflow 的维护者没有将此作为受支持的用例。首先,在撰写本文时,documentation 明确地 是错误的 ,它指出:
Cron schedules
In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place.
我写了这段有点老套的代码,让你看看在不需要 运行ning Airflow 实例的情况下时间表如何工作(注意你已经安装了 Penulum 1.x 并使用如果您 运行 或编辑此代码,请更正 documentation):
import pendulum
from airflow import DAG
from datetime import timedelta
# Set-up DAG
test_dag = DAG(
dag_id='foo',
start_date=pendulum.datetime(year=2019, month=4, day=4, tz='Pacific/Auckland'),
schedule_interval='00 03 * * *',
catchup=False
)
# Check initial schedule
execution_date = test_dag.start_date
for _ in range(7):
next_execution_date = test_dag.following_schedule(execution_date)
if next_execution_date <= execution_date:
execution_date = test_dag.following_schedule(execution_date + timedelta(hours=2))
else:
execution_date = next_execution_date
print('Execution Date:', execution_date)
这给了我们新西兰经历 DST 的 7 天时间:
Execution Date: 2019-04-03 14:00:00+00:00
Execution Date: 2019-04-04 14:00:00+00:00
Execution Date: 2019-04-05 14:00:00+00:00
Execution Date: 2019-04-06 14:00:00+00:00
Execution Date: 2019-04-07 15:00:00+00:00
Execution Date: 2019-04-08 15:00:00+00:00
Execution Date: 2019-04-09 15:00:00+00:00
我们可以看到使用 cron 计划观察到 DST,如果您进一步编辑我的代码以删除 cron 计划,您可以看到 未 观察到 DST。
但请注意,即使 cron 计划遵守夏令时,您仍然可能会遇到 1 天前的错误,并且在夏令时更改当天,因为 Airflow 提供的是之前的日期而不是当前日期(例如星期日日历,但在 Airflow 中,执行日期是星期六)。在我看来,follow_schedule
逻辑中没有考虑到这一点。
最后,@dlamblin 指出 Airflow 通过模板化字符串或 provide_context=True
为作业提供的变量,如果 DAG 的本地执行日期是 Python 可调用文件将是错误的与 UTC 执行日期不同。这可以在 TaskInstance.get_template_context which uses self.execution_date
without modifying it to be in local time. And we can see in TaskInstance.__init__ 中观察到 self.execution_date
被转换为 UTC。
我处理这个问题的方法是按照@dlamblin 的建议并使用 Pendulum 的 convert
方法派生一个我称为 local_cal_date
的变量。编辑此代码以满足您的特定需求(我实际上在我所有 Python 可调用对象的包装器中使用它,以便它们都收到 local_cal_date
):
import datetime
def foo(*args, dag, execution_date, **kwargs):
# Derive local execution datetime from dag and execution_date that
# airflow passes to python callables where provide_context is set to True
airflow_timezone = dag.timezone
local_execution_datetime = airflow_timezone.convert(execution_date)
# I then add 1 day to make it the calendar day
# and not the execution date which Airflow provides
local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)
更新: 对于模板化字符串,我发现最好的方法是创建自定义运算符,在呈现模板之前将自定义变量注入到上下文中。我发现使用自定义宏的问题是它们
# Standard Library
import datetime
# Third Party Libraries
import airflow.operators.email_operator
import airflow.operators.python_operator
import airflow.operators.bash_operator
class CustomTemplateVarsMixin:
def render_template(self, attr, content, context):
# Do Calculations
airflow_execution_datetime = context['execution_date']
airflow_timezone = context['dag'].timezone
local_execution_datetime = airflow_timezone.convert(airflow_execution_datetime)
local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)
# Add to contexts
context['local_cal_datetime'] = local_cal_datetime
# Run normal Method
return super().render_template(self, attr, content, context)
class BashOperator(CustomTemplateVarsMixin, airflow.operators.bash_operator.BashOperator):
pass
class EmailOperator(CustomTemplateVarsMixin, airflow.operators.email_operator.EmailOperator):
pass
class PythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.PythonOperator):
pass
class BranchPythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.BranchPythonOperator):
pass