如何 运行 一个简单的 Airflow DAG

How to Run a Simple Airflow DAG

我对 Airflow 完全陌生。我想在指定日期 运行 一个简单的 DAG。我正在努力区分开始日期、执行日期和回填日期。 运行 DAG 的命令是什么?

这是我从那以后尝试过的方法:

airflow run dag_1 task_1 2017-1-23

我第一次 运行 该命令时,任务执行正确,但当我再次尝试时它没有工作。

这是我的另一个命令 运行:

airflow backfill dag_1 -s 2017-1-23 -e 2017-1-24

我不知道该命令会带来什么。 DAG 会在每天 23 点到 24 点执行吗?

在运行执行上面的两个命令之前,我这样做了:

airflow initdb
airflow scheduler 
airflow webserver -p 8085 --debug &

这是我的 DAG

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_1', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='create_clients',
    bash_command='Rscript /scripts/Cli.r',
    dag=dag)

t2 = BashOperator(
    task_id='create_operation',
    bash_command='Rscript Operation.r',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

截图:Tree View

更新

airflow run dag_1 task_1 2017-1-23T10:34

如果你 运行 它曾经与

airflow run dag_1 task_1 2017-1-23

运行 已保存,再次 运行 将不会执行任何操作,您可以尝试通过强制 re-run

airflow run --force=true dag_1 task_1 2017-1-23

气流回填命令将 运行 在从开始日期到结束日期指定的时间段内 运行 的任何执行。这将取决于你在 DAG 上设置的时间表,如果你将它设置为每小时触发它应该 运行 24 次,但它也不会 re-execute 之前执行 运行s.

您可以像从不清除任务一样清除该任务运行

airflow clear dag_1 -s 2017-1-23 -e 2017-1-24

另请查看此处的 cli 文档:https://airflow.incubator.apache.org/cli.html

difference between the start date ,the execution date and backfilling

回填 运行 DAG 明确地 test/manually 运行 DAG/re 运行 error-ed 出的 DAG。您使用 CLI

执行此操作
airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately

start_date,顾名思义,就是DAG定义生效的日期

execution_date当运行时就是date-time。这是您在测试 DAG 的各个任务时提供的,如下所示

airflow test <<dag>> <<task>> <<exec_date>>

what is the command to run the dag

Backfill 是 运行 DAG 的明确命令。否则,您只需将 DAG 放入 DAGBAG 文件夹中,调度程序将 运行 它按照 DAG 定义中定义的时间表

airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately