Разделение потока CSV на два потока по индексу столбца

Я работаю над проектом, который делает следующее:

  • Расшифровывает большой поток исходного файла, содержащий строковые данные CSV
  • Разбивает данные на два потока на основе номеров столбцов CSV
  • Копирует оба потока в postgres через psycopgcopy_from CMD

Исходный поток достаточно велик, поэтому важно, чтобы память управлялась соответствующим образом во всех потоках.

Я застрял в двух основных областях:

1) Обеспечение set_one_stream а также set_two_stream иметь данные, когда копирование cmd читает из потоков без необходимости загружать все содержимое в память.

2) Как лучше всего разделить исходный поток на два набора.

Вот грубая нерабочая версия того, что я пытаюсь сделать:

import gnupg
import psycopg2
from StringIO import StringIO

DELIMITER = '|'
SOURCE_FILE = open('/large.csv.gpg', 'r')
TABLE_ONE = 'set_one'
TABLE_TWO = 'set_two'
SET_ONE_END_COL = 5
SET_TWO_BEGIN_COL = 5 

gpg_buffer = StringIO()
set_one_stream = StringIO()
set_two_stream = StringIO()

def write_gpg_data(chunk):
    # note: gpg_buffer referenced before assignment,
    # even if `global gpg_buffer`

    # look for newline in the csv. if exists, read from gpg_buffer
    # and concat chunk to build full row
    if '\n' in chunk:
        index = chunk.index['\n']
        gpg_buffer.write(chunk[0:index])
        csv_row = self._buffer.getvalue()
        csv_reader = csv.reader(csv_row, delimiter=DELIMITER)

        # there must be a better way to split each row into two streams?
        for row in csv_reader:
            set_one_row = '|'.join(row[0:SET_ONE_END_COL]) + '\n'
            set_two_row = '|'.join(row[SET_TWO_BEGIN_COL:]) + '\n'

            set_one_stream.write(set_one_row)
            set_two_stream.write(set_two_row)

        gpg_buffer.close()
        gpg_buffer = StringIO()
    else:
        # write partial row until next chunk
        gpg_buffer.write(chunk)

gpg_client = gnupg.GPG()
gpg_client.on_data = write_gpg_data
gpg_client.decrypt_file(SOURCE_FILE)

sql_connection = psycopg2.connect(database='csv_test')

with sql_connection as conn:
    with conn.cursor() as curs:

        # fyi, copy_from requires `read` and `readline` be defined
        # on the source streams
        curs.copy_from(set_one_stream, TABLE_ONE, delimiter=DELIMITER, null='')
        curs.copy_from(set_two_stream, TABLE_TWO, delimiter=DELIMITER, null='')

Я думаю, что мне могут понадобиться потоки и блокировки, чтобы как-то координировать запись и чтение данных, но я не могу придумать мысленную модель того, как это будет работать.

Любые мысли о том, как подойти к этому, будут высоко оценены.

0 ответов

Другие вопросы по тегам