Как ускорить массовую вставку в MS SQL Server из CSV с помощью pyodbc

Ниже приведен мой код, с которым мне нужна помощь. Мне нужно запустить более 1 300 000 строк, а это значит, что для вставки ~300 000 строк требуется до 40 минут.

Я полагаю, что массовая вставка - это путь, чтобы ускорить его? Или потому что я перебираю строки через for data in reader: часть?

#Opens the prepped csv file
with open (os.path.join(newpath,outfile), 'r') as f:
    #hooks csv reader to file
    reader = csv.reader(f)
    #pulls out the columns (which match the SQL table)
    columns = next(reader)
    #trims any extra spaces
    columns = [x.strip(' ') for x in columns]
    #starts SQL statement
    query = 'bulk insert into SpikeData123({0}) values ({1})'
    #puts column names in SQL query 'query'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))

    print 'Query is: %s' % query
    #starts curser from cnxn (which works)
    cursor = cnxn.cursor()
    #uploads everything by row
    for data in reader:
        cursor.execute(query, data)
        cursor.commit()

Я специально выбираю заголовки своих столбцов (так как я хотел бы создать максимально питонный код).

SpikeData123 - это имя таблицы.

6 ответов

Решение

BULK INSERT почти наверняка будет намного быстрее, чем чтение исходного файла построчно и выполнение обычной INSERT для каждой строки. Тем не менее, как BULK INSERT, так и BCP имеют существенное ограничение в отношении файлов CSV, так как они не могут обрабатывать текстовые квалификаторы (см. Здесь). То есть, если в вашем CSV-файле нет подходящих текстовых строк...

1,Gord Thompson,2015-04-15
2,Bob Loblaw,2015-04-07

... тогда вы можете BULK INSERT, но если он содержит квалификаторы текста (потому что некоторые текстовые значения содержат запятые)...

1,"Thompson, Gord",2015-04-15
2,"Loblaw, Bob",2015-04-07

... тогда BULK INSERT не может справиться с этим. Тем не менее, в целом может быть быстрее предварительно обработать такой CSV-файл в файл с разделителем каналов...

1|Thompson, Gord|2015-04-15
2|Loblaw, Bob|2015-04-07

... или файл с разделителями табуляции (где представляет символ табуляции)...

1→Thompson, Gord→2015-04-15
2→Loblaw, Bob→2015-04-07

... а затем ОБЪЯВИТЬ ВСТАВИТЬ этот файл. Для последнего файла (с разделителями табуляции) код BULK INSERT будет выглядеть примерно так:

import pypyodbc
conn_str = "DSN=myDb_SQLEXPRESS;"
cnxn = pypyodbc.connect(conn_str)
crsr = cnxn.cursor()
sql = """
BULK INSERT myDb.dbo.SpikeData123
FROM 'C:\\__tmp\\biTest.txt' WITH (
    FIELDTERMINATOR='\\t',
    ROWTERMINATOR='\\n'
    );
"""
crsr.execute(sql)
cnxn.commit()
crsr.close()
cnxn.close()

Примечание: как уже упоминалось в комментарии, выполнение BULK INSERT Это утверждение применимо только в том случае, если экземпляр SQL Server может напрямую читать исходный файл. Для случаев, когда исходный файл находится на удаленном клиенте, см. Этот ответ.

Как отмечено в комментарии к другому ответу, T-SQL BULK INSERT Команда будет работать только в том случае, если импортируемый файл находится на том же компьютере, что и экземпляр SQL Server, или в сетевом расположении SMB/CIFS, которое может прочитать экземпляр SQL Server. Таким образом, это может быть неприменимо в случае, когда исходный файл находится на удаленном клиенте.

В pyodbc 4.0.19 добавлена ​​функция Cursor#fast_executemany, которая может быть полезна в этом случае. fast_executemany по умолчанию выключен, и следующий тестовый код...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.time()
crsr.executemany(sql, params)
print(f'{time.time() - t0:.1f} seconds')

... потребовалось примерно 22 секунды для выполнения на моей тестовой машине. Просто добавив crsr.fast_executemany = True...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

crsr.fast_executemany = True  # new in pyodbc 4.0.19

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.time()
crsr.executemany(sql, params)
print(f'{time.time() - t0:.1f} seconds')

... сократил время выполнения до чуть более 1 секунды.

Да массовая вставка - это правильный путь для загрузки больших файлов в БД. С первого взгляда я бы сказал, что причина, по которой это занимает так много времени, заключается в том, что, как вы упомянули, вы циклически просматриваете каждую строку данных в файле, что фактически означает устранение преимуществ использования массовой вставки и превращение ее в обычную вставку. Просто помните, что, как следует из названия, он используется для вставки фрагментов данных. Я бы удалил цикл и попробовал бы снова.

Кроме того, я бы дважды проверил ваш синтаксис для массовой вставки, поскольку он не выглядит правильным для меня. проверить sql, который генерируется pyodbc, так как у меня такое ощущение, что он может выполнять только обычную вставку

Альтернативно, если это все еще медленно, я попытался бы использовать массовую вставку непосредственно из sql и либо загрузить весь файл во временную таблицу с массовой вставкой, а затем вставить соответствующий столбец в нужные таблицы. или используйте сочетание массовой вставки и bcp для вставки определенных столбцов или OPENROWSET.

Эта проблема меня расстраивала, и я не заметил особых улучшений при использовании fast_executemanyпока я не нашел этот пост на SO. В частности, комментарий Брайана Байлиаша относительно max varchar. Я использовал SQLAlchemy, и даже обеспечение более точных параметров типа данных не помогло мне решить проблему; однако переход на pyodbc сделал. Я также последовал совету Майкла Моуры по использованию временного стола и обнаружил, что он сэкономил еще больше времени. Я написал функцию на случай, если она кому-нибудь пригодится. Я написал, чтобы взять либо список, либо список списков для вставки. Мне потребовалась вставка тех же данных с использованием SQLAlchemy и Pandas. to_sqlот 40 минут до чуть менее 4 секунд. Хотя, возможно, я неправильно использовал свой прежний метод.

связь

      def mssql_conn():
    conn = pyodbc.connect(driver='{ODBC Driver 17 for SQL Server}',
                          server=os.environ.get('MS_SQL_SERVER'),
                          database='EHT',
                          uid=os.environ.get('MS_SQL_UN'),
                          pwd=os.environ.get('MS_SQL_PW'),
                          autocommit=True)
    return conn

Вставить функцию

      def mssql_insert(table,val_lst,truncate=False,temp_table=False):
    '''Use as direct connection to database to insert data, especially for
       large inserts. Takes either a single list (for one row),
       or list of list (for multiple rows). Can either append to table
       (default) or if truncate=True, replace existing.'''
    conn = mssql_conn()
    cursor = conn.cursor()
    cursor.fast_executemany = True
    tt = False
    qm = '?,'
    if isinstance(val_lst[0],list):
        rows = len(val_lst)
        params = qm * len(val_lst[0])
    else:
        rows = 1
        params = qm * len(val_lst)
        val_lst = [val_lst]
    params = params[:-1]
    if truncate:
        cursor.execute(f"TRUNCATE TABLE {table}")
    if temp_table:
        #create a temp table with same schema
        start_time = time.time()
        cursor.execute(f"SELECT * INTO ##{table} FROM {table} WHERE 1=0")
        table = f"##{table}"
        #set flag to indicate temp table was used
        tt = True
    else:
        start_time = time.time()
    #insert into either existing table or newly created temp table
    stmt = f"INSERT INTO {table} VALUES ({params})"
    cursor.executemany(stmt,val_lst)
    if tt:
        #remove temp moniker and insert from temp table
        dest_table = table[2:]
        cursor.execute(f"INSERT INTO {dest_table} SELECT * FROM {table}")
        print('Temp table used!')
        print(f'{rows} rows inserted into the {dest_table} table in {time.time() - 
              start_time} seconds')
    else:
        print('No temp table used!')
        print(f'{rows} rows inserted into the {table} table in {time.time() - 
              start_time} seconds')
    cursor.close()
    conn.close()

И мои результаты в консоли сначала используют временную таблицу, а затем не используют ее (в обоих случаях таблица содержала данные во время выполнения и Truncate=True):

      No temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 10.595500707626343 
seconds

Temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 3.810380458831787 
seconds

FWIW, я дал несколько методов вставки в SQL Server для собственного тестирования. На самом деле я смог получить самые быстрые результаты, используя пакеты SQL Server и инструкции pyodbcCursor.execute. Сохранение в csv и BULK INSERT я не тестировал, интересно, как это сравнивается.

Вот мой блог о проведенном мной тестировании: http://jonmorisissqlblog.blogspot.com/2021/05/python-pyodbc-and-batch-inserts-to-sql.html

добавление к ответу Горда Томпсона:

      # add the below line for controlling batch size of insert
cursor.fast_executemany_rows = batch_size # by default it is 1000
Другие вопросы по тегам