如何在不睡觉的情况下对这个协程进行单元测试
How to unit test this coroutine without sleeping
我正在尝试创建将以下功能添加到现有协程的函数。
- 根据参数为原始协程添加缓存。
- 添加ttl参数
默认为无穷大,因此调用者可以指定
数据应该在刷新之前。
- 如果有人用一些参数调用缓存协程,而原始协程没有返回相同参数的结果,那么第二个协程应该等待这个结果并从缓存中获取结果。
我在测试最后一个条件时遇到问题。
def cached(cache, locks, f):
@wraps(f)
async def wrapper(*args, ttl=float('inf')):
value, updated_at = cache.get(args, (None, None))
if value and updated_at >= time() - ttl:
return value
else:
loading_sync = locks.setdefault(args, Sync())
if loading_sync.flag:
await loading_sync.condition.wait()
return cache[args]
else:
with await loading_sync.condition:
loading_sync.flag = True
result = await f(*args)
cache[args] = result, time()
loading_sync.flag = False
loading_sync.condition.notify_all()
return result
return wrapper
要对这种情况进行单元测试,您可以使用可以随意解决的期货。在此处使用非常简化的 @cached
装饰器和函数:
@cached
async def test_mock(future):
await asyncio.wait_for(future, None)
func1_future = asyncio.Future()
func1_coro = test_mock(func1_future)
func2_coro = test_mock(...)
func1_future.set_result(True)
await func1_coro
await func2_coro
原答案,基于误解:
逻辑非常简单:你的缓存在某处,让我们使用一个简单的字典。当您第一次遇到特定参数时,您会在缓存位置创建一个 Future
。每当您访问缓存时,请检查您的值是否为 Future
,如果是,则为 await
。非常简单的插图:
cache = dict()
async def memoizer(args):
if args in cache:
cached_value = cache[args]
if isinstance(cached_value, asyncio.Future):
cached_value = await asyncio.wait_for(cached_value, None)
return cached_value
else:
future = asyncio.Future()
cache[args] = future
value = await compute_value(args)
future.set_result(value)
cache[args] = value
return value
我正在尝试创建将以下功能添加到现有协程的函数。
- 根据参数为原始协程添加缓存。
- 添加ttl参数 默认为无穷大,因此调用者可以指定 数据应该在刷新之前。
- 如果有人用一些参数调用缓存协程,而原始协程没有返回相同参数的结果,那么第二个协程应该等待这个结果并从缓存中获取结果。
我在测试最后一个条件时遇到问题。
def cached(cache, locks, f):
@wraps(f)
async def wrapper(*args, ttl=float('inf')):
value, updated_at = cache.get(args, (None, None))
if value and updated_at >= time() - ttl:
return value
else:
loading_sync = locks.setdefault(args, Sync())
if loading_sync.flag:
await loading_sync.condition.wait()
return cache[args]
else:
with await loading_sync.condition:
loading_sync.flag = True
result = await f(*args)
cache[args] = result, time()
loading_sync.flag = False
loading_sync.condition.notify_all()
return result
return wrapper
要对这种情况进行单元测试,您可以使用可以随意解决的期货。在此处使用非常简化的 @cached
装饰器和函数:
@cached
async def test_mock(future):
await asyncio.wait_for(future, None)
func1_future = asyncio.Future()
func1_coro = test_mock(func1_future)
func2_coro = test_mock(...)
func1_future.set_result(True)
await func1_coro
await func2_coro
原答案,基于误解:
逻辑非常简单:你的缓存在某处,让我们使用一个简单的字典。当您第一次遇到特定参数时,您会在缓存位置创建一个 Future
。每当您访问缓存时,请检查您的值是否为 Future
,如果是,则为 await
。非常简单的插图:
cache = dict()
async def memoizer(args):
if args in cache:
cached_value = cache[args]
if isinstance(cached_value, asyncio.Future):
cached_value = await asyncio.wait_for(cached_value, None)
return cached_value
else:
future = asyncio.Future()
cache[args] = future
value = await compute_value(args)
future.set_result(value)
cache[args] = value
return value