Python асинхронный HTTP-запрос
У меня есть оборудование с интерфейсом http, которое часто генерирует бесконечную страницу http со значениями, которые я хочу проанализировать и сохранить в базе данных. Я начал с запросов:
import asyncio
import asyncpg
import requests
class node_http_mtr():
def __init__(self, ip, nsrc, ndst):
self.ip = ip
self.nsrc = nsrc
self.ndst = ndst
try:
self.data = requests.get('http://' + self.ip + '/nph-cgi_mtr?duration=-1&interval=0', stream=True, timeout=10)
except:
return
def __iter__(self):
return self
def __next__(self):
mtr = list()
try:
for chunk in self.data.iter_content(32 * (self.nsrc + self.ndst), '\n'):
# DEBUG log chunk
for line in chunk.split('\n'):
# DEBUG log line
if line.startswith('MTR'):
try:
_, io, num, val = line.split(' ')
l, r = val.split(':')[1], val.split(':')[2]
mtr.append((self.ip, io+num, l, r))
except:
# ERROR log line
pass
if len(mtr) == self.nsrc + self.ndst:
break
if len(mtr) == self.nsrc + self.ndst:
yield mtr
else:
continue
except:
# ERROR connection lost
return
async def save_to_db(data_to_save):
global pool
try:
async with pool.acquire() as conn:
await conn.execute('''INSERT INTO mtr (ip, io, l, r) VALUES %s''' % ','.join(str(row) for row in data_to_save))
finally:
await pool.release(conn)
async def remove_from_db(ip):
global pool
try:
async with pool.acquire() as conn:
await conn.execute('''DELETE FROM httpmtr WHERE ip = $1''', ip)
finally:
await pool.release(conn)
async def http_mtr_worker():
global workers_list
global loop
while True:
await asyncio.sleep(0)
for ip in list(workers_list):
data_to_save = next(workers_list[ip])
if data_to_save:
asyncio.ensure_future(save_to_db(next(data_to_save)))
await asyncio.sleep(0)
async def check_for_workers():
global workers_list
global pool
while True:
await asyncio.sleep(0)
try:
async with pool.acquire() as conn:
workers = await conn.fetch('''SELECT ip FROM httpmtr''')
finally:
await pool.release(conn)
for worker in workers:
if worker['ip'] not in list(workers_list):
workers_list[worker['ip']] = node_http_mtr(worker['ip'], 8, 8)
await asyncio.sleep(0)
print('Add worker', worker['ip'])
await asyncio.sleep(0)
ips_to_delete = set(workers_list.keys()) - set([i[0] for i in workers])
if len(ips_to_delete) != 0:
for ip in ips_to_delete:
print('Delete worker ', ip)
workers_list.pop(ip)
await asyncio.sleep(0)
async def make_db_connection():
pool = await asyncpg.create_pool(user='postgres', password='data', database='test', host='localhost', max_queries=50000, command_timeout=60)
return pool
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
pool = loop.run_until_complete(make_db_connection())
workers_list = {}
try:
asyncio.ensure_future(check_for_workers())
asyncio.ensure_future(http_mtr_worker())
loop.run_forever()
except Exception as e:
print(e)
pass
finally:
print("Closing Loop")
loop.close()
Я запустил процедуру в БД, которая удаляет все данные старше 1 секунды, конечный результат с одним работником в PostgreSQL:
test=# select count(*) from mtr;
count
-------
384
(1 ёЄЁюър)
Это означает 384 результата в секунду. В каждом устройстве есть 16 различных типов данных, поэтому у меня 384/16 = 24 значения в секунду. Это соответствующий результат. Но чем больше работников я добавляю, тем хуже показатели у меня: с 10 работниками у меня в 2-3 раза меньше значений. Цель состоит в том, чтобы иметь сотни рабочих и 24-25 значений / сек. Затем я попытался использовать aiohttp. Я ожидал получить гораздо лучший результат. Наскоро я написал тестовый код:
import asyncio
from aiohttp import ClientSession
import asyncpg
async def parse(line):
if line.startswith('MTR'):
_, io, num, val = line.split(' ')
l, r = val.split(':')[1], val.split(':')[2]
return ('ip.will.be.here', io + num, l, r)
async def run():
url = "http://10.150.20.130/nph-cgi_mtr?duration=-1&interval=0"
async with ClientSession() as session:
while True:
async with session.get(url) as response:
buffer = b''
start = False
async for line in response.content.iter_any():
if line.startswith(b'\n'):
start = True
buffer += line
elif start and line.endswith(b'\n'):
buffer += line
mtr = [await parse(line) for line in buffer.decode().split('\n')[1:-1]]
await save_to_db(mtr)
break
elif start:
buffer += line
async def make_db_connection():
pool = await asyncpg.create_pool(user='postgres', password='data', database='test', host='localhost', max_queries=50000, command_timeout=60)
return pool
async def save_to_db(data_to_save):
global pool
try:
async with pool.acquire() as conn:
await conn.execute('''INSERT INTO mtr (ip, io, l, r) VALUES %s''' % ','.join(str(row) for row in data_to_save))
finally:
await pool.release(conn)
loop = asyncio.get_event_loop()
pool = loop.run_until_complete(make_db_connection())
future = asyncio.ensure_future(run())
loop.run_until_complete(future)
И у меня есть это:
test=# select count(*) from mtr;
count
-------
80
(1 ёЄЁюър)
Т.е. у меня в 5 раз хуже производительность с асинхронными запросами. Я застрял. Я не понимаю, как это решить.
ОБНОВИТЬ. Профилирование не сделало ситуацию более ясной.
С запросами ситуация более или менее понятна. Но что за проблема с async aiohttp я вообще не понимаю.
ОБНОВЛЕНИЕ 16/05/18. Наконец я вернулся к многопоточности и получил то, что мне нужно - постоянную производительность с большим количеством работников. Асинхронные вызовы на самом деле не панацея.