Ошибки Pandas при записи кусков в базу данных с помощью df.to_sql()
Существующая база данных и желаемый результат:
У меня есть большая база данных SQLite (12 ГБ, таблицы с 44 миллионами + строк), которую я хотел бы изменить с помощью Pandas в Python3.
Пример Цель: я надеюсь прочитать одну из этих больших таблиц (44 миллиона строк) в DF по частям, манипулировать частью DF и записать результат в новую таблицу. Если возможно, я бы хотел заменить новую таблицу, если она существует, и добавить к ней каждый кусок.
Поскольку мои манипуляции только добавляют или изменяют столбцы, в новой таблице должно быть столько же строк, сколько в исходной таблице.
Вопросы:
Основная проблема, кажется, проистекает из следующей строки в приведенном ниже коде:
df.to_sql(new_table, con=db, if_exists = "append", index=False)
- Когда эта строка запускается в приведенном ниже коде, я, похоже, последовательно получаю дополнительный кусок размера =N плюс одно наблюдение, чем я ожидал.
- При первом запуске этого кода с новым именем таблицы я получаю сообщение об ошибке:
Traceback (most recent call last):
File "example.py", line 23, in <module>
for df in df_generator:
File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database
Если я затем перезапущу сценарий с тем же новым именем таблицы, он будет выполняться для каждого чанка и дополнительного чанка +1 строки.
Когда
df.to_sql()
строка закомментирована, цикл выполняется для ожидаемого количества фрагментов.
Тестовый пример проблемы с полным кодом:
Полный код: example.py
import pandas as pd
import sqlite3
#Helper Functions Used in Example
def ren(invar, outvar, df):
df.rename(columns={invar:outvar}, inplace=True)
return(df)
def count_result(c, table):
([print("[*] total: {:,} rows in {} table"
.format(r[0], table))
for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])
#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()
new_table = "new_table"
#Load Data in Chunks
df_generator = pd.read_sql_query("select * from test_table limit 10000;", con=db, chunksize = 5000)
for df in df_generator:
#Functions to modify data, example
df = ren("name", "renamed_name", df)
print(df.shape)
df.to_sql(new_table, con=db, if_exists = "append", index=False)
#Count if new table is created
try:
count_result(c, new_table)
except:
pass
1. Результат, когда
#df.to_sql(new_table, con=db, if_exists = "append", index=False)
(проблемная строка закомментирована):
$ python3 example.py
(5000, 22)
(5000, 22)
Что я и ожидал, так как пример кода ограничивает мою большую таблицу 10k строк.
2. Результат, когда
df.to_sql(new_table, con=db, if_exists = "append", index=False)
а. проблемная линия не закомментирована
б. это первый раз, когда код запускается с new_table:
$ python3 example.py
(5000, 22)
Traceback (most recent call last):
File "example.py", line 23, in <module>
for df in df_generator:
File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database
3. Результат, когда
df.to_sql(new_table, con=db, if_exists = "append", index=False)
а. проблемная линия не закомментирована
б. приведенный выше код запускается второй раз с new_table:
$ python3 example.py
(5000, 22)
(5000, 22)
(5000, 22)
(1, 22)
[*] total: 20,001 rows in new_table table
Таким образом, у меня возникает проблема с первым нарушением кода при первом запуске (Результат 2), а во-вторых, общее количество строк при втором запуске (Результат 3) более чем вдвое превышает ожидаемый.
Будем очень благодарны за любые предложения о том, как я могу решить эту проблему.
3 ответа
Вы можете попытаться указать:
db = sqlite3.connect("test.db", isolation_level=None)
# ----> ^^^^^^^^^^^^^^^^^^^^
Кроме того, вы можете попытаться увеличить размер фрагмента, потому что в противном случае время между коммитами будет слишком коротким для БД SQLite - это вызывает эту ошибку, я думаю... Я бы также рекомендовал использовать PostgreSQL, MySQL/MariaDB или что-то подобное - они намного надежнее и подходят для такого размера БД...
Задержка во времени над решением
@MaxU добавляет решение isolation_level=None
к базе данных подключение короткое и приятное. Однако по какой-либо причине он значительно замедлил запись / фиксацию каждого куска в базе данных. Например, когда я тестировал решение на таблице из 12 миллионов строк, выполнение кода заняло более 6 часов. И наоборот, создание исходной таблицы из нескольких текстовых файлов заняло несколько минут.
Это понимание привело к более быстрому, но менее изящному решению, которое заняло менее 7 минут, чтобы составить таблицу из 12 миллионов строк по сравнению с более чем 6 часами. Выходные строки соответствовали входным, решая проблему в моем первоначальном вопросе.
Быстрее, но менее элегантное решение
С момента создания исходной таблицы из текстовых файлов / CSV-файлов и использования сценариев SQL для загрузки данных я совместил этот подход с возможностями блоков Panda. Основные основные шаги заключаются в следующем:
- Подключиться к БД
- Используйте сценарий SQL для создания новой таблицы (столбцы и порядок должны соответствовать тому, что вы делаете с pandas df)
- Читать массивный стол кусками
- Для каждого чанка измените df по желанию, запишите в csv, загрузите csv с помощью sql и передайте изменения.
Основной код решения:
import pandas as pd
import sqlite3
#Note I Used Functions I Wrote in build_db.py
#(shown below after example solution)
from build_db import *
#Helper Functions Used in Example
def lower_var(var, df):
s = df[var].str.lower()
df = df.drop(var, axis=1)
df = pd.concat([df, s], axis=1)
return(df)
#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()
#create statement
create_table(c, "create_test.sql", path='sql_clean/')
#Load Data in Chunks
df_generator = pd.read_sql_query("select * from example_table;", con=db, chunksize = 100000)
for df in df_generator:
#functions to modify data, example
df = lower_var("name", df) #changes column order
#restore df to column order in sql table
db_order = ["cmte_id", "amndt_ind", "rpt_tp", "transaction_pgi", "image_num", "transaction_tp", \
"entity_tp", "name", "city", "state", "zip_code", "employer", "occupation", "transaction_dt", \
"transaction_amt", "other_id", "tran_id", "file_num", "memo_cd", "memo_text", "sub_id"]
df = df[db_order]
#write chunk to csv
file = "df_chunk.csv"
df.to_csv(file, sep='|', header=None, index=False)
#insert chunk csv to db
insert_file_into_table(c, "insert_test.sql", file, '|', path='sql_clean/')
db.commit()
#Count results
count_result(c, "test_indiv")
Импортированные пользовательские функции для кода выше
#Relavant Functions in build_db.py
def count_result(c, table):
([print("[*] total: {:,} rows in {} table"
.format(r[0], table))
for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])
def create_table(cursor, sql_script, path='sql/'):
print("[*] create table with {}{}".format(path, sql_script))
qry = open("{}{}".format(path, sql_script), 'rU').read()
cursor.executescript(qry)
def insert_file_into_table(cursor, sql_script, file, sep=',', path='sql/'):
print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
qry = open("{}{}".format(path, sql_script), 'rU').read()
fileObj = open(file, 'rU', encoding='latin-1')
csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
try:
for row in csvReader:
try:
cursor.execute(qry, row)
except sqlite3.IntegrityError as e:
pass
except Exception as e:
print("[*] error while processing file: {}, error code: {}".format(file, e))
print("[*] sed replacing null bytes in file: {}".format(file))
sed_replace_null(file, "clean_null.sh")
subprocess.call("bash clean_null.sh", shell=True)
try:
print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
fileObj = open(file, 'rU', encoding='latin-1')
csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
for row in csvReader:
try:
cursor.execute(qry, row)
except sqlite3.IntegrityError as e:
pass
print(e)
except Exception as e:
print("[*] error while processing file: {}, error code: {}".format(file, e))
Пользовательские сценарии SQL
--create_test.sql
DROP TABLE if exists test_indiv;
CREATE TABLE test_indiv (
cmte_id TEXT NOT NULL,
amndt_ind TEXT,
rpt_tp TEXT,
transaction_pgi TEXT,
image_num TEXT,
transaction_tp TEXT,
entity_tp TEXT,
name TEXT,
city TEXT,
state TEXT,
zip_code TEXT,
employer TEXT,
occupation TEXT,
transaction_dt TEXT,
transaction_amt TEXT,
other_id TEXT,
tran_id TEXT,
file_num NUMERIC,
memo_cd TEXT,
memo_text TEXT,
sub_id NUMERIC NOT NULL
);
CREATE UNIQUE INDEX idx_test_indiv ON test_indiv (sub_id);
--insert_test.sql
INSERT INTO test_indiv (
cmte_id,
amndt_ind,
rpt_tp,
transaction_pgi,
image_num,
transaction_tp,
entity_tp,
name,
city,
state,
zip_code,
employer,
occupation,
transaction_dt,
transaction_amt,
other_id,
tran_id,
file_num,
memo_cd,
memo_text,
sub_id
)
VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?
);
Испытал точно такую же проблему (имел дело с> 30 ГБ данными). Вот как я решил эту проблему: вместо использования функции Chunk в read_sql. Я решил создать ручной петлитель чанка следующим образом:
chunksize=chunk_size
offset=0
for _ in range(0, a_big_number):
query = "SELECT * FROM the_table %s offset %s" %(chunksize, offset)
df = pd.read_sql(query, conn)
if len(df)!=0:
....
else:
break