在气流中模拟 Postgres 响应

Mocking Postgres Response in Airflow

我试图模拟气流中的 postgres 响应,但我做不到。尝试了在网上找到的所有解决方案,但没有帮助。在气流手册上,它只提供了使用 docker 图像数据库的解决方案。

代码如下:

from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook

def process(pg_conn_id: str, ti: TaskInstance, **kwargs) -> str:

    xcom_in = ti.xcom_pull(task_ids="ids")

    pg_conn = PostgresHook(pg_conn_id).get_conn()
    cursor = pg_conn.cursor()
    cursor.execute("SELECT * FROM stub")
    result = cursor.fetchone()[0]
    print(result)

    return result

这是尝试过的测试之一:

from unittest import mock

import pytest
from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook

def test_process(mocker):
    input = "The input data"
    output="The value in DB"
    ti = TaskInstance
    ti.xcom_pull = MagicMock(return_value=input)


    mocker.patch.object(
        PostgresHook,
        "get_conn",
        return_value = MagicMock(name="pg")
    )

    assert output == process("mock_db", ti))

问题是结果,而不是我收到的“输入”值

<MagicMock name='pg.cursor().fetchone().__getitem__()' id='2259571508176'>

模拟就是这样工作的——它们只是记录对它们的实际调用。然后是 concatenation/chaining 调用,如果您在返回的模拟上调用方法,它将连接到原始模拟(下一段中有更多介绍)。

您已成功将 pg_conn = PostgresHook(pg_conn_id).get_conn() 中的 pg_conn 设为万智牌模拟。然后你有这条线 cursor = pg_conn.cursor() 所以 cursor 等于 pg.cursor()。然后你得到等于 cursor.fetchone()[0] 的结果,我们将其连接到之前的模拟以获得 pg.cursor().fetchone().__getitem__() - 因为 [0] 在幕后调用 __getitem__

所以在你的情况下,你可以有这个代码:

pg_mock = MagicMock(name="pg")
pg_mock.cursor().fetchone().__getitem__.return_value = "input"

mocker.patch.object(
        PostgresHook,
        "get_conn",
        return_value = pg_mock
    )

关于 mock 语法的最后一点说明:您通常可以将 .return_value 替换为 (),因此 pg_mock.cursor().fetchone().__getitem__.return_value = "input"pg_mock.cursor.return_value.fetchone.return_value.__getitem__.return_value = "input" 相同。