Airflow examples without dates
I'm just getting started with Airflow. I'm trying to run a DAG without doing any scheduling.
I want to run the pipeline with command-line arguments and overwrite all current output. I have no start date, no schedule, no timing, and no retry logic; I just want a set of functions to run in order, starting now.
The documentation always includes dates:
airflow test tutorial print_date 2015-06-01
I want to run the DAG so that it executes all the functions and ignores any previous runs. How can I remove all dates and date logic from my DAG?
I have a modified version of the tutorial DAG file:
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
import os
import cPickle  # Python 2; use `pickle` on Python 3
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@once'
}

dag = DAG('tutorial_me', default_args=default_args)

def save_file(filenm):
    with open(filenm, 'wb') as pickle_file:
        cPickle.dump(['1', '2', 3], pickle_file)

def delete_file(filenm):
    print "************ THIS IS WHERE STDOUT GOES"
    if os.path.exists(filenm):
        os.remove(filenm)  # os.path.remove does not exist; os.remove does

# t1 and t2 are examples of tasks created by instantiating operators
t1 = PythonOperator(
    task_id='save_file',
    python_callable=save_file,
    op_kwargs=dict(filenm='__myparamfile__.txt'),
    dag=dag)

t2 = PythonOperator(
    task_id='remove_file',
    python_callable=delete_file,
    op_kwargs=dict(filenm='__myparamfile__.txt'),
    dag=dag)

t1.set_upstream(t2)
The first time, I ran it with:
airflow run tutorial_me remove_file 2015-01-04
It worked and printed the "************ THIS IS WHERE STDOUT GOES" line. The second time I ran it, it did not. The log file after the second run looks like this:
cat 2015-01-04T00\:00\:00
[2016-12-10 11:27:47,158] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags
[2016-12-10 11:27:47,214] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:47,214] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:47,227] {base_executor.py:36} INFO - Adding to queue: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py
[2016-12-10 11:27:47,234] {sequential_executor.py:26} INFO - Executing command: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py
[2016-12-10 11:27:48,050] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py
[2016-12-10 11:27:48,101] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:48,102] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:48,942] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py
[2016-12-10 11:27:48,998] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:48,998] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:49,020] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2016-12-10 11:27:49,046] {models.py:1219} INFO - Executing <Task(PythonOperator): remove_file> on 2015-01-04 00:00:00
[2016-12-10 11:27:49,054] {python_operator.py:67} INFO - Done. Returned value was: None
[2016-12-10 11:27:55,168] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags
[2016-12-10 11:27:55,219] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:55,220] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:55,231] {base_executor.py:36} INFO - Adding to queue: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py
[2016-12-10 11:27:55,236] {sequential_executor.py:26} INFO - Executing command: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py
[2016-12-10 11:27:56,030] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py
[2016-12-10 11:27:56,082] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:56,082] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:56,899] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py
[2016-12-10 11:27:56,950] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:56,951] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2016-12-10 11:27:56,967] {models.py:1150} INFO -
Airflow is designed to maintain a history of its DAG runs so that it can process batches of data in sequence and guarantee that each task runs exactly once per DagRun.
For what you are trying to do, the simplest approach is probably to ignore the scheduler and externally trigger a DagRun with an execution date of "now", including the full date and time. That way every run you invoke executes all tasks exactly once, and each run of a task is independent of any previous run. You will want depends_on_past = False, and you may also want max_active_runs set to a very large value, because any failed DagRuns will stay "active" and you don't want them to interfere with new invocations.
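The idea above can be sketched as a small helper. The `trigger_dag` subcommand and its `-e` flag exist in the Airflow 1.x CLI, but the helper itself and the DAG id are just this question's example:

```python
from datetime import datetime

def trigger_now(dag_id):
    """Build an Airflow 1.x CLI call that triggers a DagRun whose
    execution date is the current timestamp, so every call is unique."""
    execution_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
    # Run this list with subprocess.check_call(...) in practice.
    return ["airflow", "trigger_dag", dag_id, "-e", execution_date]

print(" ".join(trigger_now("tutorial_me")))
```

Because the execution date is different on every call, no DagRun collides with a previous one, which is exactly what makes each invocation independent.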
I think what you are asking for is similar to airflow issue #198:
"For cases where we need to only run the latest in a series of task instance runs and mark the others as skipped. For example, we may have job to execute a DB snapshot every day. If the DAG is paused for 5 days and then unpaused, we don’t want to run all 5, just the latest. With this feature, we will provide “cron” functionality for task scheduling that is not related to ETL"
This issue was solved by the LatestOnlyOperator feature. Its usage is described in the official documentation:
https://airflow.apache.org/concepts.html#latest-run-only
import datetime as dt

from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
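Stripped of Airflow internals, the operator's core check can be sketched like this. This is a simplified illustration, not the actual implementation; the real operator derives the window from the DAG's schedule:

```python
from datetime import datetime, timedelta

def is_latest(execution_date, interval, now=None):
    """A run counts as 'latest' when `now` falls inside its schedule
    window; backfilled runs from the past fail this check and get skipped."""
    if now is None:
        now = datetime.utcnow()
    return execution_date <= now < execution_date + interval

# A run backfilled for 2016 is not the latest, so downstream tasks skip.
print(is_latest(datetime(2016, 9, 20), timedelta(hours=4)))
```

This is why a paused-then-unpaused DAG only does real work for the most recent run: every older DagRun's window has already closed.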