Airflow DAG 运行 每秒而不是每分钟
Airflow DAG Running Every Second Rather Than Every Minute
我正在尝试将我的 DAG 安排到每分钟 运行,但它似乎是每秒 运行ning。根据我读过的所有内容,我只需要在我的 DAG 中包含 schedule_interval='*/1 * * * *', #..every 1 minute
就可以了,但它不起作用。这里我设置了一个简单的例子来测试它:
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 6, 4),
'schedule_interval': '*/1 * * * *', #..every 1 minute
'email': ['airflow@airflow.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
dag_id='airflow_slack_example',
start_date=datetime(2018, 6, 4),
max_active_runs=3,
schedule_interval='*/1 * * * *', #..every 1 minute
default_args=default_args,
)
test= BashOperator(
task_id='test',
bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
retries=1,
dag=dag)
更新:
在与@Taylor Edmiston 讨论他的解决方案后,我们意识到我需要添加 catchup=False
的原因是因为我使用 Pip 安装了 Airflow,它使用了过时版本的 Airflow。显然,如果您使用 master branch of it's repository 中的 Airflow,则无需像我尝试的那样包含 catchup=False
以使其每分钟 运行。因此,尽管接受的答案解决了我的问题,但它并没有解决@Taylor Edmiston 发现的潜在问题。
尝试在 DAG()
中添加 catchup=False
。可能是因为您声明的 start_date
,您的 DAG 正在尝试回填。
您在 DAG 上的 schedule_interval
是正确的:*/1 * * * *
是 every minute。
您还可以从 default_args
中删除 start_date
和 schedule_interval
,因为它们与提供给 DAG 的 kwargs 是多余的。
如果您更改了第一次创建此 DAG 时的计划,Airflow 可能会感到困惑。在数据库中尝试 ,然后重新启动调度程序和网络服务器。如果你在 Airflow 的 master 分支上,它就像 $ airflow delete_dag my_dag
一样简单;否则,链接的答案解释了如何在其他版本上执行此操作。
我将你的代码归结为这个来检查,当 运行 在 Airflow 的主分支中时,它肯定是每分钟 运行ning 一个 DAG 运行。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
}
dag = DAG(
dag_id='airflow_slack_example',
start_date=datetime(2018, 6, 4),
schedule_interval='*/1 * * * *',
default_args=default_args,
)
test = BashOperator(
task_id='test',
bash_command='echo "one minute test"',
dag=dag,
)
DAG 运行s:
我正在尝试将我的 DAG 安排到每分钟 运行,但它似乎是每秒 运行ning。根据我读过的所有内容,我只需要在我的 DAG 中包含 schedule_interval='*/1 * * * *', #..every 1 minute
就可以了,但它不起作用。这里我设置了一个简单的例子来测试它:
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 6, 4),
'schedule_interval': '*/1 * * * *', #..every 1 minute
'email': ['airflow@airflow.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
dag_id='airflow_slack_example',
start_date=datetime(2018, 6, 4),
max_active_runs=3,
schedule_interval='*/1 * * * *', #..every 1 minute
default_args=default_args,
)
test= BashOperator(
task_id='test',
bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
retries=1,
dag=dag)
更新:
在与@Taylor Edmiston 讨论他的解决方案后,我们意识到我需要添加 catchup=False
的原因是因为我使用 Pip 安装了 Airflow,它使用了过时版本的 Airflow。显然,如果您使用 master branch of it's repository 中的 Airflow,则无需像我尝试的那样包含 catchup=False
以使其每分钟 运行。因此,尽管接受的答案解决了我的问题,但它并没有解决@Taylor Edmiston 发现的潜在问题。
尝试在 DAG()
中添加 catchup=False
。可能是因为您声明的 start_date
,您的 DAG 正在尝试回填。
您在 DAG 上的 schedule_interval
是正确的:*/1 * * * *
是 every minute。
您还可以从 default_args
中删除 start_date
和 schedule_interval
,因为它们与提供给 DAG 的 kwargs 是多余的。
如果您更改了第一次创建此 DAG 时的计划,Airflow 可能会感到困惑。在数据库中尝试 $ airflow delete_dag my_dag
一样简单;否则,链接的答案解释了如何在其他版本上执行此操作。
我将你的代码归结为这个来检查,当 运行 在 Airflow 的主分支中时,它肯定是每分钟 运行ning 一个 DAG 运行。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
}
dag = DAG(
dag_id='airflow_slack_example',
start_date=datetime(2018, 6, 4),
schedule_interval='*/1 * * * *',
default_args=default_args,
)
test = BashOperator(
task_id='test',
bash_command='echo "one minute test"',
dag=dag,
)
DAG 运行s: