Получение "пул закрыт" после первого тестового примера приложения aiohttp
У меня странная проблема с тестированием приложения aiohttp.
Короче в проекте я использую:
- Джино с sqlalchemy. Я не использую никаких функций orm. Раньше использовал aiopg и проблема была такая же.
- aiohttp-pytest
Приложение является поставщиком API-интерфейса graphql. Я делаю простой запрос списка пользователей.
Все работает нормально, когда я тестирую API вручную или с помощью любого другого инструмента, не использующего pytest, unittest и т. Д. Мне не удалось воспроизвести проблему.
Но как только я начинаю запускать тесты, первый проходит успешно, и все после перерывов и ответа возвращает ошибку: "пул закрыт". Сначала я использовал aiopg и обнаружил проблемы, похожие на мои, решил попробовать другой инструмент и получил ту же проблему.
Важная деталь, думаю, стоит упомянуть. Я не доверяю двигательapp['db']
Я написал класс для обработки соединений и разделения баз данных для записи и чтения.
Вот как это выглядит:
class DatabaseHandler:
"""Keeps track of databases used in app"""
__write_db_list__: List[Engine] = []
__read_db_list__: List[Engine] = []
def __init__(self,
write_dbs: Union[Iterable[Engine], None] = None,
read_dbs: Union[Iterable[Engine], None] = None) \
-> None:
"""Constructor. Accepts list of write and read database engines"""
super().__init__()
if write_dbs:
for url in write_dbs:
self.add_write_db(url)
if read_dbs:
for url in read_dbs:
self.add_read_db(url)
@staticmethod
async def url_to_engine(url):
"""Builds db engine from url"""
return await gino.create_engine(url)
@staticmethod
async def build(write_urls: Iterable[str], read_urls: Iterable[str]):
return DatabaseHandler(
[await DatabaseHandler.url_to_engine(u) for u in write_urls],
[await DatabaseHandler.url_to_engine(u) for u in read_urls]
)
def add_read_db(self, engine: Engine):
"""Adds db engine to read list"""
self.__read_db_list__.append(engine)
def add_write_db(self, engine: Engine):
"""Adds db engine to write list"""
self.__write_db_list__.append(engine)
def get_for_read(self) -> Union[Engine, None]:
"""Returns engine for reading requests (SELECT)"""
if len(self.__read_db_list__) > 0:
result = self.__read_db_list__[0]
else:
result = self.get_for_write()
return result
def get_for_write(self) -> Union[Engine, None]:
"""Returns engine for writing requests (INSERT, UPDATE)"""
result = None
if len(self.__write_db_list__) > 0:
result = self.__write_db_list__[0]
return result
async def disconnect(self) -> None:
"""Method to disconect from db"""
read_db_engines: List[Engine] = self.__read_db_list__
for db_engine in read_db_engines:
await db_engine.close()
write_db_engine: List[Engine] = self.__write_db_list__
for db_engine in write_db_engine:
await db_engine.close()
и инициализация db в функции запуска:
async def init_db(app: web.Application) -> None:
"""Initialize db connections"""
app['db'] = await DatabaseHandler.build(
[app.get('config', {}).get('WRITE_DB')],
[app.get('config', {}).get('READ_DB')]
)
Как выглядит тест:
class UserListTestCase(AioHTTPTestCase):
async def get_application(self) -> web.Application:
return init_app()
@unittest_run_loop
async def test_get_all_users(self):
client: TestClient = self.client
resp: ClientResponse = await client.post(
ADMIN_BASE_URI,
json=await build_gql_request(USERS_LIST, {
"query": ""
})
)
assert resp.status == 200
raise Exception(await resp.json())
data = (await resp.json()).get('data', {}).get('users', {})
assert data.get('count') == 7
assert len(data.get('result')) == 7
resp.close()
@unittest_run_loop
async def test_get_users_by_status(self):
client: TestClient = self.client
resp: ClientResponse = await client.post(
ADMIN_BASE_URI,
json=await build_gql_request(USERS_LIST, {
"query": "",
"isActive": True
})
)
assert resp.status == 200
raise Exception(await resp.json())
data = (await resp.json()).get('data', {}).get('user', {})
assert data.get('count') == 4
assert len(data.get('result')) == 4
assert set([u.get('id') for u in data.get('result', [])]) \
== {1, 2, 5, 7}
resp: ClientResponse = await client.post(
ADMIN_BASE_URI,
json=await build_gql_request(USERS_LIST, {
"query": "",
"is_active": False
})
)
assert resp.status == 200
data = (await resp.json()).get('data', {}).get('user', {})
assert data.get('count') == 3
assert len(data.get('result')) == 3
assert [u.get('id') for u in data.get('result', [])] == [3, 4, 6]
Я попробовал провести тест через функции, результат был тот же. Какой бы первый тест ни был запущен, он проходит успешно, а все последующие тесты завершаются с ошибкой "пул закрыт".
Я хотел бы понять, почему это происходит, и есть ли вероятность возникновения такой же проблемы при производстве.
Я думаю, что мне что-то не хватает в том, как работают фреймворки на основе asyncio и asyncio.
Буду рад любой помощи.