RuntimeError: цикл событий закрыт — мотор, асинхронность

Я не могу запустить этот тест, у меня всегда одна и та же ошибка RuntimeError: цикл событий закрыт

Что мне нужно добавить в этот код?

      from motor.motor_asyncio import AsyncIOMotorClient
import pytest
import asyncio

client = AsyncIOMotorClient("mongodb://mongo:mongo@192.168.0.11:27017/admin?retryWrites=false")
db = client['app']
aux = db['users']

async def create_user_db(a: dict):
    x = await aux.insert_one(a)
    return x

@pytest.mark.asyncio
async def test_create():
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(form)
    assert res != None

Это ошибка

1 ответ

В вашем примере вы открываете базу данных во время «импорта», но у нас все еще нет цикла событий. Цикл событий создается при запуске тестового примера.

Вы можете определить свою базу данных как приспособление и предоставить ее функциям тестирования, например:

      @pytest.fixture
def client():
    return AsyncIOMotorClient("mongodb://localhost:27017/")


@pytest.fixture
def db(client):
    return client['test']


@pytest.fixture
def collection(db):
    return db['test']


async def create_user_db(collection, a: dict):
    x = await collection.insert_one(a)
    return x



@pytest.mark.asyncio
async def test_create(collection):
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(collection, form)
    assert res != None
Другие вопросы по тегам