Airflow 在预定 运行 期间跳过子标签的任务
Airflow skipping subdags' tasks during scheduled run
我有一个 DAG 运行从 Web 手动触发时运行良好 UI,但在计划的每日 运行 中,所有 subdag 在 60 秒后都标记为成功这些子标签中的任务将被跳过。
为什么计划中的任务会被跳过 运行?
MainDag:
with models.DAG(
"MainDag",
schedule_interval='@daily',
start_date=dates.days_ago(0),
user_defined_macros=TEMPLATE_ENV,
) as dag:
prepare_factory = SubDagOperator(
task_id="prepare_factory ",
trigger_rule="one_success",
subdag=subdag_prepare_factory.sub_dag(
dag.dag_id, "prepare_factory", dag.start_date, dag.schedule_interval
),
)
子代码:
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
with models.DAG(
"{0}.{1}".format(parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
user_defined_macros=TEMPLATE_ENV,
) as dag:
# HOOKS
hook = _sshHook.getSshHook()
# Tasks
step_1 = ssh_operator.SSHOperator(
task_id="step_1",
ssh_hook=hook,
command="script.sh"),
)
问题出在 start_date。使用 date.days_ago
会在午夜在主 dag 和子 dag 之间产生错误。
我现在使用 datetime
的静态日期
静态日期示例
with models.DAG(
"MainDag",
schedule_interval='30 0 * * *',
start_date=datetime(2021, 4, 7),
user_defined_macros=TEMPLATE_ENV,
) as dag:
我有一个 DAG 运行从 Web 手动触发时运行良好 UI,但在计划的每日 运行 中,所有 subdag 在 60 秒后都标记为成功这些子标签中的任务将被跳过。
为什么计划中的任务会被跳过 运行?
MainDag:
with models.DAG(
"MainDag",
schedule_interval='@daily',
start_date=dates.days_ago(0),
user_defined_macros=TEMPLATE_ENV,
) as dag:
prepare_factory = SubDagOperator(
task_id="prepare_factory ",
trigger_rule="one_success",
subdag=subdag_prepare_factory.sub_dag(
dag.dag_id, "prepare_factory", dag.start_date, dag.schedule_interval
),
)
子代码:
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
with models.DAG(
"{0}.{1}".format(parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
user_defined_macros=TEMPLATE_ENV,
) as dag:
# HOOKS
hook = _sshHook.getSshHook()
# Tasks
step_1 = ssh_operator.SSHOperator(
task_id="step_1",
ssh_hook=hook,
command="script.sh"),
)
问题出在 start_date。使用 date.days_ago
会在午夜在主 dag 和子 dag 之间产生错误。
我现在使用 datetime
静态日期示例
with models.DAG(
"MainDag",
schedule_interval='30 0 * * *',
start_date=datetime(2021, 4, 7),
user_defined_macros=TEMPLATE_ENV,
) as dag: