遇到 None/Falsy 值时不会抛出气流异常
Airflow Exception not being thrown when encountering None/Falsy values
我正在尝试在一个 PythonOperator _etl_lasic
和另一个 PythonOperator _download_s3_data
之间传递数据,这工作正常但我想在传递的值为 None
时抛出异常这应该将任务标记为失败。
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowFailException
def _etl_lasic(**context):
path_s3 = None
context["task_instance"].xcom_push(
key="path_s3",
value=path_s3,
)
def _download_s3_data(templates_dict, **context):
path_s3 = templates_dict["path_s3"]
if not path_s3:
raise AirflowFailException("Path to S3 was not passed!")
else:
print(f"Path to S3: {path_s3}")
with DAG(
dag_id="02_lasic_retraining_without_etl",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@once",
) as dag:
etl_lasic = PythonOperator(
task_id="etl_lasic",
python_callable=_etl_lasic,
)
download_s3_data = PythonOperator(
task_id="download_s3_data",
python_callable=_download_s3_data,
templates_dict={
"path_s3": "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
},
)
etl_lasic >> download_s3_data
日志:
[2021-08-17 04:04:41,128] {logging_mixin.py:103} INFO - Path to S3: None
[2021-08-17 04:04:41,128] {python.py:118} INFO - Done. Returned value was: None
[2021-08-17 04:04:41,143] {taskinstance.py:1135} INFO - Marking task as SUCCESS. dag_id=02_lasic_retraining_without_etl, task_id=download_s3_data, execution_date=20210817T040439, start_date=20210817T040440, end_date=20210817T040441
[2021-08-17 04:04:41,189] {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-17 04:04:41,212] {local_task_job.py:118} INFO - Task exited with return code 0
Jinja 模板值默认呈现为字符串。在您的情况下,即使您推送 None
的 XCom 值,当通过 "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
拉取该值时,该值实际上 呈现为“None”,根据当前逻辑不会抛出异常。
有两个选项可以解决这个问题:
- 不要在“_etl_lasic”函数中将
path_s3
设置为 None
,而是将其设置为空字符串。
- 如果您使用的是 Airflow 2.1+,则有一个参数
render_template_as_native_obj
,可以在 DAG 级别设置,它将 Jinja 模板化的值呈现为本机 Python 类型(列表,字典等)。将该参数设置为 True
将在不更改函数中设置 path_s3
的方式的情况下实现这一目的。记录了一个概念性示例 here。
我正在尝试在一个 PythonOperator _etl_lasic
和另一个 PythonOperator _download_s3_data
之间传递数据,这工作正常但我想在传递的值为 None
时抛出异常这应该将任务标记为失败。
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowFailException
def _etl_lasic(**context):
path_s3 = None
context["task_instance"].xcom_push(
key="path_s3",
value=path_s3,
)
def _download_s3_data(templates_dict, **context):
path_s3 = templates_dict["path_s3"]
if not path_s3:
raise AirflowFailException("Path to S3 was not passed!")
else:
print(f"Path to S3: {path_s3}")
with DAG(
dag_id="02_lasic_retraining_without_etl",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@once",
) as dag:
etl_lasic = PythonOperator(
task_id="etl_lasic",
python_callable=_etl_lasic,
)
download_s3_data = PythonOperator(
task_id="download_s3_data",
python_callable=_download_s3_data,
templates_dict={
"path_s3": "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
},
)
etl_lasic >> download_s3_data
日志:
[2021-08-17 04:04:41,128] {logging_mixin.py:103} INFO - Path to S3: None
[2021-08-17 04:04:41,128] {python.py:118} INFO - Done. Returned value was: None
[2021-08-17 04:04:41,143] {taskinstance.py:1135} INFO - Marking task as SUCCESS. dag_id=02_lasic_retraining_without_etl, task_id=download_s3_data, execution_date=20210817T040439, start_date=20210817T040440, end_date=20210817T040441
[2021-08-17 04:04:41,189] {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-08-17 04:04:41,212] {local_task_job.py:118} INFO - Task exited with return code 0
Jinja 模板值默认呈现为字符串。在您的情况下,即使您推送 None
的 XCom 值,当通过 "{{task_instance.xcom_pull(task_ids='etl_lasic',key='path_s3')}}"
拉取该值时,该值实际上 呈现为“None”,根据当前逻辑不会抛出异常。
有两个选项可以解决这个问题:
- 不要在“_etl_lasic”函数中将
path_s3
设置为None
,而是将其设置为空字符串。 - 如果您使用的是 Airflow 2.1+,则有一个参数
render_template_as_native_obj
,可以在 DAG 级别设置,它将 Jinja 模板化的值呈现为本机 Python 类型(列表,字典等)。将该参数设置为True
将在不更改函数中设置path_s3
的方式的情况下实现这一目的。记录了一个概念性示例 here。