DagBag 没有按预期填充 dag

DagBag does not populate dags as expected

我想测试我的 dag 以确保它们具有某些默认参数,并确保所有 dag 都没有输入错误。

我正在使用 DagBag 来填充 dag,然后遍历每个 dag 并检查每个 dag 的值以确保它们是我想要的。

因为 DagBag 也可以获取气流附带的示例 dag,我正在传递参数 include_example = False 但是当我这样做时我意识到 none 我的 dags 被拉进了 dagbags。

我是不是用错了DagBag?或者在测试时是否有其他更好的方法来拉取和检查 dags?

我的代码

def test_no_import_errors():
    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0, "No Import Failures"

构建 DagBag 对象时,您可以传递 DagBag 应在其中查找 dag 文件的文件夹列表。我想这就是问题所在

我能够重现问题,在创建 DagBag 对象时,如果您不为 dag_folder 参数提供值,则不会将 DAG 添加到集合中。

正如 Jarek 所说,这有效:

def test_no_import_errors():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert len(dag_bag.import_errors) == 0, "No Import Failures"

这是我用来测试的例子:

# python -m unittest test_dag_validation.py 
import unittest
import logging
from airflow.models import DagBag


class TestDAGValidation(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        log = logging.getLogger()
        handler = logging.FileHandler("dag_validation.log", mode="w")

        handler.setLevel(logging.INFO)
        log.addHandler(handler)
        cls.log = log

    def test_no_import_errors(self):
        dag_bag = DagBag(dag_folder="dags/", include_examples=False)
        self.log.info(f"How Many DAGs?: {dag_bag.size()}")
        self.log.info(f"Import errors: {len(dag_bag.import_errors)}")
        assert len(dag_bag.import_errors) == 0, "No Import Failures"


默认情况下,airflow DagBag 在 AIRFLOW_HOME/dags 文件夹中查找 dag。

这通常存储在 airflow.cfg 文件中。 默认情况下它指向 ~/airflow 文件夹,但你可以通过 运行 -

指向当前工作目录
export $AIRFLOW_HOME=abs_path_of_your_folder

如果您使用 python 安装 Airflow,确保先导出 $AIRFLOW_HOME 变量,然后激活虚拟环境,最后安装 Airflow。这将确保您的路径正确附加到 airflow.cfg 文件。

您还可以检查您的文件夹是否正确加载,同时 运行 单元测试。在终端中,文件路径打印为

[2022-02-03 20:45:57,657] {dagbag.py:500} INFO - Filling up the DagBag from /Users/kehsihba19/Desktop/airflow-test/dags

用于检查 DAG 中导入错误的示例文件,其中包括检查拼写错误和循环任务检查 -

from airflow.models import DagBag
import unittest

class TestDags(unittest.TestCase):
    def test_DagBag(self):
        self.dag_bag = DagBag(include_examples=False)
        self.assertFalse(bool(self.dag_bag.import_errors))

if __name__ == "__main__":
    unittest.main()