气流 - Pytest - 未找到夹具 'dag'
Airflow - Pytest - fixture 'dag' not found
我正在尝试为气流中的 DAG 编写一些完整性测试。我目前正在使用 airflow.utils.dag_cycle_tester 中的 test_cycle 函数测试循环 DAG。我的代码如下:
import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
from airflow.utils.dag_cycle_tester import test_cycle
DAG_PATH = os.path.join(
os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
print(DAG_FILES)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name,
module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
dag_objects = [var for var in vars(module).values() if isinstance(var,
DAG)]
assert dag_objects
for dag in dag_objects:
test_cycle(dag)
出于某种原因,当我 运行 使用 pytest tests/ 时,出现以下错误:
___________ERROR 在 test_cycle ___________
的设置
def test_cycle(dag):
未找到 E 夹具 'dag'
可用装置:anyio_backend、anyio_backend_name、anyio_backend_options、缓存、capfd、capfdbinary、caplog、capsys、capsysbinary、celery_config、celery_enable_logging, cov, doctest_namespace, monkeypatch, no_cover, pytestconfig, record_property, record_testsuite_property, record_xml_attribute, recwarn, tmp_path, tmp_path_工厂,tmpdir,tmpdir_factory
使用 'pytest --fixtures [testpath]' 寻求帮助。
任何关于导致这种情况的想法将不胜感激。谢谢!
虽然失败实际上是在您显示的另一个函数中,但我可以解释发生了什么。
问题:
Pytest 选择以 test_
开头的每个函数。因此,当您的函数 test_cycle
以 test_
开始时,它也将 运行 作为单独的测试。在单独的测试中,它试图找到不存在的夹具 dag
。
因此解决方案是:
将函数重命名为 cycle_dag
应该可以修复它:)
我正在尝试为气流中的 DAG 编写一些完整性测试。我目前正在使用 airflow.utils.dag_cycle_tester 中的 test_cycle 函数测试循环 DAG。我的代码如下:
import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
from airflow.utils.dag_cycle_tester import test_cycle
DAG_PATH = os.path.join(
os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
print(DAG_FILES)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name,
module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
dag_objects = [var for var in vars(module).values() if isinstance(var,
DAG)]
assert dag_objects
for dag in dag_objects:
test_cycle(dag)
出于某种原因,当我 运行 使用 pytest tests/ 时,出现以下错误:
___________ERROR 在 test_cycle ___________
的设置def test_cycle(dag):
未找到 E 夹具 'dag'
可用装置:anyio_backend、anyio_backend_name、anyio_backend_options、缓存、capfd、capfdbinary、caplog、capsys、capsysbinary、celery_config、celery_enable_logging, cov, doctest_namespace, monkeypatch, no_cover, pytestconfig, record_property, record_testsuite_property, record_xml_attribute, recwarn, tmp_path, tmp_path_工厂,tmpdir,tmpdir_factory 使用 'pytest --fixtures [testpath]' 寻求帮助。
任何关于导致这种情况的想法将不胜感激。谢谢!
虽然失败实际上是在您显示的另一个函数中,但我可以解释发生了什么。
问题:
Pytest 选择以 test_
开头的每个函数。因此,当您的函数 test_cycle
以 test_
开始时,它也将 运行 作为单独的测试。在单独的测试中,它试图找到不存在的夹具 dag
。
因此解决方案是:
将函数重命名为 cycle_dag
应该可以修复它:)