Как проверить, что asyncio.Queue НЕ получил что-то толкнул

В настоящее время я пишу некоторые асинхронные тесты с pytest и столкнулся со следующей ситуацией.

Считайте, что у нас есть asyncio.Queue называется peer2_subscriber что мы хотим проверить, получило ли оно определенное сообщение (после запуска какого-либо действия опущено для краткости)

peer, cmd, msg = await asyncio.wait_for(
    peer2_subscriber.get(),
    timeout=1,
)

assert peer == peer2
assert isinstance(cmd, Transactions)
assert msg[0].hash == txs[0].hash

Теперь посмотрим, что я хочу проверить, что другой asyncio.Queue НЕ что-то толкнуло.

Я обнаружил, что создаю такой вспомогательный метод.

async def wait_with_fallback(fn, fallback):
    try:
        return await asyncio.wait_for(
            fn(),
            timeout=1
        )
    except asyncio.TimeoutError:
        return fallback

И тогда в тесте я пишу что-то вроде:

val = await wait_with_fallback(
    peer1_subscriber.get,
    None
)

assert val == None

Интересно, есть ли существующий шаблон, который я пропускаю?

1 ответ

Решение

Ваша модель работает, поэтому я бы сказал, что она "правильная", для определенных значений "правильная"… Это в основном стилистические взгляды. Я бы написал либо

await asyncio.sleep(1)
assert peer1_subscriber.empty()

или же

await asyncio.sleep(1)
val = peer1_subscriber.get_nowait()
assert val is None
Другие вопросы по тегам