时区感知 DAG 的气流错误 execution_date

Airflow wrong execution_date for timezone aware DAGs

我们在 Kubernetes (KubernetesExecutor) 上使用 Airflow v2.2.3, 我们的环境需要一个DAG预客户,每个客户可以在不同的时区。

每个 DAG 都应该在午夜安排在自己的时区, 我看到它可以使用 Airflow 的 timezone aware DAGs

来实现

因此为每个 DAG 配置时区感知 start_date 工作并使每个 DAG 在午夜在其自己的时区执行:

start_date_utc = (datetime.now() - timedelta(days=2)).replace(
        hour=0, minute=0, second=0, microsecond=0)

timezone = pendulum.timezone(get_customer_timezone(customer))
START_DATE = start_date_utc.replace(tzinfo=timezone)

default_args = {
        "owner": "owner",
        "depends_on_past": False,
        "start_date": START_DATE,
}

dag = DAG(
    dag_id,
    schedule_interval="0 0 * * *",
    default_args=default_args,
    tags=[cusotmer_name]
)

date = '{{ execution_date | ds }}'

operator_args = {
    "customer_date": date,
}

我的问题是 jinja template 和 dag_run execution_date (dag_run.logical_date) 仍然是 UTC,并且没有根据 DAG 时区进行调整。

当 运行 不同时区的 DAG 时区偏移早于 UTC 的 DAG execution_date 是错误的(2 天前而不是 1 天)

关于如何根据 DAG 时区

更改 execution_date,我需要一些建议

谢谢

execution_date 的值是 UTC。 要转换为不同的时区,您可以这样做:

{{ execution_date.in_timezone('Europe/Amsterdam') }}

如果您在 DAG 中设置时区,您可以从 dag.timezone 访问它并使用它 Jinja。

{{ dag_run.execution_date.astimezone(dag.timezone) }}

您已经注意到 execution_date 是已弃用的宏,因此您应该使用 logical_date 作为:

{{ dag_run.logical_date.astimezone(dag.timezone) }}