Предоставляют ли многопроцессорные пулы каждому процессу одинаковое количество задач или они назначаются доступными?

Когда ты map итеративный к multiprocessing.Pool делятся ли итерации на очередь для каждого процесса в пуле в начале, или есть общая очередь, из которой берется задача, когда процесс освобождается?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

Итак, учитывая этот непроверенный код предложения; если в пуле 4 процесса, каждому процессу выделено 25 вещей для выполнения, или же 100 компонентов отбираются один за другим процессами, которые ищут что-то, чтобы каждый процесс мог выполнять различное количество элементов, например 30 26, 24, 20.

3 ответа

Решение

Итак, учитывая этот непроверенный код предложения; если в пуле 4 процесса, каждому процессу выделено 25 вещей для выполнения, или же 100 компонентов отбираются один за другим процессами, которые ищут что-то, чтобы каждый процесс мог выполнять различное количество элементов, например 30 26, 24, 20.

Ну, очевидный ответ - проверить это.

На самом деле, тест может не сказать вам много, потому что задания будут завершены как можно скорее, и вполне возможно, что все закончится равномерно, даже если объединенные процессы получат задания по мере готовности. Но есть простой способ исправить это:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

Если числа "неровные", вы также знаете, что объединенные процессы должны получать новые задания как готовые. (Я явно установил chunksize 1, чтобы убедиться, что порции не такие большие, что каждый получает только один порцию.)

Когда я запускаю его на 8-ядерном компьютере:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

Таким образом, похоже, что процессы получают новые рабочие места на лету.

Поскольку вы специально спросили о 4 работниках, я изменил Pool() в Pool(4) и получил это:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

Тем не менее, есть еще лучший способ выяснить это, чем путем тестирования: прочитайте источник.

Как вы видете, map просто звонки map_async, который создает кучу партий и помещает их в self._taskqueue объект (Queue.Queue пример). Если вы будете читать дальше, эта очередь не будет использоваться совместно с другими процессами напрямую, но есть поток менеджера пула, который всякий раз, когда процесс завершает работу и возвращает результат, извлекает следующее задание из очереди и отправляет его обратно процессу.

Это также, как вы можете узнать, для чего используется размер по умолчанию map, Реализация 2.7, указанная выше, показывает, что это просто len(iterable) / (len(self._pool) * 4) округленный (немного более многословный, чем во избежание дробной арифметики) - или, другими словами, достаточно большой, чтобы примерно 4 куска на процесс. Но вы действительно не должны полагаться на это; документация неопределенно и косвенно подразумевает, что она будет использовать какую-то эвристику, но не дает никаких гарантий относительно того, что это будет. Так что, если вам действительно нужно "около 4 кусков на процесс", рассчитайте это явно. Более реалистично: если вам когда-нибудь понадобится что-то, кроме значения по умолчанию, вам, вероятно, понадобится значение, специфичное для домена, которое вы собираетесь вычислить (путем расчета, угадывания или профилирования).

http://docs.python.org/2/library/multiprocessing.html

map(func, iterable[, chunksize])

Этот метод разделяет итерируемое на несколько частей, которые он отправляет в пул процессов как отдельные задачи. (Приблизительный) размер этих чанков можно указать, установив для чанксайза положительное целое число.

Я предполагаю, что процесс подхватывает следующий кусок из очереди, когда это делается с предыдущим фрагментом.

По умолчанию chunksize зависит от длины iterable и выбирается таким образом, чтобы количество кусков примерно в четыре раза превышало количество процессов. (источник)

Оценить chunksize используется реализацией Python, не глядя на его multiprocessing модуль исходного кода, запустите:

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

Это показывает, что imap, imap_unordered использование chunksize=1 по умолчанию и max_chunksize за map зависит от nprocesses, nitem (количество кусков на процесс не фиксировано) и max_chunksize зависит от версии Python. Все *map* функции учитывают chunksize параметр, если он указан.

использование

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

Чтобы увидеть, как распределяются отдельные рабочие места; уточнить --verbose параметр.

Другие вопросы по тегам