Проблемы pytest с fxiture и asyncio с ограничением сеанса
У меня есть несколько тестовых файлов, каждый из которых имеет асинхронный инструмент, который выглядит следующим образом:
@pytest.fixture(scope="module")
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="module")
async def some_fixture():
return await make_fixture()
Я использую xdist для распараллеливания. Вдобавок у меня есть этот декоратор:
@toolz.curry
def throttle(limit, f):
semaphore = asyncio.Semaphore(limit)
@functools.wraps(f)
async def wrapped(*args, **kwargs):
async with semaphore:
return await f(*args, **kwargs)
return wrapped
и у меня есть функция, которая его использует:
@throttle(10)
def f():
...
Сейчас f
вызывается из нескольких тестовых файлов, и я получаю исключение, сообщающее мне, что я не могу использовать семафор из разных циклов событий.
Я попытался перейти к фиксации цикла событий на уровне сеанса:
@pytest.fixture(scope="session", autouse=True)
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
Но это дало мне только:
ScopeMismatch: вы пытались получить доступ к фикстуре с областью видимости "функция" event_loop с помощью объекта запроса с областью видимости "модуль", задействованные фабрики
Возможно ли, чтобы xdist + async fixture + semaphore работали вместе?
0 ответов
В конце концов он заработал, используя следующие
conftest.py
:
import asyncio
import pytest
@pytest.fixture(scope="session")
def event_loop():
return asyncio.get_event_loop()