Попытка реализовать 2 "потока" с помощью модуля `asyncio`
Я поиграл с потоками раньше в Python, но решил дать asyncio
Попробуйте модуль, тем более что вы можете отменить запущенное задание, которое показалось приятной деталью. Однако по какой-то причине я не могу обернуть голову вокруг этого.
Вот что я хотел реализовать (извините, если я использую неверную терминологию):
downloader
Поток, который загружает один и тот же файл каждые x секунд, проверяет его хеш относительно предыдущей загрузки и сохраняет его, если он отличается.webserver
поток, который работает в фоновом режиме, позволяя контролировать (пауза, список, остановка)downloader
нить.
я использовал aiohttp
для веб-сервера.
Это то, что я до сих пор:
class aiotest():
def __init__(self):
self._dl = None # downloader future
self._webapp = None # web server future
self.init_server()
def init_server(self):
print('Setting up web interface')
app = web.Application()
app.router.add_route('GET', '/stop', self.stop)
print('added urls')
self._webapp = app
@asyncio.coroutine
def _downloader(self):
while True:
try:
print('Downloading and verifying file...')
# Dummy sleep - to be replaced by actual code
yield from asyncio.sleep(random.randint(3,10))
# Wait a predefined nr of seconds between downloads
yield from asyncio.sleep(30)
except asyncio.CancelledError:
break
@asyncio.coroutine
def _supervisor(self):
print('Starting downloader')
self._dl = asyncio.async(self._downloader())
def start(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._supervisor())
loop.close()
@asyncio.coroutine
def stop(self):
print('Received STOP')
self._dl.cancel()
return web.Response(body=b"Stopping... ")
Этот класс вызывается:
t = aiotest()
t.start()
Это не работает, конечно, и я чувствую, что это ужасный кусок кода.
Что мне неясно:
- Я останавливаю
downloader
вstop()
метод, но как бы я остановил веб-сервер (например, вshutdown()
метод)? - Ли
downloader
нужен новый цикл событий, или я могу использовать цикл, возвращенныйasyncio.get_event_loop()
? - Мне действительно нужно что-то вроде
supervisor
для чего я пытаюсь реализовать? Это кажется таким неуклюжим. И как я могу получитьsupervisor
продолжать работать вместо завершения после одного выполнения, как сейчас?
Последний, более общий вопрос: asyncio
должен заменить threading
модуль (в будущем)? Или у каждого свое приложение?
Я ценю все указатели, замечания и разъяснения!
1 ответ
Почему текущий код не работает:
Вы запускаете цикл обработки событий до
self._supervisor()
завершено.self._supervisor()
создает задачу (это происходит сразу) и сразу же завершается.Вы пытаетесь запустить цикл обработки событий, пока
_supervisor
завершено, но как и когда вы собираетесь запустить сервер? Я думаю, что цикл событий должен работать, пока сервер не остановился._supervisor
или другие вещи могут быть добавлены в качестве задачи (в том же цикле событий).aiohttp
уже есть функция для запуска сервера и цикла событий -web.run_app
, но мы можем сделать это вручную.
Ваши вопросы:
Ваш сервер будет работать, пока вы не остановите его. Вы можете запускать / останавливать различные сопрограммы во время работы вашего сервера.
Вам нужен только один цикл событий для разных сопрограмм.
Я думаю тебе не нужно
supervisor
,Более общий вопрос:
asyncio
помогает вам запускать различные функции параллельно в одном потоке в одном процессе. Вот почему Asyncio такой крутой и быстрый. Часть вашего кода синхронизации с потоками вы можете переписать с помощью asyncio и его сопрограмм. Более того: asyncio может взаимодействовать с потоками и процессами. Это может быть полезно в том случае, если вам все еще нужны потоки и процессы: вот пример.
Полезные заметки:
- Лучше использовать термин
coroutine
вместоthread
в то время как мы говорим о сопрограмм Asyncio, которые не являются потоками - Если вы используете Python 3.5, вы можете использовать
async
/await
синтаксис вместоcoroutine
/yield from
Я переписал ваш код, чтобы показать вам идею. Как это проверить: запустить программу, увидеть консоль, открыть http://localhost:8080/stop
смотрите консоль, открывайте http://localhost:8080/start
см. консоль, наберите CTRL+C.
import asyncio
import random
from contextlib import suppress
from aiohttp import web
class aiotest():
def __init__(self):
self._webapp = None
self._d_task = None
self.init_server()
# SERVER:
def init_server(self):
app = web.Application()
app.router.add_route('GET', '/start', self.start)
app.router.add_route('GET', '/stop', self.stop)
app.router.add_route('GET', '/kill_server', self.kill_server)
self._webapp = app
def run_server(self):
# Create server:
loop = asyncio.get_event_loop()
handler = self._webapp.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
try:
# Start downloader at server start:
asyncio.async(self.start(None)) # I'm using controllers here and below to be short,
# but it's better to split controller and start func
# Start server:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
# Stop downloader when server stopped:
loop.run_until_complete(self.stop(None))
# Cleanup resources:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(self._webapp.shutdown())
loop.run_until_complete(handler.finish_connections(60.0))
loop.run_until_complete(self._webapp.cleanup())
loop.close()
@asyncio.coroutine
def kill_server(self, request):
print('Server killing...')
loop = asyncio.get_event_loop()
loop.stop()
return web.Response(body=b"Server killed")
# DOWNLOADER
@asyncio.coroutine
def start(self, request):
if self._d_task is None:
print('Downloader starting...')
self._d_task = asyncio.async(self._downloader())
return web.Response(body=b"Downloader started")
else:
return web.Response(body=b"Downloader already started")
@asyncio.coroutine
def stop(self, request):
if (self._d_task is not None) and (not self._d_task.cancelled()):
print('Downloader stopping...')
self._d_task.cancel()
# cancel() just say task it should be cancelled
# to able task handle CancelledError await for it
with suppress(asyncio.CancelledError):
yield from self._d_task
self._d_task = None
return web.Response(body=b"Downloader stopped")
else:
return web.Response(body=b"Downloader already stopped or stopping")
@asyncio.coroutine
def _downloader(self):
while True:
print('Downloading and verifying file...')
# Dummy sleep - to be replaced by actual code
yield from asyncio.sleep(random.randint(1, 2))
# Wait a predefined nr of seconds between downloads
yield from asyncio.sleep(1)
if __name__ == '__main__':
t = aiotest()
t.run_server()