Разделение потока CSV на два потока по индексу столбца
Я работаю над проектом, который делает следующее:
- Расшифровывает большой поток исходного файла, содержащий строковые данные CSV
- Разбивает данные на два потока на основе номеров столбцов CSV
- Копирует оба потока в postgres через psycopg
copy_from
CMD
Исходный поток достаточно велик, поэтому важно, чтобы память управлялась соответствующим образом во всех потоках.
Я застрял в двух основных областях:
1) Обеспечение set_one_stream
а также set_two_stream
иметь данные, когда копирование cmd читает из потоков без необходимости загружать все содержимое в память.
2) Как лучше всего разделить исходный поток на два набора.
Вот грубая нерабочая версия того, что я пытаюсь сделать:
import gnupg
import psycopg2
from StringIO import StringIO
DELIMITER = '|'
SOURCE_FILE = open('/large.csv.gpg', 'r')
TABLE_ONE = 'set_one'
TABLE_TWO = 'set_two'
SET_ONE_END_COL = 5
SET_TWO_BEGIN_COL = 5
gpg_buffer = StringIO()
set_one_stream = StringIO()
set_two_stream = StringIO()
def write_gpg_data(chunk):
# note: gpg_buffer referenced before assignment,
# even if `global gpg_buffer`
# look for newline in the csv. if exists, read from gpg_buffer
# and concat chunk to build full row
if '\n' in chunk:
index = chunk.index['\n']
gpg_buffer.write(chunk[0:index])
csv_row = self._buffer.getvalue()
csv_reader = csv.reader(csv_row, delimiter=DELIMITER)
# there must be a better way to split each row into two streams?
for row in csv_reader:
set_one_row = '|'.join(row[0:SET_ONE_END_COL]) + '\n'
set_two_row = '|'.join(row[SET_TWO_BEGIN_COL:]) + '\n'
set_one_stream.write(set_one_row)
set_two_stream.write(set_two_row)
gpg_buffer.close()
gpg_buffer = StringIO()
else:
# write partial row until next chunk
gpg_buffer.write(chunk)
gpg_client = gnupg.GPG()
gpg_client.on_data = write_gpg_data
gpg_client.decrypt_file(SOURCE_FILE)
sql_connection = psycopg2.connect(database='csv_test')
with sql_connection as conn:
with conn.cursor() as curs:
# fyi, copy_from requires `read` and `readline` be defined
# on the source streams
curs.copy_from(set_one_stream, TABLE_ONE, delimiter=DELIMITER, null='')
curs.copy_from(set_two_stream, TABLE_TWO, delimiter=DELIMITER, null='')
Я думаю, что мне могут понадобиться потоки и блокировки, чтобы как-то координировать запись и чтение данных, но я не могу придумать мысленную модель того, как это будет работать.
Любые мысли о том, как подойти к этому, будут высоко оценены.