Apache Airflow giving broken DAG error cannot import __builtin__ for speedtest.py
This is a strange error I have run into. In my Python 3.7 environment I installed Airflow 2, speedtest-cli, and a few other packages with pip, and I keep seeing this error pop up in the Airflow UI:
Broken DAG: [/env/app/airflow/dags/my_dag.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 156, in <module>
import __builtin__
ModuleNotFoundError: No module named '__builtin__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 179, in <module>
_py3_utf8_stdout = _Py3Utf8Output(sys.stdout)
File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 166, in __init__
buf = FileIO(f.fileno(), 'w')
AttributeError: 'StreamLogWriter' object has no attribute 'fileno'
As a sanity check I ran the following and found nothing wrong:
~# python airflow/dags/my_dag.py
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required
~# airflow dags list
dag_id | filepath | owner | paused
===========+===============+=========+=======
my_dag | my_dag.py | rafay | False
~# airflow tasks list my_dag
[2021-03-08 16:46:26,950] {dagbag.py:448} INFO - Filling up the DagBag from /env/app/airflow/dags
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required
Start_backup
get_configs
get_targets
push_targets
So nothing unusual there, and testing each task individually does not raise any problems either. Running the speedtest-cli script on its own, outside of Airflow, also does not throw any errors. The script looks like this:
import speedtest
from airflow.exceptions import AirflowException


def get_upload_speed():
    """
    Calculates the upload speed of the internet connection using the speedtest API.

    Returns:
        Upload speed in Mbps.
    """
    try:
        s = speedtest.Speedtest()
        upload = s.upload()
    except speedtest.SpeedtestException as e:
        raise AirflowException("Failed to check network bandwidth, make sure internet is available.\nException: {}".format(e))
    return round(upload / (1024 ** 2), 2)
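For completeness, a standalone check along these lines (a hypothetical invocation, not part of the DAG) completes without errors:

# Hypothetical standalone check, run outside Airflow, to confirm the
# function itself works; prints the measured upload speed in Mbps.
if __name__ == "__main__":
    print("Upload speed: {} Mbps".format(get_upload_speed()))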
I even went to the exact line of speedtest.py mentioned in the Broken DAG error, line 156, and it looks fine; it also runs fine when I type it into a Python interpreter.
try:
    import __builtin__
except ImportError:
    import builtins
    from io import TextIOWrapper, FileIO
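That is, a plain interactive session along these lines (reconstructed, Python 3.7) runs without raising anything:

>>> try:
...     import __builtin__        # fails on Python 3 with ModuleNotFoundError...
... except ImportError:
...     import builtins           # ...which is caught here, so builtins is used instead
...
>>> from io import TextIOWrapper, FileIO
>>>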
So, how do I go about diagnosing this? It seems like some kind of package import problem.
Edit: In case it helps, here is my directory structure and the imports in my_dag.py
- airflow
  - dags
    - tasks
      - get_configs.py
      - get_targets.py
      - push_targets.py (speedtest is imported here)
    - my_dag.py
The task imports in the DAG file are ordered as follows:
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from tasks.get_configs import get_configs
from tasks.get_targets import get_targets
from tasks.push_targets import push_targets
...
The Airflow StreamLogWriter used inside Airflow tasks (and other log-related facilities) does not implement the fileno method that "standard" Python I/O clients expect (this is confirmed by a TODO comment in the Airflow source). The same problem also shows up when enabling the faulthandler standard library.
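The failure can be reproduced outside Airflow with any stdout replacement that has no fileno; this is a minimal sketch, assuming a speedtest-cli version (such as 2.1.2) that still builds the module-level _Py3Utf8Output wrapper seen in the traceback:

# Minimal reproduction sketch: swap sys.stdout/sys.stderr for write-only
# objects (roughly what Airflow's StreamLogWriter is) and then import speedtest.
import sys

class FakeStreamLogWriter:
    """Stands in for StreamLogWriter: write/flush, but no fileno()."""
    def write(self, message):
        pass
    def flush(self):
        pass

sys.stdout = FakeStreamLogWriter()
sys.stderr = FakeStreamLogWriter()

import speedtest  # AttributeError: 'FakeStreamLogWriter' object has no attribute 'fileno'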
So what can you do at this point? Aside from opening an issue or sending a PR to Airflow, this really is case by case. In the speedtest-cli case, you would probably need to isolate the function that calls fileno and try to "replace" it (for example: fork the library; change the function if it can be isolated and injected; or maybe pick a configuration that avoids that part of the code).
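As one illustration of the "isolate and inject" idea, a sketch like the following defers the speedtest import into the task and temporarily points sys.stdout/sys.stderr back at the interpreter's original streams while speedtest's module-level fileno() call runs. This assumes sys.__stdout__/sys.__stderr__ are real file objects in the worker process; it is only a sketch, not what I ended up doing:

import sys
import contextlib

@contextlib.contextmanager
def real_std_streams():
    # Temporarily restore the interpreter's original stdout/stderr so that
    # speedtest's module-level FileIO(f.fileno(), 'w') sees real file objects.
    saved_out, saved_err = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
    try:
        yield
    finally:
        sys.stdout, sys.stderr = saved_out, saved_err

def get_upload_speed():
    with real_std_streams():
        import speedtest  # deferred import: happens at task run time, not DAG parse time
    s = speedtest.Speedtest()
    return round(s.upload() / (1024 ** 2), 2)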
In my particular case there was no way to work around the code, so forking was the most straightforward approach.