asyncio + asyncpg + pandas: получить pandas.df с асинхронным выбором из db - ОШИБКА
Отредактировал мой код - СЕЙЧАС он работает. Я пытаюсь получить некоторую дату из моей базы данных Postgres через пул соединений asyncpg асинхронно. В основном моя база данных содержит около 100 различных таблиц (на город), и я пытаюсь собрать все данные в один кадр как можно быстрее.
import pandas as pd
import asyncpg
import asyncio
from time import time
def make_t():
lst = []
# iterator for sql tuple
for i in ['a',
'b',
'c']:
i1 = i
sql = """
SELECT
'%s' as city,
MAX(starttime) AS max_ts
FROM
"table_%s"
"""
lst.append(sql % (i, i1))
return tuple(lst)
async def get_data(pool, sql):
start = time()
async with pool.acquire() as conn:
stmt = await conn.prepare(sql)
columns = [a.name for a in stmt.get_attributes()]
data = await stmt.fetch()
print(f'Exec time: {time() - start}')
return pd.DataFrame(data, columns=columns)
async def main():
dsn = 'postgres://user:pass@127.0.0.1:5432/my_base'
cT = ['city', 'max_ts']
sqls = make_t()
pool = await asyncpg.create_pool(dsn=dsn, max_size=50)
start = time()
tasks = []
for sql in sqls:
tasks.append(loop.create_task(get_data(pool, sql)))
tasks = await asyncio.gather(*tasks)
df = pd.DataFrame(columns=cT)
for task in tasks:
# form df from corutine results
df = df.append(task.result())
print(f'total exec time: {time() - start} secs')
print('exiting main')
return df
loop = asyncio.get_event_loop()
df = loop.run_until_complete(main())
loop.close()
print('exiting program')
Python 3.6.5:: Anaconda, Inc.
Получает мне эту ошибку:
Traceback (последний вызов был последним): Файл "", строка 319, в файле "/Users/fixx/anaconda3/lib/python3.6/asyncio/base_events.py", строка 468, в run_until_complete, возвращает файл future.result(). "", строка 308, в главном файле "/Users/fixx/anaconda3/lib/python3.6/asyncio/tasks.py", строка 594, в сборе для аргумента в наборе (coros_or_futures): TypeError: нечитаемый тип: 'list'
Я не могу понять, почему? Мои sqls в кортеже!
1 ответ
asyncio.gather
принимает сопрограммы в качестве отдельных аргументов, и вы отправляете ему список задач. Вы должны использовать *
оператор звонить gather
правильно:
tasks = await asyncio.gather(*tasks)