Python Chunking CSV File Мультипроцессорная обработка

Я использую следующий код для разделения файла CSV на несколько кусков ( отсюда)

def worker(chunk):
    print len(chunk)

def keyfunc(row):
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        reader.next()
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()

Однако, кажется, что количество чанков всегда остается постоянным независимо от количества чанков, которые я выбрал для использования. Например, если я выберу 1 или 10 блоков, я всегда получаю этот вывод при обработке файла примера. В идеале я хотел бы разбить файл на части, чтобы он был равномерно распределен.

Обратите внимание, что настоящий файл, который я разделяю, имеет длину более 13 миллионов строк, поэтому я обрабатываю его по частям. Это обязательно!

6
7
1
...
1
1
94
--- 0.101687192917 seconds ---

2 ответа

Решение

Согласно комментариям, мы хотим, чтобы каждый процесс работал над фрагментом из 10000 строк. Это не так сложно сделать; увидеть iter/islice рецепт ниже. Однако проблема с использованием

pool.map(worker, ten_thousand_row_chunks)

в том, что pool.map попытается поместить все чанки в очередь задач одновременно. Если для этого требуется больше памяти, чем доступно, вы получите MemoryError, (Заметка: pool.imap страдает от той же проблемы.)

Так что вместо этого нам нужно позвонить pool.map итеративно, по кусочкам каждого куска.

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'rb') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

main()

каждый chunk будет состоять из chunksize*num_procs строки из файла. Это достаточно данных, чтобы дать всем работникам в пуле что-то для работы, но не слишком большой, чтобы вызвать MemoryError - при условии chunksize не слишком большой.

каждый chunk затем разбивается на куски, причем каждый кусок состоит из chunksize строки из файла. Эти части затем отправляются pool.map,


Как iter(lambda: list(IT.islice(iterator, chunksize)), []) работа:

Это идиома для группировки итератора в куски по длине. Давайте посмотрим, как это работает на примере:

In [111]: iterator = iter(range(10))

Обратите внимание, что каждый раз IT.islice(iterator, 3) вызывается, новый кусок из 3 элементов обрезается от итератора:

In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]

Когда в итераторе осталось менее 3 элементов, возвращается только то, что осталось:

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]

И если вы позвоните еще раз, вы получите пустой список:

In [116]: list(IT.islice(iterable, 3))
Out[116]: []

lambda: list(IT.islice(iterator, chunksize)) это функция, которая возвращает list(IT.islice(iterator, chunksize)) когда звонили. Это "одна строка", которая эквивалентна

def func():
    return  list(IT.islice(iterator, chunksize))

В заключение, iter(callable, sentinel) возвращает другого итератора. Значения, полученные этим итератором, являются значениями, возвращаемыми вызываемым объектом. Он продолжает давать значения, пока вызываемый объект не вернет значение, равное часовому. Так

iter(lambda: list(IT.islice(iterator, chunksize)), [])

будет продолжать возвращать значения list(IT.islice(iterator, chunksize)) пока это значение не будет пустым списком:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

Прежде всего itertools.groupby не будет иметь никакого реального смысла, если записи еще не отсортированы по ключевому столбцу. Более того, если вам нужно просто разбить файл CSV на заранее определенное количество строк и передать его работнику, вам не нужно делать все это.

Простая реализация будет:

import csv
from multiprocessing import Pool


def worker(chunk):
    print len(chunk)

def emit_chunks(chunk_size, file_path):
    lines_count = 0
    with open(file_path) as f:
        reader = csv.reader(f)
        chunk = []
        for line in reader:
            lines_count += 1
            chunk.append(line)
            if lines_count == chunk_size:
                lines_count = 0
                yield chunk
                chunk = []
            else:
                continue
        if chunk : yield chunk

def main():
    chunk_size = 10
    gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
    p = Pool(5)
    p.imap(worker, gen)
    print 'Completed..'

* Изменить: изменено на pool.imap вместо pool.map

Другие вопросы по тегам