对于某些气流 DAG 运行,模板键 {{ prev_data_interval_end_success }} 等于 {{ dag_run.execution_date }}
Template keys {{ prev_data_interval_end_success }} equals {{ dag_run.execution_date }} for some airflow DAG runs
我正在解决无法在正确的数据间隔内将 DAG 设为 运行 的问题。
DAG 的形式为:
CYCLE_START_SECONDS = 240
CYCLE_END_SECONDS = 120
@dag(schedule_interval=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS), start_date=datetime(2021, 11, 16),
catchup=True, default_args=DEFAULT_ARGS,
on_failure_callback=emit_on_task_failure, max_active_runs=1, on_success_callback=emit_on_dag_success,
render_template_as_native_obj=True)
def ETL_Workflow():
"""
Workflow to post process raw data into metrics for ingest into ES
:return:
"""
@task()
def get_start_date(start, end):
print(start, end)
end = int(end.timestamp())
if isinstance(start, pendulum.DateTime):
start = int(start.timestamp())
else:
start = end - CYCLE_START_SECONDS
return start, end
@task(execution_timeout=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS))
def run_query(start_end: tuple, query_template, conn_str, redis_key, transforms):
start, end = start_end
query = query_template.format(start=start, end=end)
return run_pipeline(query, conn_str, redis_key, transforms)
@task()
def store_period_end(start_end: tuple):
_ = Variable.set(DAG_NAME + "_period_end", start_end[1])
return
start = '{{ prev_data_interval_end_success }}'
end = '{{ dag_run.execution_date }}'
conn_str = get_source_url(SECRET)
start_end = get_start_date(start, end)
t1 = run_query(start_end, QUERY, conn_str, REDIS_KEY, TRANSFORMS)
t3 = store_period_end(start_end)
start_end >> t1 >> t3
dag = ETL_Workflow()
具体来说,我使用这些模板获得了所需的数据间隔:
start = '{{ prev_data_interval_end_success }}'
end = '{{ dag_run.execution_date }}'
但由于某些原因,这些值解析为相同的日期时间
[2021-12-04, 18:42:20 UTC] {logging_mixin.py:109} INFO - start: 2021-12-04T18:40:18.451905+00:00 end: 2021-12-04 18:40:18.451905+00:00
但是,您可以看到 运行 元数据中的数据间隔是正确的:
我很难过。 DAG 执行日期应在前一个 运行 的数据间隔结束后 CYCLE_START_SECONDS。我对 get_start_date 中的逻辑进行了单元测试,没问题。此外,某些工作流程不会遇到此问题。对于这些工作流,执行日期时间正确计算到上一个数据间隔结束后的 CYCLE_START_SECONDS。我是否错误地使用了模板?我是否错误地指定了时间表?任何关于可能是什么问题的指示将不胜感激。谢谢。
我认为您误会了 execution_date
(这很正常,因为这是一个非常令人困惑的概念)。 DAG 运行 的 execution_date
不是 运行 发生的时候,而是它应该处理的数据开始进来的时候。对于间隔调度的 DAG,execution_date
几乎总是等于 data_interval_start
,这又几乎总是等于它之前的 DAG 运行 的 data_interval_end
。这意味着 execution_date
是 之前的 运行 的 data_interval_end
,而不是之后的间隔。因此,如果前面的 运行 成功,您会看到 prev_data_interval_end_success
等于 execution_date
。完全正常。
鉴于您知道 prev_data_interval_end_success 的存在,您可能也知道 execution_date
已被弃用,这正是因为这个概念太混乱了。编写新的 DAG 时不要使用它;您可能正在寻找 data_interval_end
。
我正在解决无法在正确的数据间隔内将 DAG 设为 运行 的问题。
DAG 的形式为:
CYCLE_START_SECONDS = 240
CYCLE_END_SECONDS = 120
@dag(schedule_interval=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS), start_date=datetime(2021, 11, 16),
catchup=True, default_args=DEFAULT_ARGS,
on_failure_callback=emit_on_task_failure, max_active_runs=1, on_success_callback=emit_on_dag_success,
render_template_as_native_obj=True)
def ETL_Workflow():
"""
Workflow to post process raw data into metrics for ingest into ES
:return:
"""
@task()
def get_start_date(start, end):
print(start, end)
end = int(end.timestamp())
if isinstance(start, pendulum.DateTime):
start = int(start.timestamp())
else:
start = end - CYCLE_START_SECONDS
return start, end
@task(execution_timeout=timedelta(seconds=CYCLE_START_SECONDS - CYCLE_END_SECONDS))
def run_query(start_end: tuple, query_template, conn_str, redis_key, transforms):
start, end = start_end
query = query_template.format(start=start, end=end)
return run_pipeline(query, conn_str, redis_key, transforms)
@task()
def store_period_end(start_end: tuple):
_ = Variable.set(DAG_NAME + "_period_end", start_end[1])
return
start = '{{ prev_data_interval_end_success }}'
end = '{{ dag_run.execution_date }}'
conn_str = get_source_url(SECRET)
start_end = get_start_date(start, end)
t1 = run_query(start_end, QUERY, conn_str, REDIS_KEY, TRANSFORMS)
t3 = store_period_end(start_end)
start_end >> t1 >> t3
dag = ETL_Workflow()
具体来说,我使用这些模板获得了所需的数据间隔:
start = '{{ prev_data_interval_end_success }}'
end = '{{ dag_run.execution_date }}'
但由于某些原因,这些值解析为相同的日期时间
[2021-12-04, 18:42:20 UTC] {logging_mixin.py:109} INFO - start: 2021-12-04T18:40:18.451905+00:00 end: 2021-12-04 18:40:18.451905+00:00
但是,您可以看到 运行 元数据中的数据间隔是正确的:
我很难过。 DAG 执行日期应在前一个 运行 的数据间隔结束后 CYCLE_START_SECONDS。我对 get_start_date 中的逻辑进行了单元测试,没问题。此外,某些工作流程不会遇到此问题。对于这些工作流,执行日期时间正确计算到上一个数据间隔结束后的 CYCLE_START_SECONDS。我是否错误地使用了模板?我是否错误地指定了时间表?任何关于可能是什么问题的指示将不胜感激。谢谢。
我认为您误会了 execution_date
(这很正常,因为这是一个非常令人困惑的概念)。 DAG 运行 的 execution_date
不是 运行 发生的时候,而是它应该处理的数据开始进来的时候。对于间隔调度的 DAG,execution_date
几乎总是等于 data_interval_start
,这又几乎总是等于它之前的 DAG 运行 的 data_interval_end
。这意味着 execution_date
是 之前的 运行 的 data_interval_end
,而不是之后的间隔。因此,如果前面的 运行 成功,您会看到 prev_data_interval_end_success
等于 execution_date
。完全正常。
鉴于您知道 prev_data_interval_end_success 的存在,您可能也知道 execution_date
已被弃用,这正是因为这个概念太混乱了。编写新的 DAG 时不要使用它;您可能正在寻找 data_interval_end
。