Airflow - DAG Integrity Testing - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: variable

Airflow - DAG Integrity Testing - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: variable

我正在尝试在气流中编写一些 DAG 完整性测试。我遇到的问题是我正在测试的 DAG,我在该 DAG 中的某些任务中引用了变量。

例如:Variable.get("AIRFLOW_VAR_BLOB_CONTAINER")

我似乎遇到了错误: sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) 没有这样的 table: 变量

这是因为当通过 pytest 进行测试时,这些变量(和变量 table)不存在。在 运行 DAG 完整性测试时,有谁知道处理 Variables/Connection 引用的任何解决方法或建议方法?

谢谢,

您可以创建本地 Metastore 进行测试。 运行 airflow db init 没有任何其他设置将在您的主目录中创建一个 SQLite 元存储,您可以在测试期间使用它。我用于测试的本地 Metastore 的默认附加设置是:

AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False  (to ensure there are no defaults to make things magically work)
AIRFLOW__CORE__LOAD_EXAMPLES=False  (to ensure there are no defaults to make things magically work)
AIRFLOW__CORE__UNIT_TEST_MODE=True  (Set default test settings, skip certain actions, etc.)
AIRFLOW_HOME=[project root dir]  (To avoid Airflow files in your home dir)

运行 airflow db init 使用这些设置会在您的项目根目录中生成三个文件:

  1. unittests.db
  2. unittests.cfg
  3. webserver_config.py

将这些添加到您的 .gitignore 中可能是个好主意。通过此设置,您可以在测试期间针对本地 Metastore unittests.db 安全地进行测试(确保在 运行 pytest 时设置相同的环境变量)。

或者,如果您出于某种原因不想要本地元存储,您将不得不求助于模拟来替代 Airflow 对元存储进行的调用。这需要了解 Airflow 的内部结构。一个例子:

import datetime
from unittest import mock

from airflow.models import DAG
from airflow.operators.bash import BashOperator


def test_bash_operator(tmp_path):
    with DAG(dag_id="test_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@daily") as dag:
        with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
            employees = ["Alice", "Bob", "Charlie"]
            variable_get_mock.return_value = employees
            output_file = tmp_path / "output.txt"
            test = BashOperator(task_id="test", bash_command="echo {{ var.json.employees }} > " + str(output_file))
            dag.clear()
            test.run(
                start_date=dag.start_date,
                end_date=dag.start_date,
                ignore_first_depends_on_past=True,
                ignore_ti_state=True,
            )

            variable_get_mock.assert_called_once()
            assert output_file.read_text() == f"[{', '.join(employees)}]\n"

这些行:

with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
    employees = ["Alice", "Bob", "Charlie"]
    variable_get_mock.return_value = employees

确定实际上并未调用函数 airflow.models.variable.Variable.get,而是 return 编辑了此列表:["Alice", "Bob", "Charlie"]。由于 task.run() 没有 return 任何东西,我将 bash_command 写入 tmp_path,然后读取文件以断言内容是否符合我的预期。

这完全避免了对 Metastore 的需要,但是一旦您的测试超出了这些基本示例的范围,模拟可能会需要大量工作并且很复杂。