Ускорение панд.DataFrame.to_sql с быстрым исполнением pyODBC

Я хотел бы отправить большой pandas.DataFrame на удаленный сервер под управлением MS SQL. То, как я делаю это сейчас, путем преобразования data_frame возражать против списка кортежей, а затем отправить его с pyODBC executemany() функция. Это выглядит примерно так:

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()

Затем я начал задаваться вопросом, можно ли ускорить процесс (или, по крайней мере, сделать его более читабельным) с помощью data_frame.to_sql() метод. Я придумал следующее решение:

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)

Теперь код стал более читабельным, но загрузка по крайней мере в 150 раз медленнее...

Есть ли способ перевернуть fast_executemany при использовании SQLAlchemy?

Я использую pandas-0.20.3, pyODBC-4.0.21 и sqlalchemy-1.1.13.

5 ответов

Решение

После обращения к разработчикам SQLAlchemy появился способ решить эту проблему. Большое спасибо им за отличную работу!

Нужно использовать событие выполнения курсора и проверить, executemany Флаг был поднят. Если это действительно так, переключите fast_executemany опция включена. Например:

from sqlalchemy import event

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

Более подробную информацию о событиях выполнения можно найти здесь.

Просто сделал аккаунт чтобы опубликовать это. Я хотел прокомментировать нижеупомянутую ветку, так как это продолжение уже предоставленного ответа. Вышеупомянутое решение работало для меня с драйвером SQL версии 17 для записи в хранилище Microsft SQL из установки на основе Ubuntu.

Полный код, который я использовал для значительного ускорения (говорим> ускорение в 100 раз), приведен ниже. Это фрагмент кода "под ключ" при условии, что вы измените строку подключения, указав соответствующие данные. На постере выше, большое спасибо за решение, так как я уже довольно долго искал это.

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)


@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    print("FUNC call")
    if executemany:
        cursor.fast_executemany = True


table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))


s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)

Основываясь на комментариях ниже, я хотел занять некоторое время, чтобы объяснить некоторые ограничения в отношении панд to_sql реализация и способ обработки запроса. Есть 2 вещи, которые могут вызвать MemoryError Будучи поднятым afaik:

1) Предполагается, что вы пишете в удаленное хранилище SQL. Когда вы пытаетесь написать большую панду DataFrame с to_sql Этот метод преобразует весь фрейм данных в список значений. Это преобразование занимает намного больше оперативной памяти, чем исходный DataFrame (поверх него, так как старый DataFrame все еще присутствует в RAM). Этот список предоставляется до финала executemany позвоните для вашего разъема ODBC. Я думаю, что у соединителя ODBC есть некоторые проблемы, обрабатывающие такие большие запросы. Чтобы решить эту проблему, нужно предоставить to_sql Метод аргумента chunksize (10**5, по-видимому, близок к оптимальному, обеспечивая скорость записи около 600 Мбит / с (!) в приложении MSSQL для хранения данных с 2 ЦП 7 ГБ оперативной памяти от Azure - кстати, не может порекомендовать Azure). Таким образом, первое ограничение - размер запроса - можно обойти, если chunksize аргумент. Однако это не позволит вам записать фрейм данных размером 10**7 или больше (по крайней мере, не на той виртуальной машине, с которой я работаю, у которой ~55 ГБ ОЗУ), являющейся номером 2.

Это можно обойти, разбив DataFrame с помощью np.split (размер блока данных DataFrame 10**6). Они могут быть записаны итеративно. Я постараюсь сделать запрос на извлечение, когда у меня будет готовое решение для to_sql метод в ядре самой панды, так что вам не придется делать это заранее каждый раз. В любом случае я написал функцию, похожую (не под ключ) на следующую:

import pandas as pd
import numpy as np

def write_df_to_sql(df, **kwargs):
    chunks = np.split(df, df.shape()[0] / 10**6)
    for chunk in chunks:
        chunk.to_sql(**kwargs)
    return True

Более полный пример приведенного выше фрагмента можно посмотреть здесь: https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

Это класс, который я написал, который включает в себя патч и облегчает некоторые необходимые накладные расходы, связанные с настройкой соединений с SQL. Еще нужно написать некоторую документацию. Также я планировал добавить патч для самой панды, но пока не нашел хорошего способа сделать это.

Надеюсь, это поможет.

Я столкнулся с той же проблемой, но с использованием PostgreSQL. Теперь они просто выпускают версию 0.24.0 для панд, и в to_sql функция называется method который решил мою проблему.

from sqlalchemy import create_engine

engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")

Скорость загрузки в 100 раз выше для меня. Я также рекомендую установить chunksize Параметр, если вы собираетесь отправить много данных.

Я просто хотел опубликовать этот полный пример в качестве дополнительного высокопроизводительного варианта для тех, кто может использовать новую библиотеку turbodbc: http://turbodbc.readthedocs.io/en/latest/

Ясно, что существует множество параметров между потоками между пандами.to_sql(), запускающих fast_executemany через sqlalchemy, непосредственного использования pyodbc с кортежами /lists/etc. Или даже попыткой BULK UPLOAD с плоскими файлами.

Надеемся, что следующее может сделать жизнь немного приятнее, так как функциональность развивается в текущем проекте Pandas или включает что-то вроде интеграции с Turbodbc в будущем.

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)

test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]

                CREATE TABLE [db_name].[schema].[test]
                (
                    id int NULL,
                    transaction_dt datetime NULL,
                    units int NULL,
                    measures float NULL
                )

                INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                VALUES (?,?,?,?) '''

cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]

turbodbc должен быть ОЧЕНЬ быстрым во многих случаях использования (особенно с массивами numpy). Пожалуйста, обратите внимание, насколько просто передать лежащие в основе массивы из столбцов данных в качестве параметров непосредственно в запрос. Я также считаю, что это помогает предотвратить создание промежуточных объектов, которые чрезмерно увеличивают потребление памяти. Надеюсь, что это полезно!

Как указано @Pylander

На сегодняшний день Turbodbc - лучший выбор для приема данных!

Я так обрадовался этому, что написал "блог" на своем github и medium: пожалуйста, проверьте https://medium.com/@erickfis/etl-process-with-python-and-turbodbc-95534f6f7ec

за рабочий пример и сравнение с pandas.to_sql

Короче,

с turbodbc у меня 10000 строк (77 столбцов) за 3 секунды

с pandas.to_sql я получил те же 10000 строк (77 столбцов) за 198 секунд...

Похоже, что Pandas 0.23.0 и 0.24.0 используют вставки с несколькими значениями в PyODBC, что не позволяет быстрому исполнению помочь - единственный INSERT ... VALUES ... Выписка выдается за кусок. Вставки с несколькими значениями являются улучшением по сравнению со старым по умолчанию медленным выполнением, но, по крайней мере, в простых тестах метод быстрого выполнения все еще преобладает, не говоря уже о необходимости ручного chunksize расчеты, как требуется для вставки нескольких значений. Принудительное старое поведение может быть выполнено с помощью monkeypatching, если в будущем не будет предоставлена ​​опция конфигурации:

import pandas.io.sql

def insert_statement(self, data, conn):
    return self.table.insert(), data

pandas.io.sql.SQLTable.insert_statement = insert_statement

Производительность INSERT для SQL Server: pyodbc против turbodbc

Когда используешь to_sql Чтобы загрузить DataFrame от pandas в SQL Server, turbodbc определенно будет быстрее, чем pyodbc без fast_executemany, Однако с fast_executemany включенный для pyodbc, оба подхода дают практически одинаковую производительность.

Тестовые среды:

[Venv1_pyodbc]
pyodbc 2.0.25

[Venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0

[общий для обоих]
Python 3.6.4 64-битный на Windows
SQLAlchemy 1.3.0b1
Панды 0.23.4
numpy 1.15.4

Тестовый код:

# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

Тесты проводились двенадцать (12) раз для каждой среды, отбрасывая лучшие и худшие времена для каждой среды. Результаты (в секундах):

   rank  pyodbc  turbodbc
   ----  ------  --------
      1    22.8      27.5
      2    23.4      28.1
      3    24.6      28.2
      4    25.2      28.5
      5    25.7      29.3
      6    26.9      29.9
      7    27.0      31.4
      8    30.1      32.1
      9    33.6      32.5
     10    39.8      32.9
   ----  ------  --------
average    27.9      30.0

Просто хотел добавить к ответу @JK.

Если вы используете этот подход:

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

И вы получаете эту ошибку:

"sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft][SQL Server Native Client 11.0]Function sequence error (0) (SQLParamData)') [SQL: 'INSERT INTO ... (...) VALUES (?, ?)'] [parameters: ((..., ...), (..., ...)] (Background on this error at: http://sqlalche.me/e/dbapi)"

Кодируйте ваши строковые значения, например, "yourStringValue".encode("ascii").

Это решит вашу проблему.

Я просто модифицирую линейку двигателей, что помогает мне ускорить установку в 100 раз.

Старый код -

import json
import maya
import time
import pandas
import pyodbc
import pandas as pd
from sqlalchemy import create_engine

retry_count = 0
retry_flag = True

hostInfoDf = pandas.read_excel('test.xlsx', sheet_name='test')
print("Read Ok")

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

while retry_flag and retry_count < 5:
  try:
    df.to_sql("table_name",con=engine,if_exists="replace",index=False,chunksize=5000,schema="dbo")
    retry_flag = False
  except:
    retry_count = retry_count + 1
    time.sleep(30)

Модифицированная линейка двигателей -

От -

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

чтобы -

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)

спросите меня о подключении Python к SQL, связанного с запросами, я буду рад вам помочь.

Другие вопросы по тегам