Python 3.5 для асинхронных блоков ioloop

У меня простой aiohttp-сервер с двумя обработчиками. Первый делает некоторые вычисления в async for петля. Второй просто возвращает текстовый ответ. not_so_long_operation возвращает 30-е число Фибоначчи с самой медленной рекурсивной реализацией, которая занимает около одной секунды.

def not_so_long_operation():
    return fib(30)

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            return i
        else:
            raise StopAsyncIteration

# GET /
async def index(request):
    print('request!')
    l = []
    async for i in arange(20):
        print(i)
        l.append(not_so_long_operation())

    return aiohttp.web.Response(text='%d\n' % l[0])

# GET /lol/
async def lol(request):
    print('request!')
    return aiohttp.web.Response(text='just respond\n')

Когда я пытаюсь получить / а потом /lol/, он дает мне ответ для второго только тогда, когда первый закончен.
Что я делаю неправильно и как заставить обработчик индекса освобождать ioloop на каждой итерации?

3 ответа

Решение

Ваш пример не имеет точек доходности (await заявления) для переключения между задачами. Асинхронный итератор позволяет использовать await внутри __aiter__/__anext__ но не вставляйте его автоматически в свой код.

Сказать,

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration

должен работать как вы ожидали.

В реальном приложении, скорее всего, вам не нужно await asyncio.sleep(0) звонки, потому что вы будете ждать доступа к базе данных и подобных действий.

Поскольку, fib(30) ограничен процессором и обменивается небольшими данными, вы, вероятно, должны использовать ProcessPoolExecutor (в отличие от ThreadPoolExecutor):

async def index(request):
    loop = request.app.loop
    executor = request.app["executor"]
    result = await loop.run_in_executor(executor, fib, 30)
    return web.Response(text="%d" % result)

Настроить executor когда вы создаете app:

app = Application(...)
app["exector"] = ProcessPoolExector()

Асинхронный итератор здесь на самом деле не нужен. Вместо этого вы можете просто вернуть управление циклу событий внутри вашего цикла. В Python 3.4 это делается с помощью простого yield:

@asyncio.coroutine
def index(self):
    for i in range(20):
        not_so_long_operation()
        yield

В Python 3.5 вы можете определить Empty объект, который в основном делает то же самое:

class Empty:
    def __await__(self):
        yield

Затем используйте его с await синтаксис:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await Empty()

Или просто используйте asyncio.sleep(0), который был недавно оптимизирован:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await asyncio.sleep(0)

Вы также можете запустить not_so_long_operation в потоке с использованием исполнителя по умолчанию:

async def index(request, loop):
    for i in range(20):
        await loop.run_in_executor(None, not_so_long_operation)
Другие вопросы по тегам