Why does an Airflow PythonOperator task fail but report return code 0?

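I have an Airflow DAG that runs with PythonOperator. My task failed but still exited with return code 0. Why does that happen?

The execution failed, but the zero return code misled me into thinking the task had succeeded.

You can see the task log below (and the attached images). Can anyone explain why this happens and suggest how to avoid it?

Task instance log: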

[2019-11-15 22:45:23,633] {base_task_runner.py:115} INFO - Job 736: Subtask http_request_send_push 2019-11-15 22:45:23,632 - 10688 - ERROR - 74 - http_request_send_push:http_request_send_push service trigger-resend-push error::

[2019-11-15 22:45:23,633] {logging_mixin.py:112} INFO -

[2019-11-15 22:45:23,632] {notification.py:74} ERROR - http_request_send_push:http_request_send_push service trigger-resend-push error::

[2019-11-15 22:45:23,633] {python_operator.py:114} INFO - Done. Returned value was: None

[2019-11-15 22:45:25,251] {logging_mixin.py:112} INFO -

[2019-11-15 22:45:25,250] {local_task_job.py:103} INFO - Task exited with return code 0

[Screenshot: task instance log]

[Screenshot: DAG tree view]

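Put simply, PythonOperator is just an operator that runs a Python function. The task's state depends on whether that function raises.

In your code every exception is caught and only logged. The callable therefore returns normally (note `Done. Returned value was: None` in the log). Airflow marks the task as successful, and that is why you see return code 0.

If you want the task to go to the failed state when something goes wrong, raise an exception from the Python callable, or let the original exception propagate. See fourth_task in the example code below.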
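Here is a minimal sketch for callables like the ones in the question. It keeps logging each error but raises at the end, so the task instance is marked failed (and `email_on_failure` can fire). `call_endpoints` and its parameters are hypothetical names, not part of Airflow.

The function collects failures so that every endpoint is still attempted before the task fails. It accepts and ignores extra keyword arguments, in case Airflow passes context (for example with `provide_context=True`). You would wire it up roughly as `PythonOperator(task_id=..., python_callable=call_endpoints, op_kwargs={'endpoints': [...]}, dag=dag)`.

```python
import logging
import urllib.request

log = logging.getLogger(__name__)


def call_endpoints(endpoints, timeout=10, **context):
    """Call each endpoint, log any failure, then raise if at least one failed.

    Hypothetical helper: **context swallows Airflow context kwargs if they are passed.
    """
    failed = []
    for endpoint in endpoints:
        try:
            with urllib.request.urlopen(endpoint, timeout=timeout) as response:
                log.info("%s -> %r", endpoint, response.read())
        except Exception as e:  # URLError, HTTPError, timeouts, malformed URLs, ...
            log.error("%s failed: %s: %s", endpoint, type(e).__name__, e)
            failed.append(endpoint)
    if failed:
        # Raising is what makes Airflow mark the task instance as failed;
        # returning normally (even after logging errors) counts as success.
        raise RuntimeError("requests failed for: %s" % ", ".join(failed))
```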

Another option is ShortCircuitOperator. Here is its description from the Apache Airflow API reference guide:

It evaluates a condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with a state of “skipped”. If the condition is True, downstream tasks proceed as normal.
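As an illustration for the services in the question, a condition callable could check that an endpoint answers before the send tasks run. `endpoint_is_healthy` is a hypothetical name. It would be wired up roughly as `ShortCircuitOperator(task_id=..., python_callable=endpoint_is_healthy, op_kwargs={'url': ...}, dag=dag)`.

Keep in mind that a falsy result marks downstream tasks as skipped, not failed, so `email_on_failure` will not fire.

```python
import urllib.request


def endpoint_is_healthy(url, timeout=10, **context):
    """Hypothetical ShortCircuitOperator condition: True if `url` can be opened.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses, so reaching
    the return inside the `with` block means the request succeeded.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except Exception:
        # URLError, HTTPError, timeouts, malformed URLs, ...
        return False
```

The operator checks the truthiness of the return value. Returning `None` (for example, a forgotten `return`), `0` or an empty value short-circuits the same way `False` does.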

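See the example code below, which illustrates the difference between PythonOperator and ShortCircuitOperator. It also shows how to raise an exception so that a task ends up in the failed state.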

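import logging

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
from airflow.utils.dates import days_ago

# Minimal DAG for the demo (Airflow 1.10.x import paths, matching the question).
# The dag_id is a placeholder.
dag = DAG(
    dag_id='python_vs_short_circuit_demo',
    default_args={'owner': 'airflow', 'start_date': days_ago(1)},
    schedule_interval=None)


def first_task(**kwargs):
    # Returns None: the PythonOperator task succeeds.
    logging.info("first_task")


def second_task(**kwargs):
    # Returns True: succeeds; the return value is only pushed to XCom.
    logging.info("second_task")
    return True


def third_task(**kwargs):
    # Returns False: still succeeds, because PythonOperator ignores the value.
    logging.info("third_task")
    return False


def fourth_task(**kwargs):
    # Raises: the task goes to "failed" and its successor to "upstream_failed".
    logging.info("fourth_task")
    raise Exception("fourth_task failed on purpose")


def fifth_task(**kwargs):
    # Used with ShortCircuitOperator; truthy result: downstream tasks run normally.
    logging.info("fifth_task")
    return True


def sixth_task(**kwargs):
    # Used with ShortCircuitOperator; falsy result: downstream tasks are skipped.
    logging.info("sixth_task")
    return False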

first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=first_task,
    dag=dag)
first_task_successor = DummyOperator(task_id='first_task_successor', dag=dag)
first_task_successor.set_upstream(first_task)


second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=second_task,
    dag=dag)
second_task_successor = DummyOperator(task_id='second_task_successor', dag=dag)
second_task_successor.set_upstream(second_task)


third_task = PythonOperator(
    task_id='third_task',
    provide_context=True,
    python_callable=third_task,
    dag=dag)
third_task_successor = DummyOperator(task_id='third_task_successor', dag=dag)
third_task_successor.set_upstream(third_task)


fourth_task = PythonOperator(
    task_id='fourth_task',
    provide_context=True,
    python_callable=fourth_task,
    dag=dag)
fourth_task_successor = DummyOperator(task_id='fourth_task_successor', dag=dag)
fourth_task_successor.set_upstream(fourth_task)


fifth_task = ShortCircuitOperator(
    task_id='fifth_task',
    provide_context=True,
    python_callable=fifth_task,
    dag=dag)
fifth_task_successor = DummyOperator(task_id='fifth_task_successor', dag=dag)
fifth_task_successor.set_upstream(fifth_task)

sixth_task = ShortCircuitOperator(
    task_id='sixth_task',
    provide_context=True,
    python_callable=sixth_task,
    dag=dag)
sixth_task_successor = DummyOperator(task_id='sixth_task_successor', dag=dag)
sixth_task_successor.set_upstream(sixth_task)
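To make the expected outcome of the example explicit, here is a toy model in plain Python. It is not Airflow code and does not import Airflow. It shows how each operator turns its callable's outcome into task states, assuming one downstream successor and the default `all_success` trigger rule.

```python
def run_task(operator, callable_):
    """Toy model: return (task_state, successor_state) for one task and its successor."""
    try:
        result = callable_()
    except Exception:
        # Any uncaught exception fails the task; under all_success the
        # successor is marked upstream_failed and never runs.
        return "failed", "upstream_failed"
    if operator == "ShortCircuitOperator" and not result:
        # Falsy result: the task itself succeeds, downstream tasks are skipped.
        return "success", "skipped"
    # PythonOperator ignores the return value (apart from pushing it to XCom).
    return "success", "success"


def fourth():
    raise Exception("fourth_task failed on purpose")


cases = {
    "first_task": ("PythonOperator", lambda: None),
    "second_task": ("PythonOperator", lambda: True),
    "third_task": ("PythonOperator", lambda: False),
    "fourth_task": ("PythonOperator", fourth),
    "fifth_task": ("ShortCircuitOperator", lambda: True),
    "sixth_task": ("ShortCircuitOperator", lambda: False),
}

for task_id, (operator, fn) in cases.items():
    print(task_id, *run_task(operator, fn))
```

In this model only fourth_task fails and only sixth_task_successor is skipped. Note that third_task succeeds even though it returns False, because only ShortCircuitOperator looks at the return value.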

[Screenshot]

@kaxil Here is the code:

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import inspect
import urllib.request
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['test@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

dag = DAG(
    dag_id='airflow_so',
    catchup=False,
    default_args=args,
    dagrun_timeout=timedelta(minutes=5),
    schedule_interval=timedelta(seconds=10)
)

def http_request_send_push(ds, **kwargs):
    endpoint='http://10.19.54.110:8080/v1/trigger-scheduled-push'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        # The exception is caught and only printed, so the callable still
        # returns None and Airflow records the task as successful.
        print('%s:%s:%s' % (inspect.stack()[0][3], type(e), e))
    else:
        req = response.read()
        print('%s:%s:%s' % (inspect.stack()[0][3], type(req), req))

    endpoint='http://10.19.54.110:8080/v1/trigger-scheduled-repush'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s' % (inspect.stack()[0][3], type(e), e))
    else:
        req = response.read()
        print('%s:%s:%s' % (inspect.stack()[0][3], type(req), req))

http_request_send_push = PythonOperator(
    task_id='http_request_send_push',
    provide_context=True,
    python_callable=http_request_send_push,
    dag=dag
)


def http_request_send_sms(ds, **kwargs):
    endpoint='http://10.19.54.134:8080/v1/scheduleSendSms'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s' % (inspect.stack()[0][3], type(e), e))
    else:
        req = response.read()
        print('%s:%s:%s' % (inspect.stack()[0][3], type(req), req))

    endpoint='http://10.19.54.134:8080/v1/scheduleReSendSms'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s' % (inspect.stack()[0][3], type(e), e))
    else:
        req = response.read()
        print('%s:%s:%s' % (inspect.stack()[0][3], type(req), req))

http_request_send_sms = PythonOperator(
    task_id='http_request_send_sms',
    provide_context=True,
    python_callable=http_request_send_sms,
    dag=dag
)


def http_request_send_email(ds, **kwargs):
    endpoint='http://10.19.54.138:8080/v1/scheduleSendEmail'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s' % (inspect.stack()[0][3], type(e), e))
    else:
        req = response.read()
        print('%s:%s:%s' % (inspect.stack()[0][3], type(req), req))

    endpoint='http://10.19.54.138:8080/v1/scheduleReSendEmail'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s' % (inspect.stack()[0][3], type(e), e))
    else:
        req = response.read()
        print('%s:%s:%s' % (inspect.stack()[0][3], type(req), req))

http_request_send_email = PythonOperator(
    task_id='http_request_send_email',
    provide_context=True,
    python_callable=http_request_send_email,
    dag=dag
)

http_request_send_push >> http_request_send_sms >> http_request_send_email

if __name__ == "__main__":
    dag.cli()