Is it possible to have a pipeline in Airflow that does not tie to any schedule?

I need a pipeline that can be executed manually or programmatically — can Airflow do that? It looks like every workflow currently has to be tied to a schedule.

Just set schedule_interval to None when you create the DAG:

dag = DAG('workflow_name',
          template_searchpath='path',
          schedule_interval=None,
          default_args=default_args)

From the Airflow Manual:

Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument, and receives preferably a cron expression as a str, or a datetime.timedelta object.

The manual goes on to list a number of cron 'presets', one of which is None.
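For reference, the named presets correspond to ordinary cron expressions; a quick sketch of the mapping as documented in the manual (None and '@once' are special cases with no cron equivalent — no schedule, and run once, respectively):

```python
# Cron presets as documented in the Airflow manual. None ("don't schedule")
# and '@once' ("schedule once and only once") have no cron equivalent.
CRON_PRESETS = {
    '@hourly': '0 * * * *',   # once an hour, at the beginning of the hour
    '@daily': '0 0 * * *',    # once a day at midnight
    '@weekly': '0 0 * * 0',   # once a week, at midnight on Sunday
    '@monthly': '0 0 1 * *',  # at midnight on the first day of the month
    '@yearly': '0 0 1 1 *',   # at midnight on January 1st
}

for preset, cron in CRON_PRESETS.items():
    print('{} -> {}'.format(preset, cron))
```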

In Airflow, every DAG needs a start date and a schedule interval*, for example hourly:

from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    dag_id='my_dag',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2018, 5, 23),
)

(Without a schedule, how would it know when to run?)

As an alternative to a cron schedule, you can set the schedule to @once to have it run only once.

*One exception: you can omit the schedule for externally triggered DAGs, because Airflow will not schedule them on its own.

That said, if you omit the schedule, you need to trigger the DAG externally somehow. If you want to be able to call a DAG programmatically — for instance, as a result of a separate condition occurring in another DAG — you can do that with the TriggerDagRunOperator. You may also hear this idea referred to as an externally triggered DAG.

Here is a usage example from Airflow's example DAGs:

File 1 - example_trigger_controller_dag.py:

"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
  a. A python callable that decides whether or not to trigger the Target DAG
  b. An optional params dict passed to the python callable to help in
     evaluating whether or not to trigger the Target DAG
  c. The id (name) of the Target DAG
  d. The python callable can add contextual info to the DagRun created by
     way of adding a Pickleable payload (e.g. dictionary of primitives). This
     state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
          default_args={"owner": "airflow",
                        "start_date": datetime.utcnow()},
          schedule_interval='@once')


# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="example_trigger_target_dag",
                                python_callable=conditionally_trigger,
                                params={'condition_param': True,
                                        'message': 'Hello World'},
                                dag=dag)

File 2 - example_trigger_target_dag.py:

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime

import pprint
pp = pprint.PrettyPrinter(indent=4)

# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
#    (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
#
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
#   a. A python callable that decides whether or not to trigger the Target DAG
#   b. An optional params dict passed to the python callable to help in
#      evaluating whether or not to trigger the Target DAG
#   c. The id (name) of the Target DAG
#   d. The python callable can add contextual info to the DagRun created by
#      way of adding a Pickleable payload (e.g. dictionary of primitives). This
#      state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_trigger_target_dag',
    default_args=args,
    schedule_interval=None)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)


# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)
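To make the data flow in the two files above easier to follow, here is a stripped-down, Airflow-free sketch (class and function names are hypothetical stand-ins, not Airflow APIs): the controller's callable attaches a payload to the DagRun, and the target task later reads it back as dag_run.conf.

```python
class FakeDagRun:
    """Hypothetical stand-in for the DagRun object Airflow passes around."""
    def __init__(self):
        self.payload = None


def conditionally_trigger(params, dag_run_obj):
    # Mirrors the controller DAG: only trigger when condition_param is truthy,
    # and attach a picklable payload for the target DAG.
    if params['condition_param']:
        dag_run_obj.payload = {'message': params['message']}
        return dag_run_obj
    return None  # returning nothing means the target DAG is not triggered


run = conditionally_trigger(
    {'condition_param': True, 'message': 'Hello World'}, FakeDagRun())

# Inside the target DAG, the payload surfaces as dag_run.conf:
conf = run.payload
print("Remotely received value of {} for key=message".format(conf['message']))
```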

Yes, this can be achieved by passing None to schedule_interval in default_args.

See this documentation on DAG Runs.

For example:

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'schedule_interval': None,  # Check this line
}