Google Cloud Composer DAG is not getting triggered
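I scheduled a DAG to run at 04:00 AM, Tuesday through Saturday, Eastern Time (New York), starting today, 2020/08/11. After writing and deploying the code, I expected the DAG to be triggered. I refreshed the Airflow UI page a few times, but it still has not triggered. I am using Airflow version v1.10.9-composer with Python 3.
This is my DAG code: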
"""
This DAG executes a retrieval job
"""
# Required packages to execute DAG
from __future__ import print_function
import pendulum
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
local_tz = pendulum.timezone("America/New_York")
# DAG parameters
default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': None,
    'retry_delay': timedelta(minutes=5)
}
# create DAG object with Name and default_args
with DAG(
    'retrieve_files',
    schedule_interval='0 4 * * 2-6',
    description='Retrieves files from sftp',
    max_active_runs=1,
    catchup=True,
    default_args=default_args
) as dag:
    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class
    start_dummy = DummyOperator(
        task_id='start',
        dag=dag
    )
    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )
    retrieve_file = SSHOperator(
        ssh_conn_id="my_conn",
        task_id='retrieve_file',
        command='/usr/bin/python3 /path_to_file/getFile.py',
        dag=dag)
dag.doc_md = __doc__
retrieve_file.doc_md = """\
#### Task Documentation
Connects to sftp and retrieves files.
"""
start_dummy >> retrieve_file >> end_dummy
The scheduler runs your job one schedule_interval AFTER the start date.
If your start_date is 2020-01-01 and schedule_interval is @daily, the
first run will be created on 2020-01-02 i.e., after your start date
has passed.
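To make that rule concrete for the schedule in the question, here is a minimal sketch in plain Python. It is not Airflow's scheduler code: `next_tick` and `first_run` are hypothetical helpers, the Tue-Sat 04:00 schedule is hard-coded, and naive datetimes are used, so time zones and daylight saving are ignored.

```python
from datetime import datetime, timedelta

# Python weekday() numbers for Tuesday..Saturday (Monday is 0).
# This mirrors the cron day-of-week range '2-6', where Sunday is 0.
RUN_WEEKDAYS = {1, 2, 3, 4, 5}


def next_tick(after, inclusive=False):
    """Return the next Tue-Sat 04:00 instant after `after` (or at it, if inclusive)."""
    candidate = after.replace(hour=4, minute=0, second=0, microsecond=0)
    if candidate < after or (candidate == after and not inclusive):
        candidate += timedelta(days=1)
    # Skip Sunday and Monday, which the schedule does not cover.
    while candidate.weekday() not in RUN_WEEKDAYS:
        candidate += timedelta(days=1)
    return candidate


def first_run(start_date):
    """Return (execution_date, trigger_time) for the first scheduled run.

    The run is stamped with the first schedule tick at or after start_date,
    but it is only created once the following tick has been reached.
    """
    execution_date = next_tick(start_date, inclusive=True)
    trigger_time = next_tick(execution_date)
    return execution_date, trigger_time
```

Under these assumptions, `first_run(datetime(2020, 8, 10, 4))` gives an execution date of Tuesday 2020-08-11 04:00 and a trigger time of Wednesday 2020-08-12 04:00, which would explain why nothing fires on 2020-08-11.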
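To run a DAG at a specific time every day (including today), start_date must be set to a time in the past, and schedule_interval must give the desired time in cron format. It is important to set that past datetime correctly (for a daily schedule, yesterday's date at the scheduled time); otherwise the DAG will not be triggered.
In this case, we should set start_date to the Tuesday of the previous week, i.e. (2020, 8, 4). That leaves a one-week gap since the start date, which matches the weekly cycle of the cron expression.
Let's look at the following example, which shows how to run a job at 04:00 AM, Tuesday through Saturday, Eastern Time: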
from datetime import datetime, timedelta
from airflow import models
import pendulum
from airflow.operators import bash_operator
local_tz = pendulum.timezone("America/New_York")
default_dag_args = {
    'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),
    'retries': 0,
}
with models.DAG(
        'Test',
        default_args=default_dag_args,
        schedule_interval='00 04 * * 2-6') as dag:
    # DAG code
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
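As a rough check of what this start_date yields, the sketch below lists the runs that would be due by a given time when catchup is enabled. It is a simplified model rather than Airflow's own logic: `ticks` and `runs_due` are hypothetical names, the Tue-Sat 04:00 schedule is hard-coded, and naive datetimes stand in for New York local time.

```python
from datetime import datetime, timedelta


def ticks(start, end):
    """Yield every Tue-Sat 04:00 instant in the closed range [start, end]."""
    day = start.replace(hour=4, minute=0, second=0, microsecond=0)
    while day <= end:
        # weekday(): Monday is 0, so Tuesday..Saturday is 1..5.
        if day >= start and 1 <= day.weekday() <= 5:
            yield day
        day += timedelta(days=1)


def runs_due(start_date, now):
    """Return (execution_date, trigger_time) pairs for runs due by `now`.

    Each run is stamped with the tick that opens its interval and becomes
    due at the tick that closes it, so N ticks produce N - 1 runs.
    """
    all_ticks = list(ticks(start_date, now))
    return list(zip(all_ticks, all_ticks[1:]))
```

With a start date of 2020-08-04 04:00 and `now` set to 2020-08-11 04:00, this model reports five runs, the last one stamped Saturday 2020-08-08 and due on Tuesday 2020-08-11 at 04:00. With the question's start date of 2020-08-10 04:00 it reports none.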
I recommend taking a look at the "What's the deal with start_date?" documentation.