如何测试 asyncio.Queue 没有被推送
How to test that asyncio.Queue did NOT get something pushed
我目前正在用 pytest 编写一些异步测试,发现自己 运行 遇到以下情况。
假设我们有一个名为 peer2_subscriber
的 asyncio.Queue
,我们想检查它是否收到了特定消息(在触发某些操作后,为简洁起见省略)
peer, cmd, msg = await asyncio.wait_for(
peer2_subscriber.get(),
timeout=1,
)
assert peer == peer2
assert isinstance(cmd, Transactions)
assert msg[0].hash == txs[0].hash
现在,假设我想测试另一个 asyncio.Queue
没有推送任何东西。
我发现自己创建了这样一个辅助方法。
async def wait_with_fallback(fn, fallback):
try:
return await asyncio.wait_for(
fn(),
timeout=1
)
except asyncio.TimeoutError:
return fallback
然后在测试中我写了这样的东西:
val = await wait_with_fallback(
peer1_subscriber.get,
None
)
assert val == None
我想知道是否有我遗漏的现有模式?
您的模式有效,所以我会说它是 "correct",对于正确的某些值…这里主要是风格观点。我会写
await asyncio.sleep(1)
assert peer1_subscriber.empty()
或
await asyncio.sleep(1)
val = peer1_subscriber.get_nowait()
assert val is None
我目前正在用 pytest 编写一些异步测试,发现自己 运行 遇到以下情况。
假设我们有一个名为 peer2_subscriber
的 asyncio.Queue
,我们想检查它是否收到了特定消息(在触发某些操作后,为简洁起见省略)
peer, cmd, msg = await asyncio.wait_for(
peer2_subscriber.get(),
timeout=1,
)
assert peer == peer2
assert isinstance(cmd, Transactions)
assert msg[0].hash == txs[0].hash
现在,假设我想测试另一个 asyncio.Queue
没有推送任何东西。
我发现自己创建了这样一个辅助方法。
async def wait_with_fallback(fn, fallback):
try:
return await asyncio.wait_for(
fn(),
timeout=1
)
except asyncio.TimeoutError:
return fallback
然后在测试中我写了这样的东西:
val = await wait_with_fallback(
peer1_subscriber.get,
None
)
assert val == None
我想知道是否有我遗漏的现有模式?
您的模式有效,所以我会说它是 "correct",对于正确的某些值…这里主要是风格观点。我会写
await asyncio.sleep(1)
assert peer1_subscriber.empty()
或
await asyncio.sleep(1)
val = peer1_subscriber.get_nowait()
assert val is None