Предоставляют ли многопроцессорные пулы каждому процессу одинаковое количество задач или они назначаются доступными?
Когда ты map
итеративный к multiprocessing.Pool
делятся ли итерации на очередь для каждого процесса в пуле в начале, или есть общая очередь, из которой берется задача, когда процесс освобождается?
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
print moo
pool = multiprocessing.Pool()
pool.map(func=process, iterable=generate_stuff())
pool.close()
Итак, учитывая этот непроверенный код предложения; если в пуле 4 процесса, каждому процессу выделено 25 вещей для выполнения, или же 100 компонентов отбираются один за другим процессами, которые ищут что-то, чтобы каждый процесс мог выполнять различное количество элементов, например 30 26, 24, 20.
3 ответа
Итак, учитывая этот непроверенный код предложения; если в пуле 4 процесса, каждому процессу выделено 25 вещей для выполнения, или же 100 компонентов отбираются один за другим процессами, которые ищут что-то, чтобы каждый процесс мог выполнять различное количество элементов, например 30 26, 24, 20.
Ну, очевидный ответ - проверить это.
На самом деле, тест может не сказать вам много, потому что задания будут завершены как можно скорее, и вполне возможно, что все закончится равномерно, даже если объединенные процессы получат задания по мере готовности. Но есть простой способ исправить это:
import collections
import multiprocessing
import os
import random
import time
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
#print moo
time.sleep(random.randint(0, 50) / 10.)
return os.getpid()
pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
Если числа "неровные", вы также знаете, что объединенные процессы должны получать новые задания как готовые. (Я явно установил chunksize
1, чтобы убедиться, что порции не такие большие, что каждый получает только один порцию.)
Когда я запускаю его на 8-ядерном компьютере:
Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
Таким образом, похоже, что процессы получают новые рабочие места на лету.
Поскольку вы специально спросили о 4 работниках, я изменил Pool()
в Pool(4)
и получил это:
Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
Тем не менее, есть еще лучший способ выяснить это, чем путем тестирования: прочитайте источник.
Как вы видете, map
просто звонки map_async
, который создает кучу партий и помещает их в self._taskqueue
объект (Queue.Queue
пример). Если вы будете читать дальше, эта очередь не будет использоваться совместно с другими процессами напрямую, но есть поток менеджера пула, который всякий раз, когда процесс завершает работу и возвращает результат, извлекает следующее задание из очереди и отправляет его обратно процессу.
Это также, как вы можете узнать, для чего используется размер по умолчанию map
, Реализация 2.7, указанная выше, показывает, что это просто len(iterable) / (len(self._pool) * 4)
округленный (немного более многословный, чем во избежание дробной арифметики) - или, другими словами, достаточно большой, чтобы примерно 4 куска на процесс. Но вы действительно не должны полагаться на это; документация неопределенно и косвенно подразумевает, что она будет использовать какую-то эвристику, но не дает никаких гарантий относительно того, что это будет. Так что, если вам действительно нужно "около 4 кусков на процесс", рассчитайте это явно. Более реалистично: если вам когда-нибудь понадобится что-то, кроме значения по умолчанию, вам, вероятно, понадобится значение, специфичное для домена, которое вы собираетесь вычислить (путем расчета, угадывания или профилирования).
http://docs.python.org/2/library/multiprocessing.html
map(func, iterable[, chunksize])
Этот метод разделяет итерируемое на несколько частей, которые он отправляет в пул процессов как отдельные задачи. (Приблизительный) размер этих чанков можно указать, установив для чанксайза положительное целое число.
Я предполагаю, что процесс подхватывает следующий кусок из очереди, когда это делается с предыдущим фрагментом.
По умолчанию chunksize
зависит от длины iterable
и выбирается таким образом, чтобы количество кусков примерно в четыре раза превышало количество процессов. (источник)
Оценить chunksize
используется реализацией Python, не глядя на его multiprocessing
модуль исходного кода, запустите:
#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby
def work(index):
mp.get_logger().info(index)
return index, mp.current_process().name
if __name__ == "__main__":
import logging
import sys
logger = mp.log_to_stderr()
# process cmdline args
try:
sys.argv.remove('--verbose')
except ValueError:
pass # not verbose
else:
logger.setLevel(logging.INFO) # verbose
nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
# choices: 'map', 'imap', 'imap_unordered'
map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}
# estimate chunksize used
max_chunksize = 0
map_func = getattr(mp.Pool(nprocesses), map_name)
for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
key=lambda x: x[0]), # sort by index
key=lambda x: x[1]): # group by process name
max_chunksize = max(max_chunksize, len(list(group)))
print("%s: max_chunksize %d" % (map_name, max_chunksize))
Это показывает, что imap
, imap_unordered
использование chunksize=1
по умолчанию и max_chunksize
за map
зависит от nprocesses
, nitem
(количество кусков на процесс не фиксировано) и max_chunksize
зависит от версии Python. Все *map*
функции учитывают chunksize
параметр, если он указан.
использование
$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]
Чтобы увидеть, как распределяются отдельные рабочие места; уточнить --verbose
параметр.