运行 只有最新的 Airflow DAG
Run only the latest Airflow DAG
假设我想 运行 一个带有 Airflow 的非常简单的 ETL DAG:
它检查 DB2 中的最后插入时间,如果有的话,它会将较新的行从 DB1 加载到 DB2。
有一些可以理解的要求:
- 每小时排一次,前几运行秒会持续1个多小时
- 例如。第一个运行要处理一个月的数据,持续72小时,
- 所以第二个 运行 应该处理最后 72 小时,它持续 7.2 小时,
- 第三个处理 7.2 小时,并在一个小时内完成,
- 从那时起,每小时 运行 秒。
- 当 DAG 正在 运行ning 时,不要开始下一个,而是跳过它。
- 如果时间超过了触发事件,而 DAG 没有启动,则不要随后启动它。
- 还有其他的DAG,DAG应该独立执行。
我发现这些参数和运算符有点混乱,它们之间有什么区别?
depends_on_past
catchup
backfill
LatestOnlyOperator
我应该使用哪个,哪个LocalExecutor?
Ps。已经有一个非常相似的,但并不累人。
这个满足我的要求。 DAG 运行 每分钟,而我的 "main" 任务持续 90 秒,因此它应该每秒跳过 运行。
我已经使用 ShortCircuitOperator
检查当前 运行 是否是目前唯一的(在 airflow
db 的 dag_run
table 中查询),和 catchup=False
禁用回填。
但是我无法正确使用应该做类似事情的 LatestOnlyOperator
。
DAG 文件
import os
import sys
from datetime import datetime
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
import foo
import util
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2018, 2, 13), # or any date in the past
'email': ['services@mydomain.com'],
'email_on_failure': True}
dag = DAG(
'test90_dag',
default_args=default_args,
schedule_interval='* * * * *',
catchup=False)
condition_task = ShortCircuitOperator(
task_id='skip_check',
python_callable=util.is_latest_active_dagrun,
provide_context=True,
dag=dag)
py_task = PythonOperator(
task_id="test90_task",
python_callable=foo.bar,
provide_context=True,
dag=dag)
airflow.utils.helpers.chain(condition_task, py_task)
util.py
import logging
from datetime import datetime
from airflow.hooks.postgres_hook import PostgresHook
def get_num_active_dagruns(dag_id, conn_id='airflow_db'):
# for this you have to set this value in the airflow db
airflow_db = PostgresHook(postgres_conn_id=conn_id)
conn = airflow_db.get_conn()
cursor = conn.cursor()
sql = "select count(*) from public.dag_run where dag_id = '{dag_id}' and state in ('running', 'queued', 'up_for_retry')".format(dag_id=dag_id)
cursor.execute(sql)
num_active_dagruns = cursor.fetchone()[0]
return num_active_dagruns
def is_latest_active_dagrun(**kwargs):
num_active_dagruns = get_num_active_dagruns(dag_id=kwargs['dag'].dag_id)
return (num_active_dagruns == 1)
foo.py
import datetime
import time
def bar(*args, **kwargs):
t = datetime.datetime.now()
execution_date = str(kwargs['execution_date'])
with open("/home/airflow/test.log", "a") as myfile:
myfile.write(execution_date + ' - ' + str(t) + '\n')
time.sleep(90)
with open("/home/airflow/test.log", "a") as myfile:
myfile.write(execution_date + ' - ' + str(t) + ' +90\n')
return 'bar: ok'
致谢:此答案基于this blog post。
DAG max_active_runs = 1 结合 catchup = False 可以解决这个问题。
DAG max_active_runs = 1 与 catchup = False 相结合,并在 wait_for_downstream=True 的开头添加一个 DUMMY 任务(类似于 START 任务)。
从 LatestOnlyOperator 开始 - 如果先前的执行尚未完成,它将有助于避免重新运行任务。
或者将 "START" 任务创建为 LatestOnlyOperator 并确保第一个处理层的所有 Taks 部分都连接到它。但请注意 - 根据文档 "Note that downstream tasks are never skipped if the given DAG_Run is marked as externally triggered."
假设我想 运行 一个带有 Airflow 的非常简单的 ETL DAG: 它检查 DB2 中的最后插入时间,如果有的话,它会将较新的行从 DB1 加载到 DB2。
有一些可以理解的要求:
- 每小时排一次,前几运行秒会持续1个多小时
- 例如。第一个运行要处理一个月的数据,持续72小时,
- 所以第二个 运行 应该处理最后 72 小时,它持续 7.2 小时,
- 第三个处理 7.2 小时,并在一个小时内完成,
- 从那时起,每小时 运行 秒。
- 当 DAG 正在 运行ning 时,不要开始下一个,而是跳过它。
- 如果时间超过了触发事件,而 DAG 没有启动,则不要随后启动它。
- 还有其他的DAG,DAG应该独立执行。
我发现这些参数和运算符有点混乱,它们之间有什么区别?
depends_on_past
catchup
backfill
LatestOnlyOperator
我应该使用哪个,哪个LocalExecutor?
Ps。已经有一个非常相似的
这个满足我的要求。 DAG 运行 每分钟,而我的 "main" 任务持续 90 秒,因此它应该每秒跳过 运行。
我已经使用 ShortCircuitOperator
检查当前 运行 是否是目前唯一的(在 airflow
db 的 dag_run
table 中查询),和 catchup=False
禁用回填。
但是我无法正确使用应该做类似事情的 LatestOnlyOperator
。
DAG 文件
import os
import sys
from datetime import datetime
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
import foo
import util
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2018, 2, 13), # or any date in the past
'email': ['services@mydomain.com'],
'email_on_failure': True}
dag = DAG(
'test90_dag',
default_args=default_args,
schedule_interval='* * * * *',
catchup=False)
condition_task = ShortCircuitOperator(
task_id='skip_check',
python_callable=util.is_latest_active_dagrun,
provide_context=True,
dag=dag)
py_task = PythonOperator(
task_id="test90_task",
python_callable=foo.bar,
provide_context=True,
dag=dag)
airflow.utils.helpers.chain(condition_task, py_task)
util.py
import logging
from datetime import datetime
from airflow.hooks.postgres_hook import PostgresHook
def get_num_active_dagruns(dag_id, conn_id='airflow_db'):
# for this you have to set this value in the airflow db
airflow_db = PostgresHook(postgres_conn_id=conn_id)
conn = airflow_db.get_conn()
cursor = conn.cursor()
sql = "select count(*) from public.dag_run where dag_id = '{dag_id}' and state in ('running', 'queued', 'up_for_retry')".format(dag_id=dag_id)
cursor.execute(sql)
num_active_dagruns = cursor.fetchone()[0]
return num_active_dagruns
def is_latest_active_dagrun(**kwargs):
num_active_dagruns = get_num_active_dagruns(dag_id=kwargs['dag'].dag_id)
return (num_active_dagruns == 1)
foo.py
import datetime
import time
def bar(*args, **kwargs):
t = datetime.datetime.now()
execution_date = str(kwargs['execution_date'])
with open("/home/airflow/test.log", "a") as myfile:
myfile.write(execution_date + ' - ' + str(t) + '\n')
time.sleep(90)
with open("/home/airflow/test.log", "a") as myfile:
myfile.write(execution_date + ' - ' + str(t) + ' +90\n')
return 'bar: ok'
致谢:此答案基于this blog post。
DAG max_active_runs = 1 结合 catchup = False 可以解决这个问题。
DAG max_active_runs = 1 与 catchup = False 相结合,并在 wait_for_downstream=True 的开头添加一个 DUMMY 任务(类似于 START 任务)。 从 LatestOnlyOperator 开始 - 如果先前的执行尚未完成,它将有助于避免重新运行任务。 或者将 "START" 任务创建为 LatestOnlyOperator 并确保第一个处理层的所有 Taks 部分都连接到它。但请注意 - 根据文档 "Note that downstream tasks are never skipped if the given DAG_Run is marked as externally triggered."