Airflow:每天重新执行过去 n 天的 DAG 作业
Airflow: re execute the jobs of a DAG for the past n days on a daily basis
我已安排每天 运行 执行 DAG。
完美运行一天。
然而,每天我都想不仅为当天 {{ ds }} 重新执行,而且还为前 n 天(假设 n = 7)重新执行。
例如,在计划于“2018-01-30”运行 的下一次执行中,我希望 Airflow 不仅 运行 DAG 用作执行日期“2018-01- 30”,而且还重新运行 前几天从“2018-01-23”到“2018-01-30”的 DAG。
是否有一种简单的方法可以 "invalidate" 之前的执行,以便自动 运行 回填?
您是否考虑过让 运行 一天一次的狗只是 运行 您过去 7 天的任务?我想你将只有 7 个任务,每个任务都会产生一个 SubDAG,与你的执行日期有不同的日期偏移。
我认为这将使调试更容易,历史记录更清晰。我相信尝试回填已经执行的任务将涉及删除任务实例或将它们的状态全部设置为 NONE。然后你仍然需要在那些 dag 运行s 上触发回填。当事情失败并且看起来有点混乱时,将更难跟踪。
您可以在循环中动态生成任务并将偏移量传递给您的操作员。
这里是 Python 的例子。
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'schedule_interval': '0 10 * * *'
}
def check_trigger(execution_date, day_offset, **kwargs):
target_date = execution_date - timedelta(days=day_offset)
# use target_date
for day_offset in xrange(1, 8):
PythonOperator(
task_id='task_offset_' + i,
python_callable=check_trigger,
provide_context=True,
dag=dag,
op_kwargs={'day_offset' : day_offset}
)
我已安排每天 运行 执行 DAG。 完美运行一天。
然而,每天我都想不仅为当天 {{ ds }} 重新执行,而且还为前 n 天(假设 n = 7)重新执行。
例如,在计划于“2018-01-30”运行 的下一次执行中,我希望 Airflow 不仅 运行 DAG 用作执行日期“2018-01- 30”,而且还重新运行 前几天从“2018-01-23”到“2018-01-30”的 DAG。
是否有一种简单的方法可以 "invalidate" 之前的执行,以便自动 运行 回填?
您是否考虑过让 运行 一天一次的狗只是 运行 您过去 7 天的任务?我想你将只有 7 个任务,每个任务都会产生一个 SubDAG,与你的执行日期有不同的日期偏移。
我认为这将使调试更容易,历史记录更清晰。我相信尝试回填已经执行的任务将涉及删除任务实例或将它们的状态全部设置为 NONE。然后你仍然需要在那些 dag 运行s 上触发回填。当事情失败并且看起来有点混乱时,将更难跟踪。
您可以在循环中动态生成任务并将偏移量传递给您的操作员。
这里是 Python 的例子。
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'schedule_interval': '0 10 * * *'
}
def check_trigger(execution_date, day_offset, **kwargs):
target_date = execution_date - timedelta(days=day_offset)
# use target_date
for day_offset in xrange(1, 8):
PythonOperator(
task_id='task_offset_' + i,
python_callable=check_trigger,
provide_context=True,
dag=dag,
op_kwargs={'day_offset' : day_offset}
)