Python Chunking CSV File Мультипроцессорная обработка
Я использую следующий код для разделения файла CSV на несколько кусков ( отсюда)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
Однако, кажется, что количество чанков всегда остается постоянным независимо от количества чанков, которые я выбрал для использования. Например, если я выберу 1 или 10 блоков, я всегда получаю этот вывод при обработке файла примера. В идеале я хотел бы разбить файл на части, чтобы он был равномерно распределен.
Обратите внимание, что настоящий файл, который я разделяю, имеет длину более 13 миллионов строк, поэтому я обрабатываю его по частям. Это обязательно!
6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
2 ответа
Согласно комментариям, мы хотим, чтобы каждый процесс работал над фрагментом из 10000 строк. Это не так сложно сделать; увидеть iter/islice
рецепт ниже. Однако проблема с использованием
pool.map(worker, ten_thousand_row_chunks)
в том, что pool.map
попытается поместить все чанки в очередь задач одновременно. Если для этого требуется больше памяти, чем доступно, вы получите MemoryError
, (Заметка: pool.imap
страдает от той же проблемы.)
Так что вместо этого нам нужно позвонить pool.map
итеративно, по кусочкам каждого куска.
import itertools as IT
import multiprocessing as mp
import csv
def worker(chunk):
return len(chunk)
def main():
# num_procs is the number of workers in the pool
num_procs = mp.cpu_count()
# chunksize is the number of lines in a chunk
chunksize = 10**5
pool = mp.Pool(num_procs)
largefile = 'Counseling.csv'
results = []
with open(largefile, 'rb') as f:
reader = csv.reader(f)
for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
chunk = iter(chunk)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
result = pool.map(worker, pieces)
results.extend(result)
print(results)
pool.close()
pool.join()
main()
каждый chunk
будет состоять из chunksize*num_procs
строки из файла. Это достаточно данных, чтобы дать всем работникам в пуле что-то для работы, но не слишком большой, чтобы вызвать MemoryError - при условии chunksize
не слишком большой.
каждый chunk
затем разбивается на куски, причем каждый кусок состоит из chunksize
строки из файла. Эти части затем отправляются pool.map
,
Как iter(lambda: list(IT.islice(iterator, chunksize)), [])
работа:
Это идиома для группировки итератора в куски по длине. Давайте посмотрим, как это работает на примере:
In [111]: iterator = iter(range(10))
Обратите внимание, что каждый раз IT.islice(iterator, 3)
вызывается, новый кусок из 3 элементов обрезается от итератора:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
Когда в итераторе осталось менее 3 элементов, возвращается только то, что осталось:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
И если вы позвоните еще раз, вы получите пустой список:
In [116]: list(IT.islice(iterable, 3))
Out[116]: []
lambda: list(IT.islice(iterator, chunksize))
это функция, которая возвращает list(IT.islice(iterator, chunksize))
когда звонили. Это "одна строка", которая эквивалентна
def func():
return list(IT.islice(iterator, chunksize))
В заключение, iter(callable, sentinel)
возвращает другого итератора. Значения, полученные этим итератором, являются значениями, возвращаемыми вызываемым объектом. Он продолжает давать значения, пока вызываемый объект не вернет значение, равное часовому. Так
iter(lambda: list(IT.islice(iterator, chunksize)), [])
будет продолжать возвращать значения list(IT.islice(iterator, chunksize))
пока это значение не будет пустым списком:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Прежде всего itertools.groupby не будет иметь никакого реального смысла, если записи еще не отсортированы по ключевому столбцу. Более того, если вам нужно просто разбить файл CSV на заранее определенное количество строк и передать его работнику, вам не нужно делать все это.
Простая реализация будет:
import csv
from multiprocessing import Pool
def worker(chunk):
print len(chunk)
def emit_chunks(chunk_size, file_path):
lines_count = 0
with open(file_path) as f:
reader = csv.reader(f)
chunk = []
for line in reader:
lines_count += 1
chunk.append(line)
if lines_count == chunk_size:
lines_count = 0
yield chunk
chunk = []
else:
continue
if chunk : yield chunk
def main():
chunk_size = 10
gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
p = Pool(5)
p.imap(worker, gen)
print 'Completed..'
* Изменить: изменено на pool.imap вместо pool.map