RuntimeError: цикл событий закрыт — мотор, асинхронность
Я не могу запустить этот тест, у меня всегда одна и та же ошибка RuntimeError: цикл событий закрыт
Что мне нужно добавить в этот код?
from motor.motor_asyncio import AsyncIOMotorClient
import pytest
import asyncio
client = AsyncIOMotorClient("mongodb://mongo:mongo@192.168.0.11:27017/admin?retryWrites=false")
db = client['app']
aux = db['users']
async def create_user_db(a: dict):
x = await aux.insert_one(a)
return x
@pytest.mark.asyncio
async def test_create():
form = {'username': 'c3', 'password': 'c3'}
res = await create_user_db(form)
assert res != None
Это ошибка
1 ответ
В вашем примере вы открываете базу данных во время «импорта», но у нас все еще нет цикла событий. Цикл событий создается при запуске тестового примера.
Вы можете определить свою базу данных как приспособление и предоставить ее функциям тестирования, например:
@pytest.fixture
def client():
return AsyncIOMotorClient("mongodb://localhost:27017/")
@pytest.fixture
def db(client):
return client['test']
@pytest.fixture
def collection(db):
return db['test']
async def create_user_db(collection, a: dict):
x = await collection.insert_one(a)
return x
@pytest.mark.asyncio
async def test_create(collection):
form = {'username': 'c3', 'password': 'c3'}
res = await create_user_db(collection, form)
assert res != None