Используйте двоичную таблицу COPY FROM с psycopg2
У меня есть десятки миллионов строк для передачи из файлов многомерных массивов в базу данных PostgreSQL. Мои инструменты Python и psycopg2. Наиболее эффективный способ сбора данных - это использование copy_from
, Тем не менее, мои данные в основном 32-битные числа с плавающей точкой (действительные или с плавающей точкой4), поэтому я бы не стал преобразовывать их из реального → текст → действительный. Вот пример базы данных DDL:
CREATE TABLE num_data
(
id serial PRIMARY KEY NOT NULL,
node integer NOT NULL,
ts smallint NOT NULL,
val1 real,
val2 double precision
);
Вот где я с Python, используя строки (текст):
# Just one row of data
num_row = [23253, 342, -15.336734, 2494627.949375]
import psycopg2
# Python3:
from io import StringIO
# Python2, use: from cStringIO import StringIO
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# Convert floating point numbers to text, write to COPY input
cpy = StringIO()
cpy.write('\t'.join([repr(x) for x in num_row]) + '\n')
# Insert data; database converts text back to floating point numbers
cpy.seek(0)
curs.copy_from(cpy, 'num_data', columns=('node', 'ts', 'val1', 'val2'))
conn.commit()
Есть ли эквивалент, который может работать в двоичном режиме? Т.е. хранить числа с плавающей запятой в двоичном виде? Это не только сохранит точность с плавающей запятой, но и может быть быстрее.
(Примечание: чтобы увидеть ту же точность, что и в примере, используйте SET extra_float_digits='2'
)
3 ответа
Вот двоичный эквивалент COPY FROM для Python 3:
from io import BytesIO
from struct import pack
import psycopg2
# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
(23256, 348, 43.23524, 2494827.949375)]
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]
# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
('i', 'i', 'h', 'f', 'd'),
( 4, 4, 2, 4, 8 )))
for row in data:
# Number of columns/fields (always 5)
cpy.write(pack('!h', 5))
for col, fmt, size in row_format:
value = (id_seq if col == -1 else row[col])
cpy.write(pack('!i' + fmt, size, value))
id_seq += 1 # manually increment sequence outside of database
# File trailer
cpy.write(pack('!h', -1))
# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)
# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()
Обновить
Я переписал вышеуказанный подход к написанию файлов для COPY. Мои данные в Python находятся в массивах NumPy, поэтому имеет смысл использовать их. Вот пример data
с 1M строк, 7 столбцов:
import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime
conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()
# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))
dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
[('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
prv = nxt
В моей базе данных у меня есть две таблицы, которые выглядят так:
CREATE TABLE num_data_binary
(
id integer PRIMARY KEY,
node integer NOT NULL,
ts smallint NOT NULL,
s0 real,
s1 real,
s2 real,
s3 real,
s4 real,
s5 real,
s6 real
) WITH (OIDS=FALSE);
и еще одна похожая таблица с именем num_data_text
,
Вот несколько простых вспомогательных функций для подготовки данных для COPY (как текстового, так и двоичного форматов) с использованием информации в массиве записей NumPy:
def prepare_text(dat):
cpy = BytesIO()
for row in dat:
cpy.write('\t'.join([repr(x) for x in row]) + '\n')
return(cpy)
def prepare_binary(dat):
pgcopy_dtype = [('num_fields','>i2')]
for field, dtype in dat.dtype.descr:
pgcopy_dtype += [(field + '_length', '>i4'),
(field, dtype.replace('<', '>'))]
pgcopy = np.empty(dat.shape, pgcopy_dtype)
pgcopy['num_fields'] = len(dat.dtype)
for i in range(len(dat.dtype)):
field = dat.dtype.names[i]
pgcopy[field + '_length'] = dat.dtype[i].alignment
pgcopy[field] = dat[field]
cpy = BytesIO()
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
cpy.write(pgcopy.tostring()) # all rows
cpy.write(pack('!h', -1)) # file trailer
return(cpy)
Вот как я использую вспомогательные функции для сравнения двух методов формата COPY:
def time_pgcopy(dat, table, binary):
print('Processing copy object for ' + table)
tstart = datetime.now()
if binary:
cpy = prepare_binary(dat)
else: # text
cpy = prepare_text(dat)
tendw = datetime.now()
print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
str(cpy.tell()) + ' bytes; transfering to database')
cpy.seek(0)
if binary:
curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
else: # text
curs.copy_from(cpy, table)
conn.commit()
tend = datetime.now()
print('Database copy time: ' + str(tend - tendw))
print(' Total time: ' + str(tend - tstart))
return
time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)
Вот вывод из последних двух time_pgcopy
команды:
Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
Total time: 0:00:24.622095
Таким образом, шаги с NumPy → файл и файл → база данных выполняются намного быстрее при использовании бинарного подхода. Очевидное отличие состоит в том, как Python готовит файл COPY, что очень медленно для текста. Вообще говоря, двоичный формат загружается в базу данных в 2/3 времени как текстовый формат для этой схемы.
Наконец, я сравнил значения в обеих таблицах в базе данных, чтобы увидеть, были ли цифры разными. Около 1,46% строк имеют разные значения для столбца s0
и эта доля увеличивается до 6,17% для s6
(вероятно, связано со случайным методом, который я использовал). Ненулевые абсолютные различия между всеми 70M 32-битными значениями с плавающей запятой находятся в диапазоне между 9.3132257e-010 и 7.6293945e-006. Эти небольшие различия между текстовым и двоичным методами загрузки связаны с потерей точности из-за преобразований float → text → float, необходимых для метода текстового формата.
Вот моя версия. По версии Майка.
Это очень специально, но есть два плюса:
- Ожидайте генератор и действует как поток при перегрузке
readline
- Пример как писать в
hstore
двоичный формат
Для тех, кого это еще касается.
Мне пришлось импортировать столбцы с расширением . Я решил это для себя, импортировав данные во временную таблицу, используя
COPY FROM
, или psycopg2
copy_from()
. Затем я сделал
UPDATE ... FROM
из временной таблицы в фактические данные. В
UPDATE
оператор I декодирует байты, представленные в шестнадцатеричном виде, в фактические
bytea
объекты в SQL.
Единственным преимуществом этого подхода по сравнению с двоичным форматом импорта PSQL является то, что он более удобочитаем, когда данные передаются, и, вероятно, его легче кодировать.
Обратите внимание, что psycopg3 имеет более продвинутый
COPY
возможностей, но в настоящее время я использую SQLALchemy 1.4, который называется psycopg2.
Вот мой пример Python:
# Temp table is dropped at the end of the session
# https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-temporary-table/
# This must match data_as_dicts structure.
# We will pass binary data as hex strings in an unprefixed format:
# 01020304
sql = f"""
CREATE TEMP TABLE IF NOT EXISTS {temp_table_name}
(
id int,
sync_event_id int,
sync_reserve0 varchar(128),
sync_reserve1 varchar(128)
);
"""
conn.execute(sql)
# Clean any pending data in the temp table
# between update chunks.
# TODO: Not sure why this does not clear itself at conn.close()
# as I would expect based on the documentation.
sql = f"TRUNCATE {temp_table_name}"
conn.execute(sql)
# Load data from CSV to the temp table
# https://www.psycopg.org/docs/cursor.html
cursor = conn.connection.cursor()
out.seek(0)
cursor.copy_from(out, temp_table_name, sep=delim, columns=columns)
# Fill real table from the temp table
# This copies values from the temp table using
# UPDATE...FROM and matching by the row id.
# We will also reconstruct binary from the hex strings.
sql = f"""
UPDATE {real_table_name}
SET
sync_event_id=b.sync_event_id,
sync_reserve0=decode(b.sync_reserve0, 'hex'),
sync_reserve1=decode(b.sync_reserve1, 'hex')
FROM {temp_table_name} AS b
WHERE {real_table_name}.id=b.id;
"""
res = conn.execute(sql)
logger.debug("Updated %d rows", res.rowcount)