Как проверить, что asyncio.Queue НЕ получил что-то толкнул
В настоящее время я пишу некоторые асинхронные тесты с pytest и столкнулся со следующей ситуацией.
Считайте, что у нас есть asyncio.Queue
называется peer2_subscriber
что мы хотим проверить, получило ли оно определенное сообщение (после запуска какого-либо действия опущено для краткости)
peer, cmd, msg = await asyncio.wait_for(
peer2_subscriber.get(),
timeout=1,
)
assert peer == peer2
assert isinstance(cmd, Transactions)
assert msg[0].hash == txs[0].hash
Теперь посмотрим, что я хочу проверить, что другой asyncio.Queue
НЕ что-то толкнуло.
Я обнаружил, что создаю такой вспомогательный метод.
async def wait_with_fallback(fn, fallback):
try:
return await asyncio.wait_for(
fn(),
timeout=1
)
except asyncio.TimeoutError:
return fallback
И тогда в тесте я пишу что-то вроде:
val = await wait_with_fallback(
peer1_subscriber.get,
None
)
assert val == None
Интересно, есть ли существующий шаблон, который я пропускаю?
1 ответ
Решение
Ваша модель работает, поэтому я бы сказал, что она "правильная", для определенных значений "правильная"… Это в основном стилистические взгляды. Я бы написал либо
await asyncio.sleep(1)
assert peer1_subscriber.empty()
или же
await asyncio.sleep(1)
val = peer1_subscriber.get_nowait()
assert val is None