Куски данных из большого файла для многопроцессорной обработки?
Я пытаюсь распараллелить приложение, используя многопроцессорную обработку, которая принимает очень большой CSV-файл (от 64 до 500 МБ), выполняет некоторую работу построчно, а затем выводит небольшой файл фиксированного размера.
В настоящее время я делаю list(file_obj)
, который, к сожалению, полностью загружается в память (я думаю), и затем я разбиваю этот список на n частей, n - количество процессов, которые я хочу запустить. Я тогда делаю pool.map()
в разбитых списках.
Похоже, что это действительно очень плохое время выполнения по сравнению с однопоточной методологией "просто открой файл и итерируй над ней". Может кто-нибудь предложить лучшее решение?
Кроме того, мне нужно обработать строки файла в группах, которые сохраняют значение определенного столбца. Эти группы строк сами могут быть разделены, но ни одна группа не должна содержать более одного значения для этого столбца.
2 ответа
list(file_obj)
может потребовать много памяти, когда fileobj
большой. Мы можем уменьшить эту потребность в памяти, используя itertools для извлечения фрагментов строк по мере необходимости.
В частности, мы можем использовать
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
разбить файл на обрабатываемые куски и
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
чтобы многопроцессорный пул работал на num_chunks
куски за раз.
Таким образом, нам нужно примерно только достаточно памяти, чтобы вместить несколько (num_chunks
) куски в памяти, а не весь файл.
import multiprocessing as mp
import itertools
import time
import csv
def worker(chunk):
# `chunk` will be a list of CSV rows all with the same name column
# replace this with your real computation
# print(chunk)
return len(chunk)
def keyfunc(row):
# `row` is one row of the CSV file.
# replace this with the name column.
return row[0]
def main():
pool = mp.Pool()
largefile = 'test.dat'
num_chunks = 10
results = []
with open(largefile) as f:
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
print(results)
if __name__ == '__main__':
main()
Я бы сделал это просто. Пусть одна программа откроет файл и прочитает его построчно. Вы можете выбрать, на сколько файлов разбить его, открыть столько выходных файлов и каждую строку записать в следующий файл. Это разделит файл на n равных частей. Затем вы можете запустить программу Python для каждого из файлов параллельно.