Ошибки Pandas при записи кусков в базу данных с помощью df.to_sql()

Существующая база данных и желаемый результат:

У меня есть большая база данных SQLite (12 ГБ, таблицы с 44 миллионами + строк), которую я хотел бы изменить с помощью Pandas в Python3.

Пример Цель: я надеюсь прочитать одну из этих больших таблиц (44 миллиона строк) в DF по частям, манипулировать частью DF и записать результат в новую таблицу. Если возможно, я бы хотел заменить новую таблицу, если она существует, и добавить к ней каждый кусок.

Поскольку мои манипуляции только добавляют или изменяют столбцы, в новой таблице должно быть столько же строк, сколько в исходной таблице.

Вопросы:

Основная проблема, кажется, проистекает из следующей строки в приведенном ниже коде:

df.to_sql(new_table, con=db, if_exists = "append", index=False)

  1. Когда эта строка запускается в приведенном ниже коде, я, похоже, последовательно получаю дополнительный кусок размера =N плюс одно наблюдение, чем я ожидал.
  2. При первом запуске этого кода с новым именем таблицы я получаю сообщение об ошибке:
 Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database
  1. Если я затем перезапущу сценарий с тем же новым именем таблицы, он будет выполняться для каждого чанка и дополнительного чанка +1 строки.

  2. Когда df.to_sql() строка закомментирована, цикл выполняется для ожидаемого количества фрагментов.

Тестовый пример проблемы с полным кодом:

Полный код: example.py

import pandas as pd
import sqlite3

#Helper Functions Used in Example
def ren(invar, outvar, df):
    df.rename(columns={invar:outvar}, inplace=True)
    return(df)

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()
new_table = "new_table"

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from test_table limit 10000;", con=db, chunksize = 5000)

for df in df_generator:
    #Functions to modify data, example
    df = ren("name", "renamed_name", df)
    print(df.shape)
    df.to_sql(new_table, con=db, if_exists = "append", index=False)


#Count if new table is created
try:
    count_result(c, new_table)
except:
    pass

1. Результат, когда #df.to_sql(new_table, con=db, if_exists = "append", index=False)

(проблемная строка закомментирована):

$ python3 example.py 
(5000, 22)
(5000, 22)

Что я и ожидал, так как пример кода ограничивает мою большую таблицу 10k строк.

2. Результат, когда df.to_sql(new_table, con=db, if_exists = "append", index=False)

а. проблемная линия не закомментирована

б. это первый раз, когда код запускается с new_table:

$ python3 example.py 
(5000, 22)
Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database

3. Результат, когда df.to_sql(new_table, con=db, if_exists = "append", index=False)

а. проблемная линия не закомментирована

б. приведенный выше код запускается второй раз с new_table:

$ python3 example.py 
(5000, 22)
(5000, 22)
(5000, 22)
(1, 22)
[*] total: 20,001 rows in new_table table

Таким образом, у меня возникает проблема с первым нарушением кода при первом запуске (Результат 2), а во-вторых, общее количество строк при втором запуске (Результат 3) более чем вдвое превышает ожидаемый.

Будем очень благодарны за любые предложения о том, как я могу решить эту проблему.

3 ответа

Решение

Вы можете попытаться указать:

db = sqlite3.connect("test.db", isolation_level=None)
#  ---->                        ^^^^^^^^^^^^^^^^^^^^

Кроме того, вы можете попытаться увеличить размер фрагмента, потому что в противном случае время между коммитами будет слишком коротким для БД SQLite - это вызывает эту ошибку, я думаю... Я бы также рекомендовал использовать PostgreSQL, MySQL/MariaDB или что-то подобное - они намного надежнее и подходят для такого размера БД...

Задержка во времени над решением

@MaxU добавляет решение isolation_level=None к базе данных подключение короткое и приятное. Однако по какой-либо причине он значительно замедлил запись / фиксацию каждого куска в базе данных. Например, когда я тестировал решение на таблице из 12 миллионов строк, выполнение кода заняло более 6 часов. И наоборот, создание исходной таблицы из нескольких текстовых файлов заняло несколько минут.

Это понимание привело к более быстрому, но менее изящному решению, которое заняло менее 7 минут, чтобы составить таблицу из 12 миллионов строк по сравнению с более чем 6 часами. Выходные строки соответствовали входным, решая проблему в моем первоначальном вопросе.

Быстрее, но менее элегантное решение

С момента создания исходной таблицы из текстовых файлов / CSV-файлов и использования сценариев SQL для загрузки данных я совместил этот подход с возможностями блоков Panda. Основные основные шаги заключаются в следующем:

  1. Подключиться к БД
  2. Используйте сценарий SQL для создания новой таблицы (столбцы и порядок должны соответствовать тому, что вы делаете с pandas df)
  3. Читать массивный стол кусками
  4. Для каждого чанка измените df по желанию, запишите в csv, загрузите csv с помощью sql и передайте изменения.

Основной код решения:

import pandas as pd
import sqlite3

#Note I Used Functions I Wrote in build_db.py
#(shown below after example solution)
from build_db import *


#Helper Functions Used in Example
def lower_var(var, df):
    s = df[var].str.lower()
    df = df.drop(var, axis=1)
    df = pd.concat([df, s], axis=1)
    return(df)


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()

#create statement
create_table(c, "create_test.sql", path='sql_clean/')

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from example_table;", con=db, chunksize = 100000)

for df in df_generator:
    #functions to modify data, example
    df = lower_var("name", df) #changes column order

    #restore df to column order in sql table
    db_order = ["cmte_id", "amndt_ind", "rpt_tp", "transaction_pgi", "image_num", "transaction_tp", \
        "entity_tp", "name", "city", "state", "zip_code", "employer", "occupation", "transaction_dt", \
        "transaction_amt", "other_id", "tran_id", "file_num", "memo_cd", "memo_text", "sub_id"]
    df = df[db_order]

    #write chunk to csv
    file = "df_chunk.csv"
    df.to_csv(file, sep='|', header=None, index=False)

    #insert chunk csv to db
    insert_file_into_table(c, "insert_test.sql", file, '|', path='sql_clean/')
    db.commit()


#Count results
count_result(c, "test_indiv")

Импортированные пользовательские функции для кода выше

#Relavant Functions in build_db.py

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])

def create_table(cursor, sql_script, path='sql/'):
    print("[*] create table with {}{}".format(path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    cursor.executescript(qry)


def insert_file_into_table(cursor, sql_script, file, sep=',', path='sql/'):
    print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    fileObj = open(file, 'rU', encoding='latin-1')
    csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')

    try:
        for row in csvReader:
            try:
                cursor.execute(qry, row)
            except sqlite3.IntegrityError as e:
                pass

    except Exception as e:
        print("[*] error while processing file: {}, error code: {}".format(file, e))
        print("[*] sed replacing null bytes in file: {}".format(file))
        sed_replace_null(file, "clean_null.sh")
        subprocess.call("bash clean_null.sh", shell=True)

        try:
            print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
            fileObj = open(file, 'rU', encoding='latin-1')
            csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
            for row in csvReader:
                try:
                    cursor.execute(qry, row)
                except sqlite3.IntegrityError as e:
                    pass
                    print(e)    

        except Exception as e:
            print("[*] error while processing file: {}, error code: {}".format(file, e))

Пользовательские сценарии SQL

--create_test.sql

DROP TABLE if exists test_indiv;

CREATE TABLE test_indiv (
    cmte_id TEXT NOT NULL,
    amndt_ind TEXT,
    rpt_tp TEXT,
    transaction_pgi TEXT,
    image_num TEXT,
    transaction_tp TEXT,
    entity_tp TEXT,
    name TEXT,
    city TEXT,
    state TEXT,
    zip_code TEXT,
    employer TEXT,
    occupation TEXT,
    transaction_dt TEXT,
    transaction_amt TEXT,
    other_id TEXT,
    tran_id TEXT,
    file_num NUMERIC,
    memo_cd TEXT,
    memo_text TEXT,
    sub_id NUMERIC NOT NULL
);

CREATE UNIQUE INDEX idx_test_indiv ON test_indiv (sub_id);
--insert_test.sql

INSERT INTO test_indiv (
    cmte_id,
    amndt_ind,
    rpt_tp,
    transaction_pgi,
    image_num,
    transaction_tp,
    entity_tp,
    name,
    city,
    state,
    zip_code,
    employer,
    occupation,
    transaction_dt,
    transaction_amt,
    other_id,
    tran_id,
    file_num,
    memo_cd,
    memo_text,
    sub_id
    ) 
VALUES (
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?
);

Испытал точно такую ​​же проблему (имел дело с> 30 ГБ данными). Вот как я решил эту проблему: вместо использования функции Chunk в read_sql. Я решил создать ручной петлитель чанка следующим образом:

chunksize=chunk_size
offset=0
for _ in range(0, a_big_number):
    query = "SELECT * FROM the_table %s offset %s" %(chunksize, offset)
    df = pd.read_sql(query, conn)
    if len(df)!=0:
        ....
    else:
        break
Другие вопросы по тегам