对于某些气流 DAG 运行,模板键 {{ prev_data_interval_end_success }} 等于 {{ dag_run.execution_date }}

Template keys {{ prev_data_interval_end_success }} equals {{ dag_run.execution_date }} for some airflow DAG runs

我正在解决无法在正确的数据间隔内将 DAG 设为 运行 的问题。

DAG 的形式为:

CYCLE_START_SECONDS = 240
CYCLE_END_SECONDS = 120


@dag(schedule_interval=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS), start_date=datetime(2021, 11, 16),
 catchup=True, default_args=DEFAULT_ARGS,
 on_failure_callback=emit_on_task_failure, max_active_runs=1, on_success_callback=emit_on_dag_success,
 render_template_as_native_obj=True)
def ETL_Workflow():
"""
Workflow to post process raw data into metrics for ingest into ES
:return:
"""

@task()
def get_start_date(start, end):
    print(start, end)
    end = int(end.timestamp())
    if isinstance(start, pendulum.DateTime):
        start = int(start.timestamp())
    else:
        start = end - CYCLE_START_SECONDS
    return start, end

@task(execution_timeout=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS))
def run_query(start_end: tuple, query_template, conn_str, redis_key, transforms):
    start, end = start_end
    query = query_template.format(start=start, end=end)
    return run_pipeline(query, conn_str, redis_key, transforms)

@task()
def store_period_end(start_end: tuple):
    _ = Variable.set(DAG_NAME + "_period_end", start_end[1])
    return

start = '{{ prev_data_interval_end_success }}'
end = '{{ dag_run.execution_date }}'

conn_str = get_source_url(SECRET)
start_end = get_start_date(start, end)
t1 = run_query(start_end, QUERY, conn_str, REDIS_KEY, TRANSFORMS)
t3 = store_period_end(start_end)
start_end >> t1 >> t3


dag = ETL_Workflow()

具体来说,我使用这些模板获得了所需的数据间隔:

start = '{{ prev_data_interval_end_success }}'
end = '{{ dag_run.execution_date }}'

但由于某些原因,这些值解析为相同的日期时间

[2021-12-04, 18:42:20 UTC] {logging_mixin.py:109} INFO - start: 2021-12-04T18:40:18.451905+00:00 end: 2021-12-04 18:40:18.451905+00:00

但是,您可以看到 运行 元数据中的数据间隔是正确的:

我很难过。 DAG 执行日期应在前一个 运行 的数据间隔结束后 CYCLE_START_SECONDS。我对 get_start_date 中的逻辑进行了单元测试,没问题。此外,某些工作流程不会遇到此问题。对于这些工作流,执行日期时间正确计算到上一个数据间隔结束后的 CYCLE_START_SECONDS。我是否错误地使用了模板?我是否错误地指定了时间表?任何关于可能是什么问题的指示将不胜感激。谢谢。

我认为您误会了 execution_date(这很正常,因为这是一个非常令人困惑的概念)。 DAG 运行 的 execution_date 不是 运行 发生的时候,而是它应该处理的数据开始进来的时候。对于间隔调度的 DAG,execution_date 几乎总是等于 data_interval_start,这又几乎总是等于它之前的 DAG 运行 的 data_interval_end。这意味着 execution_date 之前的 运行 的 data_interval_end,而不是之后的间隔。因此,如果前面的 运行 成功,您会看到 prev_data_interval_end_success 等于 execution_date。完全正常。

鉴于您知道 prev_data_interval_end_success 的存在,您可能也知道 execution_date 已被弃用,这正是因为这个概念太混乱了。编写新的 DAG 时不要使用它;您可能正在寻找 data_interval_end