DagBag 没有按预期填充 dag
DagBag does not populate dags as expected
我想测试我的 dag 以确保它们具有某些默认参数,并确保所有 dag 都没有输入错误。
我正在使用 DagBag 来填充 dag,然后遍历每个 dag 并检查每个 dag 的值以确保它们是我想要的。
因为 DagBag 也可以获取气流附带的示例 dag,我正在传递参数 include_example = False
但是当我这样做时我意识到 none 我的 dags 被拉进了 dagbags。
我是不是用错了DagBag?或者在测试时是否有其他更好的方法来拉取和检查 dags?
我的代码
def test_no_import_errors():
dag_bag = DagBag(include_examples=False)
assert len(dag_bag.import_errors) == 0, "No Import Failures"
构建 DagBag 对象时,您可以传递 DagBag 应在其中查找 dag 文件的文件夹列表。我想这就是问题所在
我能够重现问题,在创建 DagBag
对象时,如果您不为 dag_folder
参数提供值,则不会将 DAG 添加到集合中。
正如 Jarek 所说,这有效:
def test_no_import_errors():
dag_bag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dag_bag.import_errors) == 0, "No Import Failures"
这是我用来测试的例子:
# python -m unittest test_dag_validation.py
import unittest
import logging
from airflow.models import DagBag
class TestDAGValidation(unittest.TestCase):
@classmethod
def setUpClass(cls):
log = logging.getLogger()
handler = logging.FileHandler("dag_validation.log", mode="w")
handler.setLevel(logging.INFO)
log.addHandler(handler)
cls.log = log
def test_no_import_errors(self):
dag_bag = DagBag(dag_folder="dags/", include_examples=False)
self.log.info(f"How Many DAGs?: {dag_bag.size()}")
self.log.info(f"Import errors: {len(dag_bag.import_errors)}")
assert len(dag_bag.import_errors) == 0, "No Import Failures"
默认情况下,airflow DagBag 在 AIRFLOW_HOME/dags 文件夹中查找 dag。
这通常存储在 airflow.cfg 文件中。
默认情况下它指向 ~/airflow 文件夹,但你可以通过 运行 -
指向当前工作目录
export $AIRFLOW_HOME=abs_path_of_your_folder
如果您使用 python 安装 Airflow,确保先导出 $AIRFLOW_HOME 变量,然后激活虚拟环境,最后安装 Airflow。这将确保您的路径正确附加到 airflow.cfg 文件。
您还可以检查您的文件夹是否正确加载,同时 运行 单元测试。在终端中,文件路径打印为
[2022-02-03 20:45:57,657] {dagbag.py:500} INFO - Filling up the DagBag from /Users/kehsihba19/Desktop/airflow-test/dags
用于检查 DAG 中导入错误的示例文件,其中包括检查拼写错误和循环任务检查 -
from airflow.models import DagBag
import unittest
class TestDags(unittest.TestCase):
def test_DagBag(self):
self.dag_bag = DagBag(include_examples=False)
self.assertFalse(bool(self.dag_bag.import_errors))
if __name__ == "__main__":
unittest.main()
我想测试我的 dag 以确保它们具有某些默认参数,并确保所有 dag 都没有输入错误。
我正在使用 DagBag 来填充 dag,然后遍历每个 dag 并检查每个 dag 的值以确保它们是我想要的。
因为 DagBag 也可以获取气流附带的示例 dag,我正在传递参数 include_example = False
但是当我这样做时我意识到 none 我的 dags 被拉进了 dagbags。
我是不是用错了DagBag?或者在测试时是否有其他更好的方法来拉取和检查 dags?
我的代码
def test_no_import_errors():
dag_bag = DagBag(include_examples=False)
assert len(dag_bag.import_errors) == 0, "No Import Failures"
构建 DagBag 对象时,您可以传递 DagBag 应在其中查找 dag 文件的文件夹列表。我想这就是问题所在
我能够重现问题,在创建 DagBag
对象时,如果您不为 dag_folder
参数提供值,则不会将 DAG 添加到集合中。
正如 Jarek 所说,这有效:
def test_no_import_errors():
dag_bag = DagBag(dag_folder="dags/", include_examples=False)
assert len(dag_bag.import_errors) == 0, "No Import Failures"
这是我用来测试的例子:
# python -m unittest test_dag_validation.py
import unittest
import logging
from airflow.models import DagBag
class TestDAGValidation(unittest.TestCase):
@classmethod
def setUpClass(cls):
log = logging.getLogger()
handler = logging.FileHandler("dag_validation.log", mode="w")
handler.setLevel(logging.INFO)
log.addHandler(handler)
cls.log = log
def test_no_import_errors(self):
dag_bag = DagBag(dag_folder="dags/", include_examples=False)
self.log.info(f"How Many DAGs?: {dag_bag.size()}")
self.log.info(f"Import errors: {len(dag_bag.import_errors)}")
assert len(dag_bag.import_errors) == 0, "No Import Failures"
默认情况下,airflow DagBag 在 AIRFLOW_HOME/dags 文件夹中查找 dag。
这通常存储在 airflow.cfg 文件中。 默认情况下它指向 ~/airflow 文件夹,但你可以通过 运行 -
指向当前工作目录export $AIRFLOW_HOME=abs_path_of_your_folder
如果您使用 python 安装 Airflow,确保先导出 $AIRFLOW_HOME 变量,然后激活虚拟环境,最后安装 Airflow。这将确保您的路径正确附加到 airflow.cfg 文件。
您还可以检查您的文件夹是否正确加载,同时 运行 单元测试。在终端中,文件路径打印为
[2022-02-03 20:45:57,657] {dagbag.py:500} INFO - Filling up the DagBag from /Users/kehsihba19/Desktop/airflow-test/dags
用于检查 DAG 中导入错误的示例文件,其中包括检查拼写错误和循环任务检查 -
from airflow.models import DagBag
import unittest
class TestDags(unittest.TestCase):
def test_DagBag(self):
self.dag_bag = DagBag(include_examples=False)
self.assertFalse(bool(self.dag_bag.import_errors))
if __name__ == "__main__":
unittest.main()