Как я могу отменить задание asyncio до окончания модульного теста?

Я пытаюсь провести модульное тестирование асинхронного сервера сокетов и использую pytest-asyncio сделать Pytest совместимым с базой асинхронного кода. После запуска сервер всегда готов отправить ответ через цикл while и, вероятно, проводит большую часть своего времени в ожидании входящего сообщения в client_loop(), Проблема в том, что нет способа отменить эту задачу до того, как среда модульного тестирования завершит цикл обработки событий, и будет выдано это предупреждение:

Задание было уничтожено, но оно находится на рассмотрении!

task: <ожидающая задача coro = wait_for= <ожидающая в будущем cb = [ ()]>>

Единственная задача, к которой у меня есть доступ, - это задача, созданная asyncio.create_task() что, похоже, не та же задача. Эта задача выглядит следующим образом:

task: <задача ожидает выполнения coro = >

Так зовет task.cancel(); await task.wait_cancelled() на эту задачу не оказывает никакого влияния.

Как можно написать этот модульный тест, чтобы правильно запускать и запускать сервер для каждого теста, а не обрезать задачи, которые все еще могут выполняться?

Вот пример:

test_server.py

import pytest
import asyncio

@pytest.fixture
async def server(event_loop):
    from server import Server
    the_server = Server()
    await the_server.start()
    yield the_server
    the_server.stop()

@pytest.mark.asyncio
async def test_connect(server):
    loop = asyncio.get_event_loop()
    reader, writer = await asyncio.open_connection('0.0.0.0', 8888, loop = loop)
    writer.write(b'something')
    await reader.read(100)
    writer.write(b'something else')
    await reader.read(100)
    assert 1

server.py

import asyncio

class Server():
    async def start(self):
        loop = asyncio.get_event_loop()
        coro = asyncio.start_server(self.new_client, '0.0.0.0', 8888, loop = loop)
        task = loop.create_task(coro)
        print('\n')
        print(task)
        self.server = await task

    def stop(self):
        self.server.close()

    async def new_client(self, reader, writer):
        await self.client_loop(reader, writer)

    async def client_loop(self, reader, writer):
        while True:
            await reader.read(100)
            writer.write(b'reply')

Если вы хотите запустить этот пример, просто запустите pip3 install pytest-asyncio и pytest может подобрать этот плагин.

2 ответа

Решение

asyncio.Server.stop() Метод не полностью останавливает сервер. Он просто перестает принимать новые подключения. Любые соединения, созданные до закрытия, продолжат выполняться до завершения.

Согласно документации (выделено мое):

Остановить подачу: закройте прослушивающие сокеты и установите для атрибута сокетов значение Нет.

Сокеты, которые представляют существующие входящие клиентские соединения, остаются открытыми.

Сервер закрыт асинхронно, используйте сопрограмму wait_closed(), чтобы дождаться закрытия сервера.

В этом примере все соединения отправляются на бесконечность client_loop метод.

Лучшее решение - создать коллекцию задач в new_client() ответственность за исполнение client_loop() логика вместо непосредственного ожидания метода. При таком подходе все открытые задачи могут быть чисто завершены в stop() метод.

Ты должен await self.server.wait_closed() после звонка self.server.close(),

Ваш прибор должен выглядеть следующим образом:

@pytest.fixture
async def server(event_loop):
    from server import Server
    the_server = Server()
    await the_server.start()
    yield the_server
    await the_server.stop()

И stop метод вашего Server должен выглядеть так:

    async def stop(self):
        self.server.close()
        await self.server.wait_closed()

Смотрите документацию для деталей.

Другие вопросы по тегам