如何在两个不同的 schedule_intervals 中将两个 DAG 安排到 运行,但在第一个完成后,第二个仅 运行
How to schedule two DAGs to run in two different schedule_intervals but the second only run after the first has finished
我有两个不同的 DAG,需要以不同的频率 运行。一个,即 dag1
需要每周 运行,而 dag2
需要每天 运行。现在 dag2
应该只 运行 当 dag1 完成时,在每次出现 dag1
运行s.
时
我在两个不同的 python 模块中定义了两个 DAG,如下所示。
dag1.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag1',
default_args={
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='00 10 * * 1',
catchup=True
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)
dag2.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag2',
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='5 10 * * *',
catchup=True
) as dag:
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='''
cd {}/scraper && scrapy crawl crawl_dataset
'''.format(PROJECT_PATH)
)
目前我手动设置了两个dag之间的间隔为5分钟。此设置当前无法正常工作,也缺少根据需要使 dag2
依赖于 dag1
的功能。
我已经检查了答案 and 但没弄明白。
注意:schedule_intervals
仅供参考。目的是 运行 dag1
每个星期一的固定时间和 运行 dag2
每天的固定时间和星期一,它应该只在 dag1
完成后.
这里每个 dag 也有多个任务。
- 最简单的解决方案是用一个
ExternalTaskSensor
开始你的第二个 DAG,等待你的第一个 DAG 的最后一个任务完成
- 或者,您也可以使用
TriggerDagRunOperator
在第一个 dag 的末尾触发第二个 dag。但是,在这种情况下,您将无法将 schedule_interval
分配给第二个 dag(因为它将 'forcefully' 由第一个 dag 触发)
您可以将这两个任务写在同一个 DAG 中,并有一个下游来设置任务依赖关系
task1.set_downstream(task2)
至于不同的任务时间表依赖,创建具有每日时间表的DAG。对于每周计划的任务,编写一个 shortCircuitOperator 以启用每周触发器:
# Set trigger for first day of the week
def check_trigger_week(execution_date, **kwargs):
return execution_date.weekday() == 0
# Task should check for the trigger to see if its first day of the week
check_trigger_weekly = ShortCircuitOperator(
task_id='check_trigger_weekly',
python_callable=check_trigger_week,
provide_context=True,
dag=dag
)
然后让你的每周任务依赖于这个每周触发器
check_trigger_weekly.set_downstream(task)
在理解流程方面进行了很多努力之后,我终于自己想出了答案(不确定它有多优化,但目前对我有用)。感谢 answer and branching docs。
这是我使用 BranchPythonOperator.
的解决方案
dag1.py
import datetime as dt
from os import path
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
DEFAULT_ARGS = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 20),
'concurrency': 1,
'retries': 0
}
def branch_tasks(execution_date, **kwargs):
'''
Branch the tasks based on weekday.
'''
# check if the execution day is 'Saturday'
if execution_date.weekday() == 5:
return ['crawl_params', 'crawl_dataset']
return 'crawl_dataset'
with DAG('dag1',
default_args=DEFAULT_ARGS,
schedule_interval='00 10 * * *',
catchup=False
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
trigger_rule='none_failed'
)
BRANCH_OP = BranchPythonOperator(
task_id='branch_tasks',
provide_context=True,
python_callable=branch_tasks,
dag=dag
)
BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
CRAWL_PARAMS.set_downstream(CRAWL_DATASET)
在这里,BranchPythonOperator 使用 branch_tasks 函数根据星期几选择要 运行 的任务。
这里的另一个问题是当 crawl_params
做 运行 当条件为真时,下游也将 运行 但当它被跳过时,它的下游也将被跳过。为了避免这种情况,我们需要将 trigger_rule='none_failed'
传递给任务的操作员。这意味着如果 none 的上游任务失败(它们要么成功要么被跳过),任务应该 运行。
我有两个不同的 DAG,需要以不同的频率 运行。一个,即 dag1
需要每周 运行,而 dag2
需要每天 运行。现在 dag2
应该只 运行 当 dag1 完成时,在每次出现 dag1
运行s.
我在两个不同的 python 模块中定义了两个 DAG,如下所示。
dag1.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag1',
default_args={
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='00 10 * * 1',
catchup=True
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)
dag2.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag2',
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='5 10 * * *',
catchup=True
) as dag:
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='''
cd {}/scraper && scrapy crawl crawl_dataset
'''.format(PROJECT_PATH)
)
目前我手动设置了两个dag之间的间隔为5分钟。此设置当前无法正常工作,也缺少根据需要使 dag2
依赖于 dag1
的功能。
我已经检查了答案
注意:schedule_intervals
仅供参考。目的是 运行 dag1
每个星期一的固定时间和 运行 dag2
每天的固定时间和星期一,它应该只在 dag1
完成后.
这里每个 dag 也有多个任务。
- 最简单的解决方案是用一个
ExternalTaskSensor
开始你的第二个 DAG,等待你的第一个 DAG 的最后一个任务完成
- 或者,您也可以使用
TriggerDagRunOperator
在第一个 dag 的末尾触发第二个 dag。但是,在这种情况下,您将无法将schedule_interval
分配给第二个 dag(因为它将 'forcefully' 由第一个 dag 触发)
您可以将这两个任务写在同一个 DAG 中,并有一个下游来设置任务依赖关系
task1.set_downstream(task2)
至于不同的任务时间表依赖,创建具有每日时间表的DAG。对于每周计划的任务,编写一个 shortCircuitOperator 以启用每周触发器:
# Set trigger for first day of the week
def check_trigger_week(execution_date, **kwargs):
return execution_date.weekday() == 0
# Task should check for the trigger to see if its first day of the week
check_trigger_weekly = ShortCircuitOperator(
task_id='check_trigger_weekly',
python_callable=check_trigger_week,
provide_context=True,
dag=dag
)
然后让你的每周任务依赖于这个每周触发器
check_trigger_weekly.set_downstream(task)
在理解流程方面进行了很多努力之后,我终于自己想出了答案(不确定它有多优化,但目前对我有用)。感谢
dag1.py
import datetime as dt
from os import path
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
DEFAULT_ARGS = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 20),
'concurrency': 1,
'retries': 0
}
def branch_tasks(execution_date, **kwargs):
'''
Branch the tasks based on weekday.
'''
# check if the execution day is 'Saturday'
if execution_date.weekday() == 5:
return ['crawl_params', 'crawl_dataset']
return 'crawl_dataset'
with DAG('dag1',
default_args=DEFAULT_ARGS,
schedule_interval='00 10 * * *',
catchup=False
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
trigger_rule='none_failed'
)
BRANCH_OP = BranchPythonOperator(
task_id='branch_tasks',
provide_context=True,
python_callable=branch_tasks,
dag=dag
)
BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
CRAWL_PARAMS.set_downstream(CRAWL_DATASET)
在这里,BranchPythonOperator 使用 branch_tasks 函数根据星期几选择要 运行 的任务。
这里的另一个问题是当 crawl_params
做 运行 当条件为真时,下游也将 运行 但当它被跳过时,它的下游也将被跳过。为了避免这种情况,我们需要将 trigger_rule='none_failed'
传递给任务的操作员。这意味着如果 none 的上游任务失败(它们要么成功要么被跳过),任务应该 运行。