Проблемы pytest с fxiture и asyncio с ограничением сеанса

У меня есть несколько тестовых файлов, каждый из которых имеет асинхронный инструмент, который выглядит следующим образом:


@pytest.fixture(scope="module")
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="module")
async def some_fixture():
    return await make_fixture()

Я использую xdist для распараллеливания. Вдобавок у меня есть этот декоратор:

@toolz.curry
def throttle(limit, f):
    semaphore = asyncio.Semaphore(limit)

    @functools.wraps(f)
    async def wrapped(*args, **kwargs):
        async with semaphore:
            return await f(*args, **kwargs)

    return wrapped

и у меня есть функция, которая его использует:

@throttle(10)
def f():
    ...

Сейчас f вызывается из нескольких тестовых файлов, и я получаю исключение, сообщающее мне, что я не могу использовать семафор из разных циклов событий.

Я попытался перейти к фиксации цикла событий на уровне сеанса:



@pytest.fixture(scope="session", autouse=True)
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

Но это дало мне только:

ScopeMismatch: вы пытались получить доступ к фикстуре с областью видимости "функция" event_loop с помощью объекта запроса с областью видимости "модуль", задействованные фабрики

Возможно ли, чтобы xdist + async fixture + semaphore работали вместе?

0 ответов

В конце концов он заработал, используя следующие conftest.py:

import asyncio

import pytest


@pytest.fixture(scope="session")
def event_loop():
    return asyncio.get_event_loop()
Другие вопросы по тегам