Why does an Airflow PythonOperator task fail but exit with return code 0?
I have an Airflow DAG that runs a PythonOperator task, and I would like to know why the task's work fails yet the task exits with return code 0. The failure combined with the zero return code misled me into thinking the task had executed successfully.
You can see the task log below (and the attached screenshots). Can anyone explain why this happens and suggest how to avoid it?
Task instance log:
[2019-11-15 22:45:23,633] {base_task_runner.py:115} INFO - Job 736: Subtask http_request_send_push 2019-11-15 22:45:23,632 - 10688 - ERROR - 74 - http_request_send_push:http_request_send_push service trigger-resend-push error::
[2019-11-15 22:45:23,633] {logging_mixin.py:112} INFO -
[2019-11-15 22:45:23,632] {notification.py:74} ERROR - http_request_send_push:http_request_send_push service trigger-resend-push error::
[2019-11-15 22:45:23,633] {python_operator.py:114} INFO - Done. Returned value was: None
[2019-11-15 22:45:25,251] {logging_mixin.py:112} INFO -
[2019-11-15 22:45:25,250] {local_task_job.py:103} INFO - Task exited with return code 0
Task instance log screenshot:
DAG tree view screenshot:
Simply put, PythonOperator is just an operator that executes a Python function. The "return code 0" in the log is the exit code of the process that ran the task, not an indicator that the work itself succeeded; the task's state depends on whether the callable raises. If something goes wrong and you want the task to end up in the failed state, you have to raise an exception inside the Python callable. In your code the except blocks catch the error and only print it, so the callable returns normally and the task is marked as a success. See fourth_task in the sample code below.

Another approach is the ShortCircuitOperator. Here is the description from the Apache Airflow API reference guide:

It evaluates a condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with a state of “skipped”. If the condition is True, downstream tasks proceed as normal.

The sample code below illustrates the difference between PythonOperator and ShortCircuitOperator, and also shows how raising an exception moves a task into the failed state.
# Imports for the sample; `dag` is assumed to be a DAG object like the one
# defined in the question's code further down.
import logging

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator


def first_task(**kwargs):
    logging.info("first_task")   # returns None -> task succeeds


def second_task(**kwargs):
    logging.info("second_task")
    return True                  # the return value does not affect the task state


def third_task(**kwargs):
    logging.info("third_task")
    return False                 # no exception raised, so the task still succeeds


def fourth_task(**kwargs):
    logging.info("fourth_task")
    raise Exception()            # raising is what puts the task into the failed state


def fifth_task(**kwargs):
    logging.info("fifth_task")
    return True                  # ShortCircuitOperator: truthy -> downstream tasks run


def sixth_task(**kwargs):
    logging.info("sixth_task")
    return False                 # ShortCircuitOperator: falsy -> downstream tasks skipped


first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=first_task,
    dag=dag)
first_task_successor = DummyOperator(task_id='first_task_successor', dag=dag)
first_task_successor.set_upstream(first_task)

second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=second_task,
    dag=dag)
second_task_successor = DummyOperator(task_id='second_task_successor', dag=dag)
second_task_successor.set_upstream(second_task)

third_task = PythonOperator(
    task_id='third_task',
    provide_context=True,
    python_callable=third_task,
    dag=dag)
third_task_successor = DummyOperator(task_id='third_task_successor', dag=dag)
third_task_successor.set_upstream(third_task)

fourth_task = PythonOperator(
    task_id='fourth_task',
    provide_context=True,
    python_callable=fourth_task,
    dag=dag)
fourth_task_successor = DummyOperator(task_id='fourth_task_successor', dag=dag)
fourth_task_successor.set_upstream(fourth_task)

fifth_task = ShortCircuitOperator(
    task_id='fifth_task',
    provide_context=True,
    python_callable=fifth_task,
    dag=dag)
fifth_task_successor = DummyOperator(task_id='fifth_task_successor', dag=dag)
fifth_task_successor.set_upstream(fifth_task)

sixth_task = ShortCircuitOperator(
    task_id='sixth_task',
    provide_context=True,
    python_callable=sixth_task,
    dag=dag)
sixth_task_successor = DummyOperator(task_id='sixth_task_successor', dag=dag)
sixth_task_successor.set_upstream(sixth_task)
Screenshot:
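To tie this back to the question: the http_request_send_push callable (posted in full below) catches every exception and only prints it, so nothing ever reaches Airflow. A minimal sketch of the fix, assuming you simply want the task to fail when the HTTP call fails, is to log and then re-raise. The endpoint is copied from the question's code; the log messages here are made up for the sketch:

import logging
import urllib.request


def http_request_send_push(ds, **kwargs):
    endpoint = 'http://10.19.54.110:8080/v1/trigger-scheduled-push'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception:
        # Log the details, then re-raise so Airflow marks the task instance as failed
        # instead of letting the process exit cleanly with return code 0.
        logging.exception('trigger-scheduled-push request failed')
        raise
    logging.info('trigger-scheduled-push response: %s', response.read())

The same pattern applies to the second endpoint in that callable and to the sms/email callables: anything that should count as a failure has to leave the callable as an exception.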
@kaxil the code is below:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import inspect
import urllib.request
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['test@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

dag = DAG(
    dag_id='airflow_so',
    catchup=False,
    default_args=args,
    dagrun_timeout=timedelta(minutes=5),
    schedule_interval=timedelta(seconds=10)
)

def http_request_send_push(ds, **kwargs):
    endpoint = 'http://10.19.54.110:8080/v1/trigger-scheduled-push'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(e),
              e)
    else:
        req = response.read()
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(req),
              req)

    endpoint = 'http://10.19.54.110:8080/v1/trigger-scheduled-repush'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(e),
              e)
    else:
        req = response.read()
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(req),
              req)

http_request_send_push = PythonOperator(
    task_id='http_request_send_push',
    provide_context=True,
    python_callable=http_request_send_push,
    dag=dag
)

def http_request_send_sms(ds, **kwargs):
    endpoint = 'http://10.19.54.134:8080/v1/scheduleSendSms'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(e),
              e)
    else:
        req = response.read()
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(req),
              req)

    endpoint = 'http://10.19.54.134:8080/v1/scheduleReSendSms'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(e),
              e)
    else:
        req = response.read()
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(req),
              req)

http_request_send_sms = PythonOperator(
    task_id='http_request_send_sms',
    provide_context=True,
    python_callable=http_request_send_sms,
    dag=dag
)

def http_request_send_email(ds, **kwargs):
    endpoint = 'http://10.19.54.138:8080/v1/scheduleSendEmail'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(e),
              e)
    else:
        req = response.read()
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(req),
              req)

    endpoint = 'http://10.19.54.138:8080/v1/scheduleReSendEmail'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(e),
              e)
    else:
        req = response.read()
        print('%s:%s:%s',
              inspect.stack()[0][3],
              type(req),
              req)

http_request_send_email = PythonOperator(
    task_id='http_request_send_email',
    provide_context=True,
    python_callable=http_request_send_email,
    dag=dag
)

http_request_send_push >> http_request_send_sms >> http_request_send_email

if __name__ == "__main__":
    dag.cli()
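As a quick check (a suggestion, not from the original thread), you can run a single task instance from the Airflow CLI, e.g. "airflow test airflow_so http_request_send_push 2019-11-15", and watch what happens when the endpoint is unreachable. With the callables as posted, the exception is only printed and the run still finishes cleanly with return code 0; with the re-raise in place (see the sketch above), the traceback surfaces immediately, and in normal scheduled runs the task instance ends up in the failed state, so the email_on_failure=True setting in default_args can actually send an alert.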