Тестирование сервера aiohttp с помощью асинхронных устройств

Я пытаюсь использовать pytest-aiohttp проверить мой aiohttpна основе REST API. Существует конечная точка / Authenticate, которая возвращает токен аутентификации, и, по сути, все другие конечные точки требуют токен в заголовке авторизации. У меня есть следующие тесты, которые отлично работают:

from pytest import fixture

from agora_backend.server import get_app

NAME, PASSWORD = "clin123", "password123"


@fixture
def client(loop, aiohttp_client):
    return loop.run_until_complete(aiohttp_client(get_app))


async def test_patient_success(client):
    auth_response = await client.post("/authenticate",
        json={"user_name": NAME, "user_pass": PASSWORD})

    auth_token =  await auth_response.text()

    response = await client.get("/clinician/patients",
        headers={"Authorization": f"Bearer {auth_token}"})

    assert response.status == 200


async def test_session_id_success(client):
    auth_response = await client.post("/authenticate",
        json={"user_name": NAME, "user_pass": PASSWORD})

    auth_token =  await auth_response.text()

    response = await client.get("/clinician/session/1",
        headers={"Authorization": f"Bearer {auth_token}"})

    assert response.status == 200

Однако это не очень СУХОЕ. Я хотел бы использовать один и тот же токен авторизации для всех тестов, поэтому решил, что хорошим выбором будет приспособление. Итак, я попробовал следующий код:

from pytest import fixture

from agora_backend.server import get_app

NAME, PASSWORD = "clin123", "password123"


@fixture
def client(loop, aiohttp_client):
    return loop.run_until_complete(aiohttp_client(get_app))


@fixture
async def auth_token(client):
    auth_response = await client.post("/authenticate",
        json={"user_name": NAME, "user_pass": PASSWORD})

    return await auth_response.text()


async def test_patient_success(client, auth_token):
    response = await client.get("/clinician/patients",
        headers={"Authorization": f"Bearer {auth_token}"})

    assert response.status == 200


async def test_session_id_success(client, auth_token):
    response = await client.get("/clinician/session/1",
        headers={"Authorization": f"Bearer {auth_token}"})

    assert response.status == 200

Однако это приводит к исключению RuntimeError: Timeout context manager should be used inside a task. Я попытался посмотреть на этот, казалось бы, связанный ответ, но не уверен, полностью ли он применим к моей ситуации. И если это так, я не совсем понимаю, как применить это к моему коду. Любая помощь будет оценена по достоинству!

0 ответов

Другие вопросы по тегам