在气流中模拟 Postgres 响应
Mocking Postgres Response in Airflow
我试图模拟气流中的 postgres 响应,但我做不到。尝试了在网上找到的所有解决方案,但没有帮助。在气流手册上,它只提供了使用 docker 图像数据库的解决方案。
代码如下:
from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook
def process(pg_conn_id: str, ti: TaskInstance, **kwargs) -> str:
xcom_in = ti.xcom_pull(task_ids="ids")
pg_conn = PostgresHook(pg_conn_id).get_conn()
cursor = pg_conn.cursor()
cursor.execute("SELECT * FROM stub")
result = cursor.fetchone()[0]
print(result)
return result
这是尝试过的测试之一:
from unittest import mock
import pytest
from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook
def test_process(mocker):
input = "The input data"
output="The value in DB"
ti = TaskInstance
ti.xcom_pull = MagicMock(return_value=input)
mocker.patch.object(
PostgresHook,
"get_conn",
return_value = MagicMock(name="pg")
)
assert output == process("mock_db", ti))
问题是结果,而不是我收到的“输入”值
<MagicMock name='pg.cursor().fetchone().__getitem__()' id='2259571508176'>
模拟就是这样工作的——它们只是记录对它们的实际调用。然后是 concatenation/chaining 调用,如果您在返回的模拟上调用方法,它将连接到原始模拟(下一段中有更多介绍)。
您已成功将 pg_conn = PostgresHook(pg_conn_id).get_conn()
中的 pg_conn
设为万智牌模拟。然后你有这条线 cursor = pg_conn.cursor()
所以 cursor
等于 pg.cursor()
。然后你得到等于 cursor.fetchone()[0]
的结果,我们将其连接到之前的模拟以获得 pg.cursor().fetchone().__getitem__()
- 因为 [0]
在幕后调用 __getitem__
。
所以在你的情况下,你可以有这个代码:
pg_mock = MagicMock(name="pg")
pg_mock.cursor().fetchone().__getitem__.return_value = "input"
mocker.patch.object(
PostgresHook,
"get_conn",
return_value = pg_mock
)
关于 mock 语法的最后一点说明:您通常可以将 .return_value
替换为 ()
,因此 pg_mock.cursor().fetchone().__getitem__.return_value = "input"
与 pg_mock.cursor.return_value.fetchone.return_value.__getitem__.return_value = "input"
相同。
我试图模拟气流中的 postgres 响应,但我做不到。尝试了在网上找到的所有解决方案,但没有帮助。在气流手册上,它只提供了使用 docker 图像数据库的解决方案。
代码如下:
from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook
def process(pg_conn_id: str, ti: TaskInstance, **kwargs) -> str:
xcom_in = ti.xcom_pull(task_ids="ids")
pg_conn = PostgresHook(pg_conn_id).get_conn()
cursor = pg_conn.cursor()
cursor.execute("SELECT * FROM stub")
result = cursor.fetchone()[0]
print(result)
return result
这是尝试过的测试之一:
from unittest import mock
import pytest
from airflow.models import TaskInstance
from airflow.providers.postgres.hooks.postgres import PostgresHook
def test_process(mocker):
input = "The input data"
output="The value in DB"
ti = TaskInstance
ti.xcom_pull = MagicMock(return_value=input)
mocker.patch.object(
PostgresHook,
"get_conn",
return_value = MagicMock(name="pg")
)
assert output == process("mock_db", ti))
问题是结果,而不是我收到的“输入”值
<MagicMock name='pg.cursor().fetchone().__getitem__()' id='2259571508176'>
模拟就是这样工作的——它们只是记录对它们的实际调用。然后是 concatenation/chaining 调用,如果您在返回的模拟上调用方法,它将连接到原始模拟(下一段中有更多介绍)。
您已成功将 pg_conn = PostgresHook(pg_conn_id).get_conn()
中的 pg_conn
设为万智牌模拟。然后你有这条线 cursor = pg_conn.cursor()
所以 cursor
等于 pg.cursor()
。然后你得到等于 cursor.fetchone()[0]
的结果,我们将其连接到之前的模拟以获得 pg.cursor().fetchone().__getitem__()
- 因为 [0]
在幕后调用 __getitem__
。
所以在你的情况下,你可以有这个代码:
pg_mock = MagicMock(name="pg")
pg_mock.cursor().fetchone().__getitem__.return_value = "input"
mocker.patch.object(
PostgresHook,
"get_conn",
return_value = pg_mock
)
关于 mock 语法的最后一点说明:您通常可以将 .return_value
替换为 ()
,因此 pg_mock.cursor().fetchone().__getitem__.return_value = "input"
与 pg_mock.cursor.return_value.fetchone.return_value.__getitem__.return_value = "input"
相同。