如何测试 asyncio.Queue 没有被推送

How to test that asyncio.Queue did NOT get something pushed

我目前正在用 pytest 编写一些异步测试,发现自己 运行 遇到以下情况。

假设我们有一个名为 peer2_subscriberasyncio.Queue,我们想检查它是否收到了特定消息(在触发某些操作后,为简洁起见省略)

peer, cmd, msg = await asyncio.wait_for(
    peer2_subscriber.get(),
    timeout=1,
)

assert peer == peer2
assert isinstance(cmd, Transactions)
assert msg[0].hash == txs[0].hash

现在,假设我想测试另一个 asyncio.Queue 没有推送任何东西。

我发现自己创建了这样一个辅助方法。

async def wait_with_fallback(fn, fallback):
    try:
        return await asyncio.wait_for(
            fn(),
            timeout=1
        )
    except asyncio.TimeoutError:
        return fallback

然后在测试中我写了这样的东西:

val = await wait_with_fallback(
    peer1_subscriber.get,
    None
)

assert val == None

我想知道是否有我遗漏的现有模式?

您的模式有效,所以我会说它是 "correct",对于正确的某些值…这里主要是风格观点。我会写

await asyncio.sleep(1)
assert peer1_subscriber.empty()

await asyncio.sleep(1)
val = peer1_subscriber.get_nowait()
assert val is None