Получение "пул закрыт" после первого тестового примера приложения aiohttp

У меня странная проблема с тестированием приложения aiohttp.

Короче в проекте я использую:

Приложение является поставщиком API-интерфейса graphql. Я делаю простой запрос списка пользователей.

Все работает нормально, когда я тестирую API вручную или с помощью любого другого инструмента, не использующего pytest, unittest и т. Д. Мне не удалось воспроизвести проблему.

Но как только я начинаю запускать тесты, первый проходит успешно, и все после перерывов и ответа возвращает ошибку: "пул закрыт". Сначала я использовал aiopg и обнаружил проблемы, похожие на мои, решил попробовать другой инструмент и получил ту же проблему.

Важная деталь, думаю, стоит упомянуть. Я не доверяю двигательapp['db'] Я написал класс для обработки соединений и разделения баз данных для записи и чтения.

Вот как это выглядит:

class DatabaseHandler:
    """Keeps track of databases used in app"""
    __write_db_list__: List[Engine] = []
    __read_db_list__: List[Engine] = []

    def __init__(self,
                 write_dbs: Union[Iterable[Engine], None] = None,
                 read_dbs: Union[Iterable[Engine], None] = None) \
            -> None:
        """Constructor. Accepts list of write and read database engines"""
        super().__init__()

        if write_dbs:
            for url in write_dbs:
                self.add_write_db(url)

        if read_dbs:
            for url in read_dbs:
                self.add_read_db(url)

    @staticmethod
    async def url_to_engine(url):
        """Builds db engine from url"""
        return await gino.create_engine(url)

    @staticmethod
    async def build(write_urls: Iterable[str], read_urls: Iterable[str]):
        return DatabaseHandler(
            [await DatabaseHandler.url_to_engine(u) for u in write_urls],
            [await DatabaseHandler.url_to_engine(u) for u in read_urls]
        )

    def add_read_db(self, engine: Engine):
        """Adds db engine to read list"""
        self.__read_db_list__.append(engine)

    def add_write_db(self, engine: Engine):
        """Adds db engine to write list"""
        self.__write_db_list__.append(engine)

    def get_for_read(self) -> Union[Engine, None]:
        """Returns engine for reading requests (SELECT)"""

        if len(self.__read_db_list__) > 0:
            result = self.__read_db_list__[0]
        else:
            result = self.get_for_write()

        return result

    def get_for_write(self) -> Union[Engine, None]:
        """Returns engine for writing requests (INSERT, UPDATE)"""
        result = None

        if len(self.__write_db_list__) > 0:
            result = self.__write_db_list__[0]

        return result

    async def disconnect(self) -> None:
        """Method to disconect from db"""
        read_db_engines: List[Engine] = self.__read_db_list__
        for db_engine in read_db_engines:
            await db_engine.close()

        write_db_engine: List[Engine] = self.__write_db_list__
        for db_engine in write_db_engine:
            await db_engine.close()

и инициализация db в функции запуска:

async def init_db(app: web.Application) -> None:
    """Initialize db connections"""
    app['db'] = await DatabaseHandler.build(
        [app.get('config', {}).get('WRITE_DB')],
        [app.get('config', {}).get('READ_DB')]
    )

Как выглядит тест:

class UserListTestCase(AioHTTPTestCase):
    async def get_application(self) -> web.Application:
        return init_app()

    @unittest_run_loop
    async def test_get_all_users(self):
        client: TestClient = self.client
        resp: ClientResponse = await client.post(
            ADMIN_BASE_URI,
            json=await build_gql_request(USERS_LIST, {
                "query": ""
            })
        )

        assert resp.status == 200
        raise Exception(await resp.json())
        data = (await resp.json()).get('data', {}).get('users', {})
        assert data.get('count') == 7
        assert len(data.get('result')) == 7

        resp.close()

    @unittest_run_loop
    async def test_get_users_by_status(self):
        client: TestClient = self.client
        resp: ClientResponse = await client.post(
            ADMIN_BASE_URI,
            json=await build_gql_request(USERS_LIST, {
                "query": "",
                "isActive": True
            })
        )

        assert resp.status == 200
        raise Exception(await resp.json())
        data = (await resp.json()).get('data', {}).get('user', {})
        assert data.get('count') == 4
        assert len(data.get('result')) == 4

        assert set([u.get('id') for u in data.get('result', [])]) \
               == {1, 2, 5, 7}

        resp: ClientResponse = await client.post(
            ADMIN_BASE_URI,
            json=await build_gql_request(USERS_LIST, {
                "query": "",
                "is_active": False
            })
        )

        assert resp.status == 200
        data = (await resp.json()).get('data', {}).get('user', {})
        assert data.get('count') == 3
        assert len(data.get('result')) == 3
        assert [u.get('id') for u in data.get('result', [])] == [3, 4, 6]

Я попробовал провести тест через функции, результат был тот же. Какой бы первый тест ни был запущен, он проходит успешно, а все последующие тесты завершаются с ошибкой "пул закрыт".

Я хотел бы понять, почему это происходит, и есть ли вероятность возникновения такой же проблемы при производстве.

Я думаю, что мне что-то не хватает в том, как работают фреймворки на основе asyncio и asyncio.

Буду рад любой помощи.

0 ответов

Другие вопросы по тегам