Ускорение панд.DataFrame.to_sql с быстрым исполнением pyODBC
Я хотел бы отправить большой pandas.DataFrame
на удаленный сервер под управлением MS SQL. То, как я делаю это сейчас, путем преобразования data_frame
возражать против списка кортежей, а затем отправить его с pyODBC executemany()
функция. Это выглядит примерно так:
import pyodbc as pdb
list_of_tuples = convert_df(data_frame)
connection = pdb.connect(cnxn_str)
cursor = connection.cursor()
cursor.fast_executemany = True
cursor.executemany(sql_statement, list_of_tuples)
connection.commit()
cursor.close()
connection.close()
Затем я начал задаваться вопросом, можно ли ускорить процесс (или, по крайней мере, сделать его более читабельным) с помощью data_frame.to_sql()
метод. Я придумал следующее решение:
import sqlalchemy as sa
engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
data_frame.to_sql(table_name, engine, index=False)
Теперь код стал более читабельным, но загрузка по крайней мере в 150 раз медленнее...
Есть ли способ перевернуть fast_executemany
при использовании SQLAlchemy?
Я использую pandas-0.20.3, pyODBC-4.0.21 и sqlalchemy-1.1.13.
5 ответов
После обращения к разработчикам SQLAlchemy появился способ решить эту проблему. Большое спасибо им за отличную работу!
Нужно использовать событие выполнения курсора и проверить, executemany
Флаг был поднят. Если это действительно так, переключите fast_executemany
опция включена. Например:
from sqlalchemy import event
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
Более подробную информацию о событиях выполнения можно найти здесь.
Просто сделал аккаунт чтобы опубликовать это. Я хотел прокомментировать нижеупомянутую ветку, так как это продолжение уже предоставленного ответа. Вышеупомянутое решение работало для меня с драйвером SQL версии 17 для записи в хранилище Microsft SQL из установки на основе Ubuntu.
Полный код, который я использовал для значительного ускорения (говорим> ускорение в 100 раз), приведен ниже. Это фрагмент кода "под ключ" при условии, что вы измените строку подключения, указав соответствующие данные. На постере выше, большое спасибо за решение, так как я уже довольно долго искал это.
import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus
conn = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
print("FUNC call")
if executemany:
cursor.fast_executemany = True
table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))
s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)
Основываясь на комментариях ниже, я хотел занять некоторое время, чтобы объяснить некоторые ограничения в отношении панд to_sql
реализация и способ обработки запроса. Есть 2 вещи, которые могут вызвать MemoryError
Будучи поднятым afaik:
1) Предполагается, что вы пишете в удаленное хранилище SQL. Когда вы пытаетесь написать большую панду DataFrame с to_sql
Этот метод преобразует весь фрейм данных в список значений. Это преобразование занимает намного больше оперативной памяти, чем исходный DataFrame (поверх него, так как старый DataFrame все еще присутствует в RAM). Этот список предоставляется до финала executemany
позвоните для вашего разъема ODBC. Я думаю, что у соединителя ODBC есть некоторые проблемы, обрабатывающие такие большие запросы. Чтобы решить эту проблему, нужно предоставить to_sql
Метод аргумента chunksize (10**5, по-видимому, близок к оптимальному, обеспечивая скорость записи около 600 Мбит / с (!) в приложении MSSQL для хранения данных с 2 ЦП 7 ГБ оперативной памяти от Azure - кстати, не может порекомендовать Azure). Таким образом, первое ограничение - размер запроса - можно обойти, если chunksize
аргумент. Однако это не позволит вам записать фрейм данных размером 10**7 или больше (по крайней мере, не на той виртуальной машине, с которой я работаю, у которой ~55 ГБ ОЗУ), являющейся номером 2.
Это можно обойти, разбив DataFrame с помощью np.split
(размер блока данных DataFrame 10**6). Они могут быть записаны итеративно. Я постараюсь сделать запрос на извлечение, когда у меня будет готовое решение для to_sql
метод в ядре самой панды, так что вам не придется делать это заранее каждый раз. В любом случае я написал функцию, похожую (не под ключ) на следующую:
import pandas as pd
import numpy as np
def write_df_to_sql(df, **kwargs):
chunks = np.split(df, df.shape()[0] / 10**6)
for chunk in chunks:
chunk.to_sql(**kwargs)
return True
Более полный пример приведенного выше фрагмента можно посмотреть здесь: https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py
Это класс, который я написал, который включает в себя патч и облегчает некоторые необходимые накладные расходы, связанные с настройкой соединений с SQL. Еще нужно написать некоторую документацию. Также я планировал добавить патч для самой панды, но пока не нашел хорошего способа сделать это.
Надеюсь, это поможет.
Я столкнулся с той же проблемой, но с использованием PostgreSQL. Теперь они просто выпускают версию 0.24.0 для панд, и в to_sql
функция называется method
который решил мою проблему.
from sqlalchemy import create_engine
engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")
Скорость загрузки в 100 раз выше для меня. Я также рекомендую установить chunksize
Параметр, если вы собираетесь отправить много данных.
Я просто хотел опубликовать этот полный пример в качестве дополнительного высокопроизводительного варианта для тех, кто может использовать новую библиотеку turbodbc: http://turbodbc.readthedocs.io/en/latest/
Ясно, что существует множество параметров между потоками между пандами.to_sql(), запускающих fast_executemany через sqlalchemy, непосредственного использования pyodbc с кортежами /lists/etc. Или даже попыткой BULK UPLOAD с плоскими файлами.
Надеемся, что следующее может сделать жизнь немного приятнее, так как функциональность развивается в текущем проекте Pandas или включает что-то вроде интеграции с Turbodbc в будущем.
import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO
test_data = '''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8'''
df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])
options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)
test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]
CREATE TABLE [db_name].[schema].[test]
(
id int NULL,
transaction_dt datetime NULL,
units int NULL,
measures float NULL
)
INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
VALUES (?,?,?,?) '''
cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]
turbodbc должен быть ОЧЕНЬ быстрым во многих случаях использования (особенно с массивами numpy). Пожалуйста, обратите внимание, насколько просто передать лежащие в основе массивы из столбцов данных в качестве параметров непосредственно в запрос. Я также считаю, что это помогает предотвратить создание промежуточных объектов, которые чрезмерно увеличивают потребление памяти. Надеюсь, что это полезно!
Как указано @Pylander
На сегодняшний день Turbodbc - лучший выбор для приема данных!
Я так обрадовался этому, что написал "блог" на своем github и medium: пожалуйста, проверьте https://medium.com/@erickfis/etl-process-with-python-and-turbodbc-95534f6f7ec
за рабочий пример и сравнение с pandas.to_sql
Короче,
с turbodbc у меня 10000 строк (77 столбцов) за 3 секунды
с pandas.to_sql я получил те же 10000 строк (77 столбцов) за 198 секунд...
Похоже, что Pandas 0.23.0 и 0.24.0 используют вставки с несколькими значениями в PyODBC, что не позволяет быстрому исполнению помочь - единственный INSERT ... VALUES ...
Выписка выдается за кусок. Вставки с несколькими значениями являются улучшением по сравнению со старым по умолчанию медленным выполнением, но, по крайней мере, в простых тестах метод быстрого выполнения все еще преобладает, не говоря уже о необходимости ручного chunksize
расчеты, как требуется для вставки нескольких значений. Принудительное старое поведение может быть выполнено с помощью monkeypatching, если в будущем не будет предоставлена опция конфигурации:
import pandas.io.sql
def insert_statement(self, data, conn):
return self.table.insert(), data
pandas.io.sql.SQLTable.insert_statement = insert_statement
Производительность INSERT для SQL Server: pyodbc против turbodbc
Когда используешь to_sql
Чтобы загрузить DataFrame от pandas в SQL Server, turbodbc определенно будет быстрее, чем pyodbc без fast_executemany
, Однако с fast_executemany
включенный для pyodbc, оба подхода дают практически одинаковую производительность.
Тестовые среды:
[Venv1_pyodbc]
pyodbc 2.0.25
[Venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0
[общий для обоих]
Python 3.6.4 64-битный на Windows
SQLAlchemy 1.3.0b1
Панды 0.23.4
numpy 1.15.4
Тестовый код:
# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')
# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
[[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
columns=[f'col{y:03}' for y in range(num_cols)]
)
t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")
Тесты проводились двенадцать (12) раз для каждой среды, отбрасывая лучшие и худшие времена для каждой среды. Результаты (в секундах):
rank pyodbc turbodbc
---- ------ --------
1 22.8 27.5
2 23.4 28.1
3 24.6 28.2
4 25.2 28.5
5 25.7 29.3
6 26.9 29.9
7 27.0 31.4
8 30.1 32.1
9 33.6 32.5
10 39.8 32.9
---- ------ --------
average 27.9 30.0
Просто хотел добавить к ответу @JK.
Если вы используете этот подход:
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
И вы получаете эту ошибку:
"sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft][SQL Server Native Client 11.0]Function sequence error (0) (SQLParamData)') [SQL: 'INSERT INTO ... (...) VALUES (?, ?)'] [parameters: ((..., ...), (..., ...)] (Background on this error at: http://sqlalche.me/e/dbapi)"
Кодируйте ваши строковые значения, например, "yourStringValue".encode("ascii").
Это решит вашу проблему.
Я просто модифицирую линейку двигателей, что помогает мне ускорить установку в 100 раз.
Старый код -
import json
import maya
import time
import pandas
import pyodbc
import pandas as pd
from sqlalchemy import create_engine
retry_count = 0
retry_flag = True
hostInfoDf = pandas.read_excel('test.xlsx', sheet_name='test')
print("Read Ok")
engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")
while retry_flag and retry_count < 5:
try:
df.to_sql("table_name",con=engine,if_exists="replace",index=False,chunksize=5000,schema="dbo")
retry_flag = False
except:
retry_count = retry_count + 1
time.sleep(30)
Модифицированная линейка двигателей -
От -
engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")
чтобы -
engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)
спросите меня о подключении Python к SQL, связанного с запросами, я буду рад вам помочь.