Почему в python map() и multiprocessing.Pool.map() получили разные ответы?

У меня была странная проблема. У меня есть файл в формате:

START
1
2
STOP
lllllllll
START
3
5
6
STOP

и я хочу прочитать строки между START а также STOP в качестве блоков, и использовать my_f обрабатывать каждый блок.

def block_generator(file):

with open(file) as lines:
    for line in lines:
        if line == 'START': 
            block=itertools.takewhile(lambda x:x!='STOP',lines) 
            yield block   

и в моей основной функции я пытался использовать map() чтобы сделать работу. Это сработало.

blocks=block_generator(file)
map(my_f,blocks)

на самом деле даст мне то, что я хочу. Но когда я попробовал то же самое с multiprocessing.Pool.map(), он дал мне ошибку, сказал, что takewhile() хотел принять 2 аргумента, было дано 0.

    blocks=block_generator(file)
    p=multiprocessing.Pool(4) 
    p.map(my_f,blocks)

Это ошибка?

  1. Файл содержит более 1000000 блоков, каждый из которых содержит менее 100 строк.
  2. Я принимаю форму ответа Untubu.
  3. Но, возможно, я просто разделю файл и использую n экземпляра моего оригинального скрипта без многопроцессорной обработки для их обработки, а затем сопоставлю результаты. Таким образом, вы никогда не ошибетесь, если скрипт работает с небольшим файлом.

2 ответа

Решение

Как насчет:

import itertools

def grouper(n, iterable, fillvalue=None):
    # Source: http://docs.python.org/library/itertools.html#recipes
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line == 'START': 
                block=list(itertools.takewhile(lambda x:x!='STOP',lines))
                yield block

blocks=block_generator(file)
p=multiprocessing.Pool(4)
for chunk in grouper(100,blocks,fillvalue=''):
    p.map(my_f,chunk)

С помощью grouper ограничит количество файла, используемого p.map, Таким образом, весь файл не нужно считывать в память (подавать в очередь задач) сразу.


Я утверждаю выше, что когда вы звоните p.map(func,iterator), весь итератор немедленно используется для заполнения очереди задач. Затем работники пула получают задания из очереди и одновременно работают над заданиями.

Если вы заглянете в файл pool.py и проследите определения, вы увидите _handle_tasks поток получает элементы из self._taskqueue и перечисляет это сразу:

         for i, task in enumerate(taskseq):
             ...
             put(task)

Вывод: итератор передан p.map потребляется сразу. Не нужно ждать завершения одной задачи, прежде чем следующая задача будет получена из очереди.

Как дальнейшее подтверждение, если вы запустите это:

демонстрационный код:

import multiprocessing as mp
import time
import logging

def foo(x):
    time.sleep(1)
    return x*x

def blocks():
    for x in range(1000):
        if x%100==0:
            logger.info('Got here')
        yield x

logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG) 
pool=mp.Pool() 
print pool.map(foo, blocks()) 

Вы увидите Got here сообщение печатается 10 раз почти сразу, а затем длительная пауза из-за time.sleep(1) вызывать foo, Это наглядно показывает, что итератор полностью используется задолго до того, как процессы пула приступают к выполнению задач.

По сути, когда вы перебираете файл, как вы, каждый раз, когда вы читаете новую строку из файла, вы перемещаете указатель файла на одну строку вперед.

Итак, когда вы делаете

block=itertools.takewhile(lambda x:x!='STOP',lines) 

каждый раз, когда итератор возвращается takewhile получает новый предмет от lines, он перемещает указатель файла.

Обычно плохо продвигать итератор, над которым вы уже работаете for петля. Тем не менее for цикл временно приостановлен на каждом yield, а также map истощает takewhile прежде чем продолжить for цикл, так что вы получите желаемое поведение.

Когда у вас есть for петля и takewhileПри одновременном запуске указатель файла быстро перемещается в конец, и вы получаете ошибку.

Попробуйте вместо этого, это должно быть быстрее, чем упаковка takewhile в list:

from contextlib import closing
from itertools import repeat

def block_generator(filename):
    with open(filename) as infile:
        for pos in (infile.tell() for line in infile if line == 'START'):
            yield pos

def my_f_wrapper(pos, filename):
    with open(filename) as infile:
        infile.seek(pos)
        block=itertools.takewhile(lambda x:x!='STOP', infile)
        my_f(block)

blocks = block_generator(filename)
p.imap(my_f_wrapper, blocks, repeat(filename))

По сути, вы хотите, чтобы каждый my_f работать независимо от файла, поэтому вам нужно открыть файл независимо для каждого.

Я не могу придумать способ, который не требует повторения файла дважды, один раз for цикл и один раз takewhileВсе вместе, в то же время параллельно обрабатывая файл. В вашей оригинальной версии takewhileпродвинул указатель файла для for петля, так что это было очень эффективно.

Если бы вы перебирали не строки, а только байты, я бы порекомендовал использовать для этого mmap, но это бы усложнило ситуацию, если вы работаете со строками текста.

Изменить: альтернатива будет иметь block_generator просмотреть файл и найти все позиции START а также STOP, а затем подайте их попарно на обертку. Таким образом, оболочка не должна будет сравнивать строки с STOPбыло бы просто использовать tell() в файле, чтобы убедиться, что это не было в STOP, Я не уверен, будет ли это быстрее.

Другие вопросы по тегам