如何正确处理 Apache Airflow 中的夏令时?
How to properly handle Daylight Savings Time in Apache Airflow?
在 airflow 中,一切都应该是 UTC(不受 DST 影响)。
但是,我们的工作流程会根据受 DST 影响的时区交付内容。
示例场景:
- 我们安排了一项工作,开始日期为东部时间 8:00 上午,计划间隔为 24 小时。
- 每天东部时间上午 8 点,调度程序会发现自上次 运行 以来已经 24 小时了,并且 运行 完成了作业。
- 夏令时到了,我们损失了一个小时。
- 今天东部时间上午 8 点,调度程序发现只有 23 小时,因为机器上的时间是 UTC,并且 运行 直到东部时间上午 9 点才开始作业,这是延迟交付
有没有办法安排 dag,以便它们在时间更改后 运行 在正确的时间?
不在我的脑海中:
如果您的机器支持时区,请将 DAG 设置为 运行 美国东部时间上午 8 点 和 美国东部时间上午 8 点(UTC)。类似于 0 11,12 * * *
。第一个任务是 ShortCircuit 操作员。然后使用类似 pytz 的东西来本地化当前时间。如果它在您要求的时间内,请继续(即:运行 DAG)。否则,return 错误。每天 2 个额外任务的开销很小,但只要您的机器不超载,延迟应该是最小的。
马虎的例子:
from datetime import datetime
from pytz import utc, timezone
# ...
def is8AM(**kwargs):
ti = kwargs["ti"]
curtime = utc.localize(datetime.utcnow())
# If you want to use the exec date:
# curtime = utc.localize(ti.execution_date)
eastern = timezone('US/Eastern') # From docs, check your local names
loc_dt = curtime.astimezone(eastern)
if loc_dt.hour == 8:
return True
return False
start_task = ShortCircuitOperator(
task_id='check_for_8AM',
python_callable=is8AM,
provide_context=True,
dag=dag
)
希望对您有所帮助
编辑:运行次错误,减去而不是加。此外,由于 运行 的启动方式,如果您希望它们在 运行 8.
我们使用了@apathyman 解决方案,但我们没有使用 ShortCircuit,而是使用了 PythonOperator,如果它不是我们想要的时间,它就会失败,并且重试 timedelta 为 1 小时。
这样我们每天只有 1 运行 而不是 2.
并且仅在第一个小时
将计划间隔设置为 运行
基本上就是这样(大部分代码取自上述答案,感谢@apathyman):
from datetime import datetime
from datetime import timedelta
from pytz import utc, timezone
def is8AM(**kwargs):
ti = kwargs["ti"]
curtime = utc.localize(datetime.utcnow())
# If you want to use the exec date:
# curtime = utc.localize(ti.execution_date)
eastern = timezone('US/Eastern') # From docs, check your local names
loc_dt = curtime.astimezone(eastern)
if loc_dt.hour == 8:
return True
exit("Not the time yet, wait 1 hour")
start_task = PythonOperator(
task_id='check_for_8AM',
python_callable=is8AM,
provide_context=True,
retries=1,
retry_delay=timedelta(hours=1),
dag=dag
)
我相信我们只需要一个 PythonOperator 来处理这种情况。
如果 DAG 需要在 DST TZ 中 运行(例如:America/New_York、Europe/London、Australia/Sydney),那么以下是我可以采取的解决方法想想:
- 将 DAG 时间表转换为 UTC TZ。
因为 TZ 有夏令时,所以我们需要选择更大的偏移量
在进行转换时。例如:
- 使用America/New_York TZ:我们必须使用偏移量
-4
。所以时间表 */10 11-13 * * 1-5
将转换为 */10 15-17 * * 1-5
- 与Europe/London:我们必须使用偏移量
+1
。所以时间表 35 */4 * * *
将转换为 35 3-23/4 * * *
- 与Australia/Sydney:我们必须使用偏移量
+11
。所以时间表 15 8,9,12,18 * * *
将转换为 15 21,22,1,7 * * *
使用PythonOperator
在所有主要任务之前做一个任务。此任务将检查当前时间是否在指定 TZ 的 DST 中。如果是,则任务将在 1 小时后休眠。
这样我们就可以处理DST TZ的情况了。
def is_DST(zonename):
tz = pytz.timezone(zonename)
now = pytz.utc.localize(datetime.utcnow())
return now.astimezone(tz).dst() != timedelta(0)
def WQ_DST_handler(TZ, **kwargs):
if is_DST(TZ):
print('Currently is daily saving time (DST) in {0}, will process to next task now'.format(TZ))
else:
print('Currently is not daily saving time (DST) in {0}, will sleep 1 hour...'.format(TZ))
time.sleep(60 * 60)
DST_handler = PythonOperator(
task_id='DST_handler',
python_callable=WQ_DST_handler,
op_kwargs={'TZ': TZ_of_dag},
dag=dag
)
DST_handler >> main_tasks
此解决方法有一个缺点:对于任何需要在 DST TZ 中 运行 的 DAG,我们必须创建 1 个进一步的任务(上例中的 DST_handler),并且此任务仍然需要也发送到工作节点执行(尽管它几乎只是一个睡眠命令)。
airflow 在版本 1 上时被问到这个问题。8.x。
从 airflow 1.10 开始,此功能现已内置。
https://airflow.apache.org/timezone.html
在airflow.cfg
中设置时区,应该正确处理dst。
在 airflow 中,一切都应该是 UTC(不受 DST 影响)。
但是,我们的工作流程会根据受 DST 影响的时区交付内容。
示例场景:
- 我们安排了一项工作,开始日期为东部时间 8:00 上午,计划间隔为 24 小时。
- 每天东部时间上午 8 点,调度程序会发现自上次 运行 以来已经 24 小时了,并且 运行 完成了作业。
- 夏令时到了,我们损失了一个小时。
- 今天东部时间上午 8 点,调度程序发现只有 23 小时,因为机器上的时间是 UTC,并且 运行 直到东部时间上午 9 点才开始作业,这是延迟交付
有没有办法安排 dag,以便它们在时间更改后 运行 在正确的时间?
不在我的脑海中:
如果您的机器支持时区,请将 DAG 设置为 运行 美国东部时间上午 8 点 和 美国东部时间上午 8 点(UTC)。类似于 0 11,12 * * *
。第一个任务是 ShortCircuit 操作员。然后使用类似 pytz 的东西来本地化当前时间。如果它在您要求的时间内,请继续(即:运行 DAG)。否则,return 错误。每天 2 个额外任务的开销很小,但只要您的机器不超载,延迟应该是最小的。
马虎的例子:
from datetime import datetime
from pytz import utc, timezone
# ...
def is8AM(**kwargs):
ti = kwargs["ti"]
curtime = utc.localize(datetime.utcnow())
# If you want to use the exec date:
# curtime = utc.localize(ti.execution_date)
eastern = timezone('US/Eastern') # From docs, check your local names
loc_dt = curtime.astimezone(eastern)
if loc_dt.hour == 8:
return True
return False
start_task = ShortCircuitOperator(
task_id='check_for_8AM',
python_callable=is8AM,
provide_context=True,
dag=dag
)
希望对您有所帮助
编辑:运行次错误,减去而不是加。此外,由于 运行 的启动方式,如果您希望它们在 运行 8.
我们使用了@apathyman 解决方案,但我们没有使用 ShortCircuit,而是使用了 PythonOperator,如果它不是我们想要的时间,它就会失败,并且重试 timedelta 为 1 小时。 这样我们每天只有 1 运行 而不是 2.
并且仅在第一个小时
将计划间隔设置为 运行基本上就是这样(大部分代码取自上述答案,感谢@apathyman):
from datetime import datetime
from datetime import timedelta
from pytz import utc, timezone
def is8AM(**kwargs):
ti = kwargs["ti"]
curtime = utc.localize(datetime.utcnow())
# If you want to use the exec date:
# curtime = utc.localize(ti.execution_date)
eastern = timezone('US/Eastern') # From docs, check your local names
loc_dt = curtime.astimezone(eastern)
if loc_dt.hour == 8:
return True
exit("Not the time yet, wait 1 hour")
start_task = PythonOperator(
task_id='check_for_8AM',
python_callable=is8AM,
provide_context=True,
retries=1,
retry_delay=timedelta(hours=1),
dag=dag
)
我相信我们只需要一个 PythonOperator 来处理这种情况。
如果 DAG 需要在 DST TZ 中 运行(例如:America/New_York、Europe/London、Australia/Sydney),那么以下是我可以采取的解决方法想想:
- 将 DAG 时间表转换为 UTC TZ。
因为 TZ 有夏令时,所以我们需要选择更大的偏移量 在进行转换时。例如:- 使用America/New_York TZ:我们必须使用偏移量
-4
。所以时间表*/10 11-13 * * 1-5
将转换为*/10 15-17 * * 1-5
- 与Europe/London:我们必须使用偏移量
+1
。所以时间表35 */4 * * *
将转换为35 3-23/4 * * *
- 与Australia/Sydney:我们必须使用偏移量
+11
。所以时间表15 8,9,12,18 * * *
将转换为15 21,22,1,7 * * *
- 使用America/New_York TZ:我们必须使用偏移量
使用
PythonOperator
在所有主要任务之前做一个任务。此任务将检查当前时间是否在指定 TZ 的 DST 中。如果是,则任务将在 1 小时后休眠。 这样我们就可以处理DST TZ的情况了。def is_DST(zonename): tz = pytz.timezone(zonename) now = pytz.utc.localize(datetime.utcnow()) return now.astimezone(tz).dst() != timedelta(0) def WQ_DST_handler(TZ, **kwargs): if is_DST(TZ): print('Currently is daily saving time (DST) in {0}, will process to next task now'.format(TZ)) else: print('Currently is not daily saving time (DST) in {0}, will sleep 1 hour...'.format(TZ)) time.sleep(60 * 60) DST_handler = PythonOperator( task_id='DST_handler', python_callable=WQ_DST_handler, op_kwargs={'TZ': TZ_of_dag}, dag=dag ) DST_handler >> main_tasks
此解决方法有一个缺点:对于任何需要在 DST TZ 中 运行 的 DAG,我们必须创建 1 个进一步的任务(上例中的 DST_handler),并且此任务仍然需要也发送到工作节点执行(尽管它几乎只是一个睡眠命令)。
airflow 在版本 1 上时被问到这个问题。8.x。
从 airflow 1.10 开始,此功能现已内置。
https://airflow.apache.org/timezone.html
在airflow.cfg
中设置时区,应该正确处理dst。