Dummy Operator 失败,状态为:upstream_failed
Dummy Operator fails with state: upstream_failed
我有一个创建 EMR 集群的 dag,如下所示:
STEP_CONFIG = dict(
CLASS="fooA",
JAR=DEFAULT_JAR,
PROPERTIES="fooA.properties",
ACTION_ON_FAILURE="CONTINUE"
)
DEFAULT_ARGS['start_date'] = dt.datetime(2019, 12, 18)
with DAG(
DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=dt.timedelta(hours=2),
schedule_interval='@daily',
catchup=False
) as dag:
compose_dag_for_ltc(LONG_TERM_CLUSTER_NAME, JOB_NAME, STEP_CONFIG)
add_step >> step_added >> watch_step >> all_steps_finished
dag 成功创建了 EMR 集群并添加了所有步骤,但有时 all_steps_finished 任务会失败,状态为:upstream_failed。故障不在 EMR 侧,而是在气流侧。它还不会生成任何日志,这使得诊断变得更加困难。知道这里会发生什么吗?
由于您 运行 使用的是旧 Airflow 版本,这可能是一个错误,稍后已修复。没有充分的理由来解释您的经历。
在 Airflow 2 DummyOperator
中,任何继承自它的运算符都不会被调度程序考虑执行(如果它没有实际工作要做)请参阅 source code 在这种情况下该任务将简单地标记为成功。所以我可以说的是,当您升级到 Airflow 2 时,问题很可能会得到解决。
我有一个创建 EMR 集群的 dag,如下所示:
STEP_CONFIG = dict(
CLASS="fooA",
JAR=DEFAULT_JAR,
PROPERTIES="fooA.properties",
ACTION_ON_FAILURE="CONTINUE"
)
DEFAULT_ARGS['start_date'] = dt.datetime(2019, 12, 18)
with DAG(
DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=dt.timedelta(hours=2),
schedule_interval='@daily',
catchup=False
) as dag:
compose_dag_for_ltc(LONG_TERM_CLUSTER_NAME, JOB_NAME, STEP_CONFIG)
add_step >> step_added >> watch_step >> all_steps_finished
dag 成功创建了 EMR 集群并添加了所有步骤,但有时 all_steps_finished 任务会失败,状态为:upstream_failed。故障不在 EMR 侧,而是在气流侧。它还不会生成任何日志,这使得诊断变得更加困难。知道这里会发生什么吗?
由于您 运行 使用的是旧 Airflow 版本,这可能是一个错误,稍后已修复。没有充分的理由来解释您的经历。
在 Airflow 2 DummyOperator
中,任何继承自它的运算符都不会被调度程序考虑执行(如果它没有实际工作要做)请参阅 source code 在这种情况下该任务将简单地标记为成功。所以我可以说的是,当您升级到 Airflow 2 时,问题很可能会得到解决。