Почему в python map() и multiprocessing.Pool.map() получили разные ответы?
У меня была странная проблема. У меня есть файл в формате:
START
1
2
STOP
lllllllll
START
3
5
6
STOP
и я хочу прочитать строки между START
а также STOP
в качестве блоков, и использовать my_f
обрабатывать каждый блок.
def block_generator(file):
with open(file) as lines:
for line in lines:
if line == 'START':
block=itertools.takewhile(lambda x:x!='STOP',lines)
yield block
и в моей основной функции я пытался использовать map()
чтобы сделать работу. Это сработало.
blocks=block_generator(file)
map(my_f,blocks)
на самом деле даст мне то, что я хочу. Но когда я попробовал то же самое с multiprocessing.Pool.map()
, он дал мне ошибку, сказал, что takewhile() хотел принять 2 аргумента, было дано 0.
blocks=block_generator(file)
p=multiprocessing.Pool(4)
p.map(my_f,blocks)
Это ошибка?
- Файл содержит более 1000000 блоков, каждый из которых содержит менее 100 строк.
- Я принимаю форму ответа Untubu.
- Но, возможно, я просто разделю файл и использую n экземпляра моего оригинального скрипта без многопроцессорной обработки для их обработки, а затем сопоставлю результаты. Таким образом, вы никогда не ошибетесь, если скрипт работает с небольшим файлом.
2 ответа
Как насчет:
import itertools
def grouper(n, iterable, fillvalue=None):
# Source: http://docs.python.org/library/itertools.html#recipes
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)
def block_generator(file):
with open(file) as lines:
for line in lines:
if line == 'START':
block=list(itertools.takewhile(lambda x:x!='STOP',lines))
yield block
blocks=block_generator(file)
p=multiprocessing.Pool(4)
for chunk in grouper(100,blocks,fillvalue=''):
p.map(my_f,chunk)
С помощью grouper
ограничит количество файла, используемого p.map
, Таким образом, весь файл не нужно считывать в память (подавать в очередь задач) сразу.
Я утверждаю выше, что когда вы звоните p.map(func,iterator)
, весь итератор немедленно используется для заполнения очереди задач. Затем работники пула получают задания из очереди и одновременно работают над заданиями.
Если вы заглянете в файл pool.py и проследите определения, вы увидите _handle_tasks
поток получает элементы из self._taskqueue
и перечисляет это сразу:
for i, task in enumerate(taskseq):
...
put(task)
Вывод: итератор передан p.map
потребляется сразу. Не нужно ждать завершения одной задачи, прежде чем следующая задача будет получена из очереди.
Как дальнейшее подтверждение, если вы запустите это:
демонстрационный код:
import multiprocessing as mp
import time
import logging
def foo(x):
time.sleep(1)
return x*x
def blocks():
for x in range(1000):
if x%100==0:
logger.info('Got here')
yield x
logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG)
pool=mp.Pool()
print pool.map(foo, blocks())
Вы увидите Got here
сообщение печатается 10 раз почти сразу, а затем длительная пауза из-за time.sleep(1)
вызывать foo
, Это наглядно показывает, что итератор полностью используется задолго до того, как процессы пула приступают к выполнению задач.
По сути, когда вы перебираете файл, как вы, каждый раз, когда вы читаете новую строку из файла, вы перемещаете указатель файла на одну строку вперед.
Итак, когда вы делаете
block=itertools.takewhile(lambda x:x!='STOP',lines)
каждый раз, когда итератор возвращается takewhile
получает новый предмет от lines
, он перемещает указатель файла.
Обычно плохо продвигать итератор, над которым вы уже работаете for
петля. Тем не менее for
цикл временно приостановлен на каждом yield
, а также map
истощает takewhile
прежде чем продолжить for
цикл, так что вы получите желаемое поведение.
Когда у вас есть for
петля и takewhile
При одновременном запуске указатель файла быстро перемещается в конец, и вы получаете ошибку.
Попробуйте вместо этого, это должно быть быстрее, чем упаковка takewhile
в list
:
from contextlib import closing
from itertools import repeat
def block_generator(filename):
with open(filename) as infile:
for pos in (infile.tell() for line in infile if line == 'START'):
yield pos
def my_f_wrapper(pos, filename):
with open(filename) as infile:
infile.seek(pos)
block=itertools.takewhile(lambda x:x!='STOP', infile)
my_f(block)
blocks = block_generator(filename)
p.imap(my_f_wrapper, blocks, repeat(filename))
По сути, вы хотите, чтобы каждый my_f
работать независимо от файла, поэтому вам нужно открыть файл независимо для каждого.
Я не могу придумать способ, который не требует повторения файла дважды, один раз for
цикл и один раз takewhile
Все вместе, в то же время параллельно обрабатывая файл. В вашей оригинальной версии takewhile
продвинул указатель файла для for
петля, так что это было очень эффективно.
Если бы вы перебирали не строки, а только байты, я бы порекомендовал использовать для этого mmap, но это бы усложнило ситуацию, если вы работаете со строками текста.
Изменить: альтернатива будет иметь block_generator
просмотреть файл и найти все позиции START
а также STOP
, а затем подайте их попарно на обертку. Таким образом, оболочка не должна будет сравнивать строки с STOP
было бы просто использовать tell()
в файле, чтобы убедиться, что это не было в STOP
, Я не уверен, будет ли это быстрее.