python pandas to_sql с sqlalchemy: как ускорить экспорт в MS SQL?
У меня есть датафрейм с примерно 155 000 строк и 12 столбцов. Если я экспортирую его в csv с dataframe.to_csv, на выходе будет файл размером 11 МБ (который создается мгновенно).
Однако если я экспортирую в Microsoft SQL Server с помощью метода to_sql, это займет от 5 до 6 минут! Никакие столбцы не являются текстовыми: только int, float, bool и date. Я видел случаи, когда драйверы ODBC устанавливали nvarchar(max), и это замедляло передачу данных, но здесь это не может быть так.
Любые предложения о том, как ускорить процесс экспорта? Из-за 6 минут экспорта 11 МБ данных соединение ODBC практически невозможно.
Спасибо!
Мой код:
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"
engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database)
conn = engine.connect()
metadata = MetaData(conn)
my_data_frame.to_sql(TableName,engine)
12 ответов
У меня недавно была такая же проблема, и я хочу добавить ответ на нее для других.to_sql
похоже, отправляет INSERT
запрос для каждой строки, что делает его очень медленным. Но с тех пор0.24.0
E сть method
параметр в pandas.to_sql()
где вы можете определить свою собственную функцию вставки или просто использовать method='multi'
чтобы указать пандам передать несколько строк в одном запросе INSERT, что делает его намного быстрее.
Обратите внимание, что ваша база данных может иметь ограничение на параметры. В этом случае вам также необходимо определить размер фрагмента.
Итак, решение должно выглядеть примерно так:
my_data_frame.to_sql(TableName, engine, chunksize=<yourParameterLimit>, method='multi')
Если вы не знаете предел параметра базы данных, просто попробуйте его без параметра chunksize. Он запустится или выдаст ошибку, сообщающую вам ваш лимит.
DataFrame.to_sql
Метод генерирует операторы вставки для вашего соединителя ODBC, который затем обрабатывается соединителем ODBC как обычные вставки.
Когда это медленно, это не вина панд.
Сохранение вывода DataFrame.to_sql
метод в файл, затем воспроизведение этого файла через соединитель ODBC займет столько же времени.
Правильный способ массового импорта данных в базу данных состоит в том, чтобы сгенерировать файл csv, а затем использовать команду загрузки, которая в разновидности MS баз данных SQL называется BULK INSERT
Например:
BULK INSERT mydatabase.myschema.mytable
FROM 'mydatadump.csv';
Ссылка на синтаксис выглядит следующим образом:
BULK INSERT
[ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]
FROM 'data_file'
[ WITH
(
[ [ , ] BATCHSIZE = batch_size ]
[ [ , ] CHECK_CONSTRAINTS ]
[ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ]
[ [ , ] DATAFILETYPE =
{ 'char' | 'native'| 'widechar' | 'widenative' } ]
[ [ , ] FIELDTERMINATOR = 'field_terminator' ]
[ [ , ] FIRSTROW = first_row ]
[ [ , ] FIRE_TRIGGERS ]
[ [ , ] FORMATFILE = 'format_file_path' ]
[ [ , ] KEEPIDENTITY ]
[ [ , ] KEEPNULLS ]
[ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]
[ [ , ] LASTROW = last_row ]
[ [ , ] MAXERRORS = max_errors ]
[ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ]
[ [ , ] ROWS_PER_BATCH = rows_per_batch ]
[ [ , ] ROWTERMINATOR = 'row_terminator' ]
[ [ , ] TABLOCK ]
[ [ , ] ERRORFILE = 'file_name' ]
)]
Вы можете использовать это: что делает его быстрее, так это method
параметр панд to_sql
. Надеюсь, эта помощь поможет.
По моему опыту, результат этого был от бесконечного времени до 8 секунд.
df = pd.read_csv('test.csv')
conn = create_engine(<connection_string>)
start_time = time.time()
df.to_sql('table_name', conn, method='multi',index=False, if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))
С участием
SQLAlchemy>=1.3
, при создании
engine
объект, набор
fast_executemany=True
. Ссылка
Почему pandas.DataFrame.to_sql
медленный?
При загрузке данных из pandas
на Microsoft SQL Server большая часть времени уходит на преобразование из pandas
в объекты Python в представление, необходимое для драйвера ODBC MS SQL. Одна из причинpandas
намного быстрее для аналитики, чем базовый код Python, потому что он работает с небольшими собственными массивами целых чисел / чисел с плавающей запятой /…, которые не имеют таких же накладных расходов, как их соответствующие аналоги на Python. Вto_sql
фактически преобразует все эти бережливые столбцы во многие отдельные объекты Python и, следовательно, не получает обычной обработки производительности, как другие pandas
операции есть.
Использовать turbodbc.Cursor.insertmanycolumns
чтобы ускорить это
Учитывая pandas.DataFrame
, вы можете использовать turbodbc
а также pyarrow
для вставки данных с меньшими накладными расходами на преобразование, чем при преобразовании в объекты Python.
import pyarrow as pa
import turbodbc
cursor = … # cursor to a MS SQL connection initiated with turbodbc
df = … # the pd.DataFrame to be inserted
# Convert the pandas.DataFrame to a pyarrow.Table, most of the columns
# will be zero-copy and thus this is quite fast.
table = pa.Table.from_pandas(table)
# Insert into the database
cursor.executemanycolumns("INSERT INTO my_table VALUES (?, ?, ?)",
table)
Почему это быстрее?
Вместо преобразования pd.DataFrame
-> коллекция объектов Python -> структуры данных ODBC, делаем путь преобразования pd.DataFrame
-> pyarrow.Table
-> Структура ODBC. Это более эффективно благодаря:
- Большинство столбцов
pandas.DataFrame
можно преобразовать в столбцыpyarrow.Table
без копирования. Столбцы таблицы будут ссылаться на одну и ту же память. Таким образом, фактического преобразования не происходит. - Преобразование выполняется полностью в машинном коде с собственными типами. Это означает, что ни на каком этапе у нас не возникают накладные расходы на объекты Python, пока у нас нет
object
набранные столбцы.
Дляsqlalchemy
>= 1.3, а не использоватьto_sql()
параметр метода , используйтеfast_executemany=True
вsqlalchemy's create_engine()
. Это должно быть как минимум так же быстро, какmethod="multi"
избегая ограничения T-SQL в 2100 значений параметров для хранимой процедуры, что вызывает ошибку, показанную здесь.
Кредит Горду Томпсону по той же ссылке.
Хотя не для MS SQL (на данный момент), но поскольку это одна из первых ссылок, которая обнаруживается для этой проблемы, для Postgres и MYSQL вы можете использовать d6tstack, который имеет быстрые функциональные возможности pandas to SQL, потому что он использует собственные команды импорта БД.
uri_psql = 'postgresql+psycopg2://usr:pwd@localhost/db'
d6tstack.utils.pd_to_psql(df, uri_psql, 'table')
Также полезно для импорта нескольких CSV с изменениями схемы данных и / или предварительной обработки пандами перед записью в базу данных, см. Далее в примерах блокнот
d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'),
apply_after_read=apply_fun).to_psql_combine(uri_psql, 'table')
У меня не хватало времени и памяти (более 18 ГБ выделено для DataFrame, загруженного из 120 МБ CSV) с этой строкой:
df.to_sql('my_table', engine, if_exists='replace', method='multi', dtype={"text_field": db.String(64), "text_field2": db.String(128), "intfield1": db.Integer(), "intfield2": db.Integer(), "floatfield": db.Float()})
Вот код, который помог мне импортировать и отслеживать ход вставок одновременно:
import sqlalchemy as db
engine = db.create_engine('mysql://user:password@localhost:3306/database_name', echo=False)
connection = engine.connect()
metadata = db.MetaData()
my_table = db.Table('my_table', metadata,
db.Column('text_field', db.String(64), index=True),
db.Column('text_field2', db.String(128), index=True),
db.Column('intfield1', db.Integer()),
db.Column('intfield2', db.Integer()),
db.Column('floatfield', db.Float())
)
metadata.create_all(engine)
kw_dict = df.reset_index().sort_values(by="intfield2", ascending=False).to_dict(orient="records")
batch_size=10000
for batch_start in range(0, len(kw_dict), batch_size):
print("Inserting {}-{}".format(batch_start, batch_start + batch_size))
connection.execute(my_table.insert(), kw_dict[batch_start:batch_start + batch_size])
Мое решение этой проблемы приведено ниже, если это кому-то поможет. Из того, что я читал, метод pandas tosql загружает по одной записи за раз.
Вы можете сделать оператор массовой вставки, который загружает 1000 строк и фиксирует эту транзакцию вместо того, чтобы каждый раз фиксировать одну строку. Это значительно увеличивает скорость.
import pandas as pd
from sqlalchemy import create_engine
import pymssql
import os
connect_string = [your connection string]
engine = create_engine(connect_string,echo=False)
connection = engine.raw_connection()
cursor = connection.cursor()
def load_data(report_name):
# my report_name variable is also my sql server table name so I use that variable to create table name string
sql_table_name = 'AR_'+str(report_name)
global chunk # to QC chunks that fail for some reason
for chunk in pd.read_csv(report_full_path_new,chunksize=1000):
chunk.replace('\'','\'\'',inplace=True,regex=True) #replace single quotes in data with double single quotes to escape it in mysql
chunk.fillna('NULL',inplace=True)
my_data = str(chunk.to_records(index=False).tolist()) # convert data to string
my_data = my_data[1:-1] # clean up the ends
my_data = my_data.replace('\"','\'').replace('\'NULL\'','NULL') #convert blanks to NULLS for mysql
sql_table_name = [your sql server table name]
sql = """
INSERT INTO {0}
VALUES {1}
""".format(sql_table_name,my_data)
cursor.execute(sql)
# you must call commit() to persist your data if you don't set autocommit to True
connection.commit()
На основании этого ответа - Асем.
Вы можете использовать метод copy_from для имитации массовой загрузки с объектом курсора. Это было проверено на Postgres, попробуйте с вашей БД:
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
from StringIO import StringIO
ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"
engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database) #don't forget to add a password if needed
my_data_frame.head(0).to_sql(TableName, engine, if_exists='replace', index=False) # create an empty table - just for structure
conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
my_data_frame.to_csv(output, sep='\t', header=False, index=False) # a CSV that will be used for the bulk load
output.seek(0)
cur.copy_from(output, TableName, null="") # null values become ''
conn.commit()
conn.close()
cur.close()
Как сказано в других ответах, причина замедления и / или тайм-аута заключается в том, что панды снова и снова вставляют много отдельных строк. Большой объем команд вставки является медленным и / или может перегружать целевую базу данных.
using method='multi' указывает пандам загружать кусками. Это намного быстрее и не так легко истечет по тайм-ауту.
sqlEngine=create_engine('mysql+mysqlconnector://'+config['user']+':'+config['pass']+'@'+config['host']+'/'+config['dbname'])
dbConnection=sqlEngine.connect()
df.to_sql('table_name',con=dbConnection,method='multi',if_exists='append',index=False)
dbConnection.close()
Вероятно, ответ pyarrow выше лучше всего, но для mariadb я написал оболочку для DataFrame для использованияexecutemany
иfetchall
, что дало мне ускорение в 300 раз. Это также имело дополнительный бонус в виде полного отсутствия использования sqlalchemy.
Вы можете использовать его как обычно:df.to_sql(...)
, илиdf = read_sql_table(...)
.
См. https://gist.github.com/MichaelCurrie/b5ab978c0c0c1860bb5e75676775b43b .