Apache Airflow DAG 无法导入本地模块

Apache Airflow DAG cannot import local module

我似乎不明白如何将模块导入 apache airflow DAG 定义文件。例如,我想这样做是为了能够创建一个库,该库使声明具有类似设置的任务不那么冗长。

这是我能想到的最简单的例子,它重现了这个问题:我修改了气流教程 (https://airflow.apache.org/tutorial.html#recap) 以简单地导入一个模块和 运行 来自该模块的定义。像这样:

目录结构:

- dags/
-- __init__.py
-- lib.py
-- tutorial.py

tutorial.py:

"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Here is my added import
from lib import print_double

# And my usage of the imported def
print_double(2)

## -- snip, because this is just the tutorial code, 
## i.e., some standard DAG defintion stuff --

print_double 只是一个简单的 def,它将你给它的任何输入乘以 2,然后打印结果,但显然这并不重要,因为这是一个导入问题。

我能够 运行 airflow test tutorial print_date 2015-06-01 按照教程文档成功 - dag 运行s,而且 print_double 成功。 4 按预期打印到控制台。一切都很好。

然后我上网 UI,受到 Broken DAG: [/home/airflow/airflow/dags/tutorial.py] No module named 'lib' 的欢迎。取消暂停 dag 并尝试使用 UI 手动 运行 会导致 "running" 状态,但它永远不会成功或失败。它永远坐在 "running" 上。我想排多少排就排多少,但他们都只会坐在 "running" 状态。

我检查了气流日志,没有看到任何有用的调试信息。

那我错过了什么?

您使用的是 Airflow 1.9.0 吗?这可能会在那里修复。

问题是由 Airflow 加载 DAG 的方式引起的:它不只是将它们作为普通 python 模块导入,因为它希望能够在不重新启动进程的情况下重新加载它。因此 . 不在 python 搜索路径中。

如果 1.9.0 没有解决这个问题,最简单的更改是在启动脚本中添加 export PYTHONPATH=/home/airflow/airflow/:$PYTHONPATH。其确切格式将取决于您使用的是什么(systemd 与 init 脚本等)

再次添加 sys 路径对我有用,

import sys
sys.path.insert(0,os.path.abspath(os.path.dirname(__file__)))

如果您正在使用 git-sync 并且没有在 kubernetes 中将 at 用作 initContainer(仅作为容器或根本不使用),那么模块可能没有加载到网络服务器中或调度程序。

只需将您的本地模块放在 airflow 插件文件夹中,它就会开始工作。 要知道气流插件的位置,请使用命令: 气流信息