气流回填澄清
Airflow backfill clarification
我刚刚开始使用 Airbnb 的 airflow,我还不清楚 how/when 回填是否完成。
具体来说,有 2 个用例让我感到困惑:
如果我运行airflow scheduler
几分钟,停一分钟,然后再重启,我的DAG好像运行额外任务前 30 秒左右,然后它继续正常(每 10 秒 运行s)。这些额外的任务 "backfilled" 是之前 运行 无法完成的任务吗?如果是这样,我如何告诉气流不要回填这些任务?
如果我运行airflow scheduler
几分钟,然后运行airflow clear MY_tutorial
,然后重启airflow scheduler
,好像运行 一大堆额外的任务。这些任务也是 "backfilled" 任务吗?还是我漏了什么。
目前,我有一个非常简单的dag:
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2016, 10, 4),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 8)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
second_template = """
touch ~/airflow/logs/test
echo $(date) >> ~/airflow/logs/test
"""
t4 = BashOperator(
task_id='write_test',
bash_command=second_template,
dag=dag)
t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)
我在气流配置中更改的唯一两件事是
- 我从使用 sqlite 数据库改为使用 postgres 数据库
- 我正在使用
CeleryExecutor
而不是 SequentialExecutor
非常感谢您的帮助!
当您将 DAG 的调度程序开关更改为“打开”时,调度程序将触发所有未记录状态的 dag 运行 实例的回填,从 start_date 开始您在“default_args”中指定。
例如:如果开始日期是“2017-01-21”并且您在“2017-01-22T00:00:00”打开了计划开关并且您的 dag 配置为 运行每小时,然后调度程序将回填 24 dag 运行s,然后在计划的时间间隔开始 运行ning。
这基本上就是你的两个问题中发生的事情。在 #1 中,它正在填充从您关闭调度程序的 30 秒内丢失的 3 个 运行s。在 #2 中,它填充了从 start_date 到“现在”的所有 DAG 运行。
有两种解决方法:
- 将 start_date 设置为将来的日期,这样它只会在到达该日期后才开始安排 dag 运行。请注意,如果您更改 DAG 的 start_date,由于开始日期存储在气流数据库中的方式,您还必须更改 DAG 的名称。
- 手动 运行 回填 from the command line 使用“-m” (--mark-success) 标志告诉气流实际上不要 运行 DAG,而只是标记它在数据库中是成功的。
例如
airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
Airflow 的 UI 上的 On/Off 仅声明 "PAUSE" 这意味着,如果它打开,它只会在它被触发的时间暂停,如果它再次在那个日期继续被关闭。
请注意,从 1.8 版开始,Airflow 允许您使用 catchup 控制此行为。在 airflow.cfg 中设置 catchup_by_default=False
或
catchup=False
在您的 DAG 定义中。
见https://airflow.apache.org/scheduler.html#backfill-and-catchup
我刚刚开始使用 Airbnb 的 airflow,我还不清楚 how/when 回填是否完成。
具体来说,有 2 个用例让我感到困惑:
如果我运行
airflow scheduler
几分钟,停一分钟,然后再重启,我的DAG好像运行额外任务前 30 秒左右,然后它继续正常(每 10 秒 运行s)。这些额外的任务 "backfilled" 是之前 运行 无法完成的任务吗?如果是这样,我如何告诉气流不要回填这些任务?如果我运行
airflow scheduler
几分钟,然后运行airflow clear MY_tutorial
,然后重启airflow scheduler
,好像运行 一大堆额外的任务。这些任务也是 "backfilled" 任务吗?还是我漏了什么。
目前,我有一个非常简单的dag:
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2016, 10, 4),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 8)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
second_template = """
touch ~/airflow/logs/test
echo $(date) >> ~/airflow/logs/test
"""
t4 = BashOperator(
task_id='write_test',
bash_command=second_template,
dag=dag)
t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)
我在气流配置中更改的唯一两件事是
- 我从使用 sqlite 数据库改为使用 postgres 数据库
- 我正在使用
CeleryExecutor
而不是SequentialExecutor
非常感谢您的帮助!
当您将 DAG 的调度程序开关更改为“打开”时,调度程序将触发所有未记录状态的 dag 运行 实例的回填,从 start_date 开始您在“default_args”中指定。
例如:如果开始日期是“2017-01-21”并且您在“2017-01-22T00:00:00”打开了计划开关并且您的 dag 配置为 运行每小时,然后调度程序将回填 24 dag 运行s,然后在计划的时间间隔开始 运行ning。
这基本上就是你的两个问题中发生的事情。在 #1 中,它正在填充从您关闭调度程序的 30 秒内丢失的 3 个 运行s。在 #2 中,它填充了从 start_date 到“现在”的所有 DAG 运行。
有两种解决方法:
- 将 start_date 设置为将来的日期,这样它只会在到达该日期后才开始安排 dag 运行。请注意,如果您更改 DAG 的 start_date,由于开始日期存储在气流数据库中的方式,您还必须更改 DAG 的名称。
- 手动 运行 回填 from the command line 使用“-m” (--mark-success) 标志告诉气流实际上不要 运行 DAG,而只是标记它在数据库中是成功的。
例如
airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
Airflow 的 UI 上的 On/Off 仅声明 "PAUSE" 这意味着,如果它打开,它只会在它被触发的时间暂停,如果它再次在那个日期继续被关闭。
请注意,从 1.8 版开始,Airflow 允许您使用 catchup 控制此行为。在 airflow.cfg 中设置 catchup_by_default=False
或
catchup=False
在您的 DAG 定义中。
见https://airflow.apache.org/scheduler.html#backfill-and-catchup