Python асинхронный HTTP-запрос

У меня есть оборудование с интерфейсом http, которое часто генерирует бесконечную страницу http со значениями, которые я хочу проанализировать и сохранить в базе данных. Я начал с запросов:

import asyncio
import asyncpg
import requests

class node_http_mtr():
    def __init__(self, ip, nsrc, ndst):
        self.ip = ip
        self.nsrc = nsrc
        self.ndst = ndst
        try:
            self.data = requests.get('http://' + self.ip + '/nph-cgi_mtr?duration=-1&interval=0', stream=True, timeout=10)
        except:
            return

    def __iter__(self):
        return self

    def __next__(self):
        mtr = list()
        try:
            for chunk in self.data.iter_content(32 * (self.nsrc + self.ndst), '\n'):
                # DEBUG log chunk
                for line in chunk.split('\n'):
                    # DEBUG log line
                    if line.startswith('MTR'):
                        try:
                            _, io, num, val = line.split(' ')
                            l, r = val.split(':')[1], val.split(':')[2]
                            mtr.append((self.ip, io+num, l, r))
                        except:
                            # ERROR log line
                            pass
                        if len(mtr) == self.nsrc + self.ndst:
                            break
                if len(mtr) == self.nsrc + self.ndst:
                    yield mtr
                else:
                    continue
        except:
            # ERROR connection lost
            return


async def save_to_db(data_to_save):
    global pool
    try:
        async with pool.acquire() as conn:
            await conn.execute('''INSERT INTO mtr (ip, io, l, r) VALUES %s''' % ','.join(str(row) for row in data_to_save))
    finally:
        await pool.release(conn)


async def remove_from_db(ip):
    global pool
    try:
        async with pool.acquire() as conn:
            await conn.execute('''DELETE FROM httpmtr WHERE ip = $1''', ip)
    finally:
        await pool.release(conn)


async def http_mtr_worker():
    global workers_list
    global loop
    while True:
        await asyncio.sleep(0)
        for ip in list(workers_list):
            data_to_save = next(workers_list[ip])
            if data_to_save:
                asyncio.ensure_future(save_to_db(next(data_to_save)))
            await asyncio.sleep(0)


async def check_for_workers():
    global workers_list
    global pool
    while True:
        await asyncio.sleep(0)
        try:
            async with pool.acquire() as conn:
                workers = await conn.fetch('''SELECT ip FROM httpmtr''')
        finally:
            await pool.release(conn)
        for worker in workers:
            if worker['ip'] not in list(workers_list):
                workers_list[worker['ip']] = node_http_mtr(worker['ip'], 8, 8)
                await asyncio.sleep(0)
                print('Add worker', worker['ip'])
            await asyncio.sleep(0)
        ips_to_delete = set(workers_list.keys()) - set([i[0] for i in workers])
        if len(ips_to_delete) != 0:
            for ip in ips_to_delete:
                print('Delete worker ', ip)
                workers_list.pop(ip)
                await asyncio.sleep(0)


async def make_db_connection():
    pool = await asyncpg.create_pool(user='postgres', password='data', database='test', host='localhost', max_queries=50000, command_timeout=60)
    return pool


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
pool = loop.run_until_complete(make_db_connection())
workers_list = {}
try:
    asyncio.ensure_future(check_for_workers())
    asyncio.ensure_future(http_mtr_worker())
    loop.run_forever()
except Exception as e:
    print(e)
    pass
finally:
    print("Closing Loop")
    loop.close()

Я запустил процедуру в БД, которая удаляет все данные старше 1 секунды, конечный результат с одним работником в PostgreSQL:

test=# select count(*) from mtr;
 count
-------
   384
(1 ёЄЁюър)

Это означает 384 результата в секунду. В каждом устройстве есть 16 различных типов данных, поэтому у меня 384/16 = 24 значения в секунду. Это соответствующий результат. Но чем больше работников я добавляю, тем хуже показатели у меня: с 10 работниками у меня в 2-3 раза меньше значений. Цель состоит в том, чтобы иметь сотни рабочих и 24-25 значений / сек. Затем я попытался использовать aiohttp. Я ожидал получить гораздо лучший результат. Наскоро я написал тестовый код:

import asyncio
from aiohttp import ClientSession
import asyncpg

async def parse(line):
    if line.startswith('MTR'):
        _, io, num, val = line.split(' ')
        l, r = val.split(':')[1], val.split(':')[2]
    return ('ip.will.be.here', io + num, l, r)

async def run():
    url = "http://10.150.20.130/nph-cgi_mtr?duration=-1&interval=0"
    async with ClientSession() as session:
        while True:
            async with session.get(url) as response:
                buffer = b''
                start = False
                async for line in response.content.iter_any():
                    if line.startswith(b'\n'):
                        start = True
                        buffer += line
                    elif start and line.endswith(b'\n'):
                        buffer += line
                        mtr = [await parse(line) for line in buffer.decode().split('\n')[1:-1]]
                        await save_to_db(mtr)
                        break
                    elif start:
                        buffer += line


async def make_db_connection():
    pool = await asyncpg.create_pool(user='postgres', password='data', database='test', host='localhost', max_queries=50000, command_timeout=60)
    return pool


async def save_to_db(data_to_save):
    global pool
    try:
        async with pool.acquire() as conn:
            await conn.execute('''INSERT INTO mtr (ip, io, l, r) VALUES %s''' % ','.join(str(row) for row in data_to_save))
    finally:
        await pool.release(conn)


loop = asyncio.get_event_loop()
pool = loop.run_until_complete(make_db_connection())
future = asyncio.ensure_future(run())
loop.run_until_complete(future)

И у меня есть это:

test=# select count(*) from mtr;
 count
-------
    80
(1 ёЄЁюър)

Т.е. у меня в 5 раз хуже производительность с асинхронными запросами. Я застрял. Я не понимаю, как это решить.

ОБНОВИТЬ. Профилирование не сделало ситуацию более ясной.

Запросы: профиль запросов aiohttp: профиль aiohttp

С запросами ситуация более или менее понятна. Но что за проблема с async aiohttp я вообще не понимаю.

ОБНОВЛЕНИЕ 16/05/18. Наконец я вернулся к многопоточности и получил то, что мне нужно - постоянную производительность с большим количеством работников. Асинхронные вызовы на самом деле не панацея.

0 ответов

Другие вопросы по тегам