来自 Redis 单元测试的模拟数据 python
Mock Data from Redis unit testing python
我有一个class
class ReferenceData(celery.Task):
def __init__(self):
redis_pool = redis.ConnectionPool()
self.redis = redis.StrictRedis(connection_pool=redis_pool)
def get_agents(self,id):
if condition:
return self.agents.get(id)
return self.agents.get(external_id
还有 celery 任务调用了下面的方法
def save_raisr_event(self, event):
# Get attributes:
transcriptions = []
if trans is not None:
transcriptions.append(trans)
self.redis.append(f"{crn}-transcription", ' ' + transcription)
return 'transcription saved'
在单元测试中如何模拟 redis 连接和来自 redis 的模拟数据,在这种情况下,值 pf 转录保存在 redis 中
假设这是源代码:
src.py
import celery
import redis
class ReferenceData(celery.Task):
def __init__(self):
redis_pool = redis.ConnectionPool()
self.redis = redis.StrictRedis(connection_pool=redis_pool)
def save_raisr_event(self, event):
self.redis.append("crn", 'transcription')
return self.redis.keys()
如果您只想用静态值修补 redis
中方法的响应:
test_src.py
from src import ReferenceData
def test_redis(mocker): # Requires <pip install pytest-mock>
mock_redis_pkg = mocker.patch("src.redis")
mock_redis = mock_redis_pkg.StrictRedis.return_value
mocked_keys = ["some", "transactions", "here"]
mock_redis.keys.return_value = mocked_keys
assert ReferenceData().save_raisr_event(None) == mocked_keys
如果您想存储附加的实际交易,则设置一个 dict
将被更新:
test_src.py
from src import ReferenceData
def test_redis(mocker): # Requires <pip install pytest-mock>
mock_redis_pkg = mocker.patch("src.redis")
mock_redis = mock_redis_pkg.StrictRedis.return_value
storage = {}
mock_redis.append.side_effect = lambda key, value: storage.update({key: value})
mock_redis.keys.side_effect = lambda: storage.keys()
assert ReferenceData().save_raisr_event(None) == storage.keys()
assert storage == {'crn': 'transcription'}
我有一个class
class ReferenceData(celery.Task):
def __init__(self):
redis_pool = redis.ConnectionPool()
self.redis = redis.StrictRedis(connection_pool=redis_pool)
def get_agents(self,id):
if condition:
return self.agents.get(id)
return self.agents.get(external_id
还有 celery 任务调用了下面的方法
def save_raisr_event(self, event):
# Get attributes:
transcriptions = []
if trans is not None:
transcriptions.append(trans)
self.redis.append(f"{crn}-transcription", ' ' + transcription)
return 'transcription saved'
在单元测试中如何模拟 redis 连接和来自 redis 的模拟数据,在这种情况下,值 pf 转录保存在 redis 中
假设这是源代码:
src.py
import celery
import redis
class ReferenceData(celery.Task):
def __init__(self):
redis_pool = redis.ConnectionPool()
self.redis = redis.StrictRedis(connection_pool=redis_pool)
def save_raisr_event(self, event):
self.redis.append("crn", 'transcription')
return self.redis.keys()
如果您只想用静态值修补 redis
中方法的响应:
test_src.py
from src import ReferenceData
def test_redis(mocker): # Requires <pip install pytest-mock>
mock_redis_pkg = mocker.patch("src.redis")
mock_redis = mock_redis_pkg.StrictRedis.return_value
mocked_keys = ["some", "transactions", "here"]
mock_redis.keys.return_value = mocked_keys
assert ReferenceData().save_raisr_event(None) == mocked_keys
如果您想存储附加的实际交易,则设置一个 dict
将被更新:
test_src.py
from src import ReferenceData
def test_redis(mocker): # Requires <pip install pytest-mock>
mock_redis_pkg = mocker.patch("src.redis")
mock_redis = mock_redis_pkg.StrictRedis.return_value
storage = {}
mock_redis.append.side_effect = lambda key, value: storage.update({key: value})
mock_redis.keys.side_effect = lambda: storage.keys()
assert ReferenceData().save_raisr_event(None) == storage.keys()
assert storage == {'crn': 'transcription'}