如何在两个不同的 schedule_intervals 中将两个 DAG 安排到 运行,但在第一个完成后,第二个仅 运行

How to schedule two DAGs to run in two different schedule_intervals but the second only run after the first has finished

我有两个不同的 DAG,需要以不同的频率 运行。一个,即 dag1 需要每周 运行,而 dag2 需要每天 运行。现在 dag2 应该只 运行 当 dag1 完成时,在每次出现 dag1 运行s.

我在两个不同的 python 模块中定义了两个 DAG,如下所示。

dag1.py

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag1',
     default_args={
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='00 10 * * 1',
     catchup=True
    ) as dag:

CRAWL_PARAMS = BashOperator(
    task_id='crawl_params',
    bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)

dag2.py

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
     default_args = {
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='5 10 * * *',
     catchup=True
    ) as dag:

CRAWL_DATASET = BashOperator(
    task_id='crawl_dataset',
    bash_command='''
        cd {}/scraper && scrapy crawl crawl_dataset
    '''.format(PROJECT_PATH)
)

目前我手动设置了两个dag之间的间隔为5分钟。此设置当前无法正常工作,也缺少根据需要使 dag2 依赖于 dag1 的功能。

我已经检查了答案 and 但没弄明白。

注意:schedule_intervals 仅供参考。目的是 运行 dag1 每个星期一的固定时间和 运行 dag2 每天的固定时间和星期一,它应该只在 dag1 完成后. 这里每个 dag 也有多个任务。

  1. 最简单的解决方案是用一个 ExternalTaskSensor 开始你的第二个 DAG,等待你的第一个 DAG
  2. 的最后一个任务完成
  3. 或者,您也可以使用 TriggerDagRunOperator 在第一个 dag 的末尾触发第二个 dag。但是,在这种情况下,您将无法将 schedule_interval 分配给第二个 dag(因为它将 'forcefully' 由第一个 dag 触发)

您可以将这两个任务写在同一个 DAG 中,并有一个下游来设置任务依赖关系

task1.set_downstream(task2)

至于不同的任务时间表依赖,创建具有每日时间表的DAG。对于每周计划的任务,编写一个 shortCircuitOperator 以启用每周触发器:

# Set trigger for first day of the week
def check_trigger_week(execution_date, **kwargs):
    return execution_date.weekday() == 0

# Task should check for the trigger to see if its first day of the week
check_trigger_weekly = ShortCircuitOperator(
  task_id='check_trigger_weekly',
  python_callable=check_trigger_week,
  provide_context=True,
  dag=dag
)

然后让你的每周任务依赖于这个每周触发器

check_trigger_weekly.set_downstream(task)

在理解流程方面进行了很多努力之后,我终于自己想出了答案(不确定它有多优化,但目前对我有用)。感谢 answer and branching docs。 这是我使用 BranchPythonOperator.

的解决方案

dag1.py

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': dt.datetime(2019, 8, 20),
    'concurrency': 1,
    'retries': 0
}

def branch_tasks(execution_date, **kwargs):
    '''
    Branch the tasks based on weekday.
    '''
    # check if the execution day is 'Saturday'
    if execution_date.weekday() == 5:
        return ['crawl_params', 'crawl_dataset']

    return 'crawl_dataset'

with DAG('dag1',
         default_args=DEFAULT_ARGS,
         schedule_interval='00 10 * * *',
         catchup=False
        ) as dag:

    CRAWL_PARAMS = BashOperator(
        task_id='crawl_params',
        bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
    )

    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
        trigger_rule='none_failed'
    )

BRANCH_OP = BranchPythonOperator(
    task_id='branch_tasks',
    provide_context=True,
    python_callable=branch_tasks,
    dag=dag
)

BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
CRAWL_PARAMS.set_downstream(CRAWL_DATASET)

在这里,BranchPythonOperator 使用 branch_tasks 函数根据星期几选择要 运行 的任务。
这里的另一个问题是当 crawl_params 做 运行 当条件为真时,下游也将 运行 但当它被跳过时,它的下游也将被跳过。为了避免这种情况,我们需要将 trigger_rule='none_failed' 传递给任务的操作员。这意味着如果 none 的上游任务失败(它们要么成功要么被跳过),任务应该 运行。