Как ускорить обработку данных много в Python + Pandas + sqlAlchemy + MSSQL/T-SQL?

(NB. Я не считаю себя dev, dba или чем-то в этом роде, я учусь на практике - в целом (старый) опыт программирования в целом, но python и sql очень новы для меня)

Я видел этот вопрос, но удивляюсь, изменились ли сейчас вещи через 2 года.

У меня есть база данных MSSQL ~20M строк, ~10 столбцов (но я буду добавлять дополнительные столбцы).

Исходные данные были импортированы из CSV с пустыми значениями, где между строками не было изменений, поэтому я просто использовал pandas dataframe для чтения данных, их заполнения и обратной записи. Я использую dataframe.to_sql для записи во временную таблицу (заменив, если она существует каждый раз), а затем использую SQL для обновления основной таблицы. Обновление 100 тыс. Строк из временной таблицы занимает <1 с, но dataframe.to_sql занимает около 3 минут для 100 тыс. Строк.

Это займет около 8 часов, чтобы обработать весь дБ таким образом, и пока это нормально (я учусь разбираться с фреймами данных, sql и т. Д.), Мне нужно ускорить как минимум на два порядка.

Известные проблемы со скоростью:

  1. T-SQL ограничивает вставки таблицы до 1000 строк
  2. dataframe.to_sql должен иметь небольшой размер чанка (в настоящее время 100), если я хочу избежать ошибок pyodbc (включая отрицательное количество параметров!)

Одним из обходных путей может быть использование dataframe.to_csv с режимом добавления, а затем повторный импорт данных, для которых были заполнены все nas, но это должно быть последним средством. (Это, безусловно, выполнимо: исходный CSV-файл имеет размер 800 МБ, и к тому времени, когда я заполню пробелы, он будет примерно на 20% больше)

Теперь я должен сделать много "обработки сигналов" данных как временных рядов, для которых Панды казались идеальными; с другой стороны, у меня также есть Mathematica, и я могу там поработать, но я бы хотел иметь простой стек технологий, если это возможно.

Итак, учитывая, что мне нужно будет сделать несколько проходов через всю базу данных, обновив его результатами, вопрос:

Есть ли надежда получить желаемое>100-кратное ускорение одним или несколькими методами в сочетании с Python + Pandas+ sqlAlchemy с бэкэндом MSSQL... Другие инструменты Python?

(Одна идея: пойти по промежуточному маршруту CSV и использовать sqlAlchemy для вызова хранимой процедуры для импорта нового CSV, за исключением того, что я понятия не имею, как это сделать.

NB. ПК с процессором i8700k + 32 ГБ и графическим процессором - мне также лучше использовать мое оборудование? Могу ли я распараллелить?)

Все предложения с благодарностью получены!

Далее следует хакерский код, показывающий, что / как я это делаю (соберите из пары файлов... извинения за любые ошибки / упущения)

import sqlalchemy as sqla
from sqlalchemy import func, select
import datetime as dt
import pyodbc
import pandas as pd
import numpy as np

def namedDbSqlAEngineCreate(dbName):
    # Create an engine and switch to the named db
    # returns the engine if successful and None if not
    engineStr = 'mssql+pyodbc://@' + defaultDSN
    engine = sqla.create_engine(engineStr, echo=False)
    try:
        engine.execute('USE ' +dbName)
        return(engine)
    except sqla.exc.SQLAlchemyError as ex:
        if ex.orig.args[0] == '08004':
            print('namedDbSqlAEngineCreate:Database %s does not exist' % dbName)
        else:
            print(ex.args[0])
        return(None)

def Main():
    amDebugging = False
    debugRows = 100

    dbName = 'USDCHF_Raw'
    srcCurrencyTableName = "USDCHF Transformed"
    engine = ctu.namedDbSqlAEngineCreate(dbName) # won't work unless the db Exists!
    if engine != None:
        session = sqla.orm.sessionmaker() # do I need a session in this code??
        session.configure(bind = engine)
        meta = sqla.MetaData(engine)
        meta.reflect(bind=engine) # get info about the database from the database, then we can create a table object directly
        srcCurrencyTable = meta.tables[srcCurrencyTableName]
        connection = engine.connect()
        rowCount = connection.scalar(select([func.count('*')]).select_from(srcCurrencyTable)) # now I'm referring to the table object directly
    else:
        return(None)
    chunkStart = 100000
    chunkRows = max(100000,2) # start small, but must be at least 2 :)
    chunkEnd = min(chunkStart+chunkRows -1, rowCount)
    tempTableName = 'tempNasFilledTable'

    # and the SQL to copy from a temp table; only City and Region columns need to be updated...    
    updateSql = ' UPDATE [' + srcCurrencyTableName + ']' + \
                ' SET ' + \
                ' [City] = [' + tempTableName + '].[City],' + \
                ' [Region] = [' + tempTableName + '].[Region]' + \
                ' FROM [' + tempTableName + ']' +\
                ' WHERE [' + srcCurrencyTableName + '].[Rownumber] = [' + tempTableName + '].[Rownumber]'

    engine.execute(' USE ' + dbName) # without this and the con=engine.engine into_sql the table either does hget created or appears in the Master db!
    print('Fill NA to database starting at ' + str(dt.datetime.now()))
    while True:
        # Prepare a selection; use a sqlAlchemy selectable rather than write SQL as a string...
        s = select([srcCurrencyTable]) # selecting the table
        s = s.where(srcCurrencyTable.c.Rownumber>=chunkStart) # chaining .where().where(), or use sqla.and_()
        s = s.where(srcCurrencyTable.c.Rownumber<=chunkEnd) # adaptation to the table length is now in the chunkEnd update
        print('Dataframe load starting at ' +  str(dt.datetime.now()))
        nasToFillDF = pd.read_sql_query(s,connection)
        print('Dataframe load finished at ' +  str(dt.datetime.now()))
        if amDebugging:
            print(nasToFillDF.head(debugRows))
        if nasToFillDF.empty == True: # hopefully I manage to read exactly to the end of the database, so this *shouldn't* happen...
            break
        else:
            print('FillNa starting at ' +  str(dt.datetime.now()))
            nasToFillDF.fillna(inplace=True,method='ffill') # must do it "inplace" - without it there's filling only if assigned to a new object
            print('FillNa finished at ' +  str(dt.datetime.now()))
            print('tempTable write starting at ' +  str(dt.datetime.now()))
            if amDebugging:
                print(nasToFillDF.head(debugRows))
            try:
                # updating from a dataframe directly doesn't seem to be possible/easy - 
                # This link suggests using a temp table https://stackru.com/questions/45630095/how-to-update-a-db-table-from-pandas-dataset-with-sqlalchemy
                # for some strange reason I need both engine.execute to use the right db and engine.engine in to_sql
                # the to_sql works with small tables but fails when chunkSize ~10000 - see this issue https://github.com/mkleehammer/pyodbc/issues/250
                # There's a limit on the maximum number of rows per insert to MSSQL of 1000 and to_sql seems to have a chunksize limit of 999/(cols +1)
                # workaround is to re export to CSV (!) with a file opened for append see https://stackru.com/questions/17134942/pandas-dataframe-output-end-of-csv
                # and see also https://stackru.com/questions/1466000/python-open-built-in-function-difference-between-modes-a-a-w-w-and-r for python modes
                # https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html
                # to use in the to_csv mode parameter

                nasToFillDF.to_sql(name=tempTableName, con=engine.engine, if_exists='replace', chunksize=100, index=False) # create/replace the temp table
                print('tempTable write finished at ' +  str(dt.datetime.now()))
                print('Main table update starting at ' +  str(dt.datetime.now()))
                try:
                    connection.execute(updateSql)
                    print('Main table update finished at ' +  str(dt.datetime.now()))
                    # ok, now advance by chunkRows -1
                    chunkStart = min(chunkStart + chunkRows-1, rowCount)
                    chunkEnd = min(chunkStart+chunkRows-1, rowCount)
                    if chunkStart ==rowCount:
                        break
                    if chunkStart==chunkEnd:
                        chunkStart = chunkEnd-1
                except sqla.exc.SQLAlchemyError as serr:
                    print(serr.args[0])
                    break
            except sqla.exc.SQLAlchemyError as serr:
                print(serr.args[0])
                break
            print('Processed to row ' + str(chunkEnd) + ' at ' + str(dt.datetime.now()))
    print('Done at ' + str(dt.datetime.now()))

0 ответов

Другие вопросы по тегам