Попытка реализовать 2 "потока" с помощью модуля `asyncio`

Я поиграл с потоками раньше в Python, но решил дать asyncio Попробуйте модуль, тем более что вы можете отменить запущенное задание, которое показалось приятной деталью. Однако по какой-то причине я не могу обернуть голову вокруг этого.

Вот что я хотел реализовать (извините, если я использую неверную терминологию):

  • downloader Поток, который загружает один и тот же файл каждые x секунд, проверяет его хеш относительно предыдущей загрузки и сохраняет его, если он отличается.
  • webserver поток, который работает в фоновом режиме, позволяя контролировать (пауза, список, остановка) downloader нить.

я использовал aiohttp для веб-сервера.

Это то, что я до сих пор:

class aiotest():

    def __init__(self):
        self._dl = None     # downloader future
        self._webapp = None # web server future
        self.init_server()

    def init_server(self):

        print('Setting up web interface')
        app = web.Application()
        app.router.add_route('GET', '/stop', self.stop)
        print('added urls')
        self._webapp = app

    @asyncio.coroutine
    def _downloader(self):
        while True:
            try:
                print('Downloading and verifying file...')
                # Dummy sleep - to be replaced by actual code
                yield from asyncio.sleep(random.randint(3,10))
                # Wait a predefined nr of seconds between downloads
                yield from asyncio.sleep(30)
            except asyncio.CancelledError:
                break

    @asyncio.coroutine
    def _supervisor(self):

        print('Starting downloader')
        self._dl = asyncio.async(self._downloader())

    def start(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._supervisor())
        loop.close()

    @asyncio.coroutine
    def stop(self):
        print('Received STOP')
        self._dl.cancel()
        return web.Response(body=b"Stopping... ")

Этот класс вызывается:

t = aiotest()
t.start()

Это не работает, конечно, и я чувствую, что это ужасный кусок кода.

Что мне неясно:

  • Я останавливаю downloader в stop() метод, но как бы я остановил веб-сервер (например, в shutdown() метод)?
  • Ли downloader нужен новый цикл событий, или я могу использовать цикл, возвращенный asyncio.get_event_loop()?
  • Мне действительно нужно что-то вроде supervisor для чего я пытаюсь реализовать? Это кажется таким неуклюжим. И как я могу получить supervisor продолжать работать вместо завершения после одного выполнения, как сейчас?

Последний, более общий вопрос: asyncio должен заменить threading модуль (в будущем)? Или у каждого свое приложение?

Я ценю все указатели, замечания и разъяснения!

1 ответ

Решение

Почему текущий код не работает:

  • Вы запускаете цикл обработки событий до self._supervisor() завершено. self._supervisor() создает задачу (это происходит сразу) и сразу же завершается.

  • Вы пытаетесь запустить цикл обработки событий, пока _supervisor завершено, но как и когда вы собираетесь запустить сервер? Я думаю, что цикл событий должен работать, пока сервер не остановился. _supervisor или другие вещи могут быть добавлены в качестве задачи (в том же цикле событий). aiohttp уже есть функция для запуска сервера и цикла событий - web.run_app, но мы можем сделать это вручную.

Ваши вопросы:

  1. Ваш сервер будет работать, пока вы не остановите его. Вы можете запускать / останавливать различные сопрограммы во время работы вашего сервера.

  2. Вам нужен только один цикл событий для разных сопрограмм.

  3. Я думаю тебе не нужно supervisor,

  4. Более общий вопрос: asyncio помогает вам запускать различные функции параллельно в одном потоке в одном процессе. Вот почему Asyncio такой крутой и быстрый. Часть вашего кода синхронизации с потоками вы можете переписать с помощью asyncio и его сопрограмм. Более того: asyncio может взаимодействовать с потоками и процессами. Это может быть полезно в том случае, если вам все еще нужны потоки и процессы: вот пример.

Полезные заметки:

  • Лучше использовать термин coroutine вместо thread в то время как мы говорим о сопрограмм Asyncio, которые не являются потоками
  • Если вы используете Python 3.5, вы можете использовать async/await синтаксис вместо coroutine/yield from

Я переписал ваш код, чтобы показать вам идею. Как это проверить: запустить программу, увидеть консоль, открыть http://localhost:8080/stopсмотрите консоль, открывайте http://localhost:8080/startсм. консоль, наберите CTRL+C.

import asyncio
import random
from contextlib import suppress

from aiohttp import web


class aiotest():
    def __init__(self):
        self._webapp = None
        self._d_task = None
        self.init_server()

    # SERVER:
    def init_server(self):
        app = web.Application()
        app.router.add_route('GET', '/start', self.start)
        app.router.add_route('GET', '/stop', self.stop)
        app.router.add_route('GET', '/kill_server', self.kill_server)
        self._webapp = app

    def run_server(self):
        # Create server:
        loop = asyncio.get_event_loop()
        handler = self._webapp.make_handler()
        f = loop.create_server(handler, '0.0.0.0', 8080)
        srv = loop.run_until_complete(f)
        try:
            # Start downloader at server start:
            asyncio.async(self.start(None))  # I'm using controllers here and below to be short,
                                             # but it's better to split controller and start func
            # Start server:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Stop downloader when server stopped:
            loop.run_until_complete(self.stop(None))
            # Cleanup resources:
            srv.close()
            loop.run_until_complete(srv.wait_closed())
            loop.run_until_complete(self._webapp.shutdown())
            loop.run_until_complete(handler.finish_connections(60.0))
            loop.run_until_complete(self._webapp.cleanup())
        loop.close()

    @asyncio.coroutine
    def kill_server(self, request):
        print('Server killing...')
        loop = asyncio.get_event_loop()
        loop.stop()
        return web.Response(body=b"Server killed")

    # DOWNLOADER
    @asyncio.coroutine
    def start(self, request):
        if self._d_task is None:
            print('Downloader starting...')
            self._d_task = asyncio.async(self._downloader())
            return web.Response(body=b"Downloader started")
        else:
            return web.Response(body=b"Downloader already started")

    @asyncio.coroutine
    def stop(self, request):
        if (self._d_task is not None) and (not self._d_task.cancelled()):
            print('Downloader stopping...')
            self._d_task.cancel()            
            # cancel() just say task it should be cancelled
            # to able task handle CancelledError await for it
            with suppress(asyncio.CancelledError):
                yield from self._d_task
            self._d_task = None
            return web.Response(body=b"Downloader stopped")
        else:
            return web.Response(body=b"Downloader already stopped or stopping")

    @asyncio.coroutine
    def _downloader(self):
        while True:
            print('Downloading and verifying file...')
            # Dummy sleep - to be replaced by actual code
            yield from asyncio.sleep(random.randint(1, 2))
            # Wait a predefined nr of seconds between downloads
            yield from asyncio.sleep(1)


if __name__ == '__main__':
    t = aiotest()
    t.run_server()
Другие вопросы по тегам