python concurrent.futures.ProcessPoolExecutor: Производительность.submit() против.map()

Я использую concurrent.futures.ProcessPoolExecutor, чтобы найти вхождение числа из диапазона номеров. Цель состоит в том, чтобы исследовать количество ускоренной производительности, полученной от параллелизма. Для оценки производительности у меня есть элемент управления - последовательный код для выполнения указанной задачи (показано ниже). Я написал 2 одновременных кода, один из которых использует concurrent.futures.ProcessPoolExecutor.submit() а другой использует concurrent.futures.ProcessPoolExecutor.map() выполнить ту же задачу. Они показаны ниже. Советы по составлению первого и второго можно увидеть здесь и здесь, соответственно.

Задача, выданная всем трем кодам, заключалась в том, чтобы найти число вхождений числа 5 в диапазоне чисел от 0 до 1E8. И то и другое .submit() а также .map() были назначены 6 работников, и .map() имел размер 10000. Способ одновременной дискретизации рабочей нагрузки был идентичен в параллельных кодах. Однако функция, используемая для поиска вхождений в обоих кодах, была разной. Это произошло потому, что способы передачи аргументов в функции, вызываемые.submit() и.map(), были разными.

Все 3 кода сообщили об одном и том же количестве случаев, т.е. 56 953 279 раз. Однако время, затраченное на выполнение задачи, было совсем другим. .submit() выполняется в 2 раза быстрее, чем контроль .map() потребовалось вдвое больше времени, чтобы контроль завершил свою задачу.

Вопросы:

  1. Я хотел бы знать, если медленная производительность .map() Является ли артефакт моего кодирования или он по сути медленный?"Если первый, как я могу улучшить его. Я просто удивлен, что он работал медленнее, чем контроль, так как не будет особого стимула для его использования.
  2. Я хотел бы знать, если есть в любом случае, чтобы сделать .submit() код выполняется еще быстрее У меня есть условие, что функция _concurrent_submit() должен вернуть итерируемое число / число, содержащее число 5.

Результаты тестов
результаты тестов

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Instruct workers to process results as they come, when all are
        #      completed or .....
        cf.as_completed(futures) # faster than cf.wait()
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future.result():
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = {0}'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            futures.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

Серийный номер:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end=time()-start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    start = time()
    a = _serial(nmax, number)
    end = time() - start
    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(a),end))

Обновление 13 февраля 2017 г.:

В дополнение к ответу @niemmi, я предоставил ответ после некоторых личных исследований, чтобы показать:

  1. как ускорить работу решений @map () и.submit() @ niemmi, и
  2. когда ProcessPoolExecutor.map() может привести к большей скорости, чем ProcessPoolExecutor.submit().

2 ответа

Решение

Обзор:

На мой ответ есть 2 части:

  • Часть 1 показывает, как добиться большего ускорения от @niemmi ProcessPoolExecutor.map() решение.
  • Часть 2 показывает, когда ProcessPoolExecutor подклассы .submit() а также .map() дать неэквивалентное время вычислений.

================================================== =====================

Часть 1: Больше ускорения для ProcessPoolExecutor.map()

Фон: этот раздел основан на @niemmi's .map() решение, которое само по себе отлично. Проводя некоторые исследования его схемы дискретизации, чтобы лучше понять, как это взаимодействует с аргументом.map (), я нашел это интересное решение.

Я рассматриваю определение @niemmi chunk = nmax // workers быть определением размера фрагмента, то есть меньшего размера фактического диапазона номеров (заданной задачи), который должен обрабатываться каждым рабочим в пуле рабочих. Теперь это определение основано на предположении, что если на компьютере имеется x количество работников, распределение задачи поровну между каждым работником приведет к оптимальному использованию каждого работника и, следовательно, общая задача будет выполнена быстрее всего. Следовательно, количество фрагментов, на которые нужно разбить задание, должно всегда равняться количеству работников пула. Однако верно ли это предположение?

Предложение: здесь я предлагаю, чтобы вышеупомянутое предположение не всегда приводило к быстрейшему времени вычисления при использовании с ProcessPoolExecutor.map(), Скорее, дискретизация задачи до суммы, превышающей количество работников пула, может привести к ускорению, то есть более быстрому завершению данной задачи.

Эксперимент: я изменил код @niemmi, чтобы количество дискретизированных задач превышало количество рабочих в пуле. Этот код приведен ниже и используется для определения количества раз, когда число 5 появляется в диапазоне от 0 до 1E8. Я выполнил этот код, используя 1, 2, 4 и 6 работников пула и для различного соотношения количества дискретизированных задач к количеству работников пула. Для каждого сценария было выполнено 3 прогона, а время вычислений было сведено в таблицу. " Ускорение " определяется здесь как среднее время вычислений с использованием равного количества чанков и работников пула в течение среднего времени вычислений, когда число дискретизированных задач больше, чем число работников пула.

Выводы:

нчк над нью работниками

  1. На рисунке слева показано время вычислений для всех сценариев, упомянутых в разделе эксперимента. Это показывает, что время вычислений, взятое числом кусков / числом работников = 1, всегда больше, чем время вычислений, взятое количеством кусков> количеством работников. То есть первый случай всегда менее эффективен, чем второй.

  2. На рисунке справа показано, что ускорение в 1,2 раза или более было достигнуто, когда число кусков / число работников достигает порогового значения 14 или более. Интересно отметить, что тенденция ускорения также произошла, когда ProcessPoolExecutor.map() был казнен с 1 работником.

Вывод: при настройке количества дискретных задач, которые ProcessPoolExecutor.map()`должен использовать для решения данной задачи, целесообразно убедиться, что это число больше, чем у работников пула номеров, поскольку такая практика сокращает время вычислений.

Код concurrent.futures.ProcessPoolExecutor.map(). (только пересмотренные части)

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            #print('type(future)=',type(future))
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance  
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

================================================== =====================

Часть 2. Общее время вычислений при использовании подклассов ProcessPoolExecutor.submit() и.map () может отличаться при возврате отсортированного / упорядоченного списка результатов.

Справочная информация: я изменил оба .submit() а также .map() коды, позволяющие сравнивать время вычислений между яблоками и яблоками и возможность визуализировать время вычислений основного кода, время вычислений метода _concurrent, вызываемого основным кодом для выполнения параллельных операций, и вычисления время для каждой дискретизированной задачи / работника, вызванного методом _concurrent. Кроме того, параллельный метод в этих кодах был структурирован так, чтобы возвращать неупорядоченный и упорядоченный список результатов непосредственно из будущего объекта .submit() и итератор .map(), Исходный код приведен ниже (надеюсь, он вам поможет).

Эксперименты Эти два недавно улучшенных кода использовались для выполнения того же эксперимента, описанного в части 1, за исключением того, что были рассмотрены только 6 рабочих пулов и встроен Python. list а также sorted методы использовались для возврата неупорядоченного и упорядоченного списка результатов в основной раздел кода, соответственно.

Выводы: submit vs.map плюс список против отсортированный

  1. Из результата метода _concurrent мы можем увидеть время вычислений метода _concurrent, использованного для создания всех объектов Future ProcessPoolExecutor.submit() и создать итератор ProcessPoolExecutor.map(), в зависимости от числа дискретизированных задач по сравнению с количеством работников пула, эквивалентны. Этот результат просто означает, что ProcessPoolExecutor подклассы .submit() а также .map() одинаково эффективны / быстры.
  2. Сравнивая время вычислений из main и его метода _concurrent, мы видим, что main работал дольше, чем его метод _concurrent. Этого следует ожидать, так как разница во времени отражает количество вычислений времени list а также sorted методы (и другие методы, включенные в эти методы). Ясно видно, что list метод вернул список результатов меньше, чем sorted метод. Среднее время вычислений list метод для обоих кодов.submit() и.map () был одинаковым, ~0,47 сек. Среднее время вычислений отсортированного метода для кодов.submit() и.map () составило 1,23 с и 1,01 с соответственно. Другими словами, list метод выполнен в 2,62 раза и в 2,15 раза быстрее, чем sorted метод для кодов.submit() и.map () соответственно.
  3. Не понятно почему sorted Метод сгенерировал упорядоченный список из .map() быстрее чем из .submit(), так как число дискретизированных задач увеличилось больше, чем число работников пула, за исключением случаев, когда число дискретизированных задач равнялось количеству работников пула. Тем не менее, эти выводы показывают, что решение использовать одинаково быстро .submit() или же .map() подклассы могут быть обременены отсортированным методом. Например, если целью является создание упорядоченного списка в кратчайшие сроки, использование ProcessPoolExecutor.map() должно быть предпочтительнее, чем ProcessPoolExecutor.submit() как .map() может позволить самое короткое общее время вычислений.
  4. Схема дискретизации, упомянутая в части 1 моего ответа, показана здесь для ускорения работы обоих .submit() а также .map() подклассы. Степень ускорения может достигать 20% по сравнению со случаем, когда количество дискретизированных задач равняется количеству работников пула.

Улучшен код.map ()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurence of number in range nmin to nmax and return
       the found occurences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    end = time() - start
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found {0} in {1:.4f}sec".format(len(found),end))    

Улучшен код.submit().
Этот код аналогичен коду.map, за исключением того, что вы заменили метод _concurrent следующим:

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    #return list(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list

================================================== =====================

Вы сравниваете яблоки с апельсинами здесь. Когда используешь map вы производите все 1E8 номера и передать их рабочим процессам. Это занимает много времени по сравнению с фактическим исполнением. Когда используешь submit вы просто создаете 6 наборов параметров, которые будут переданы.

Если вы измените map Для работы по тому же принципу вы получите числа, близкие друг к другу:

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))

        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

Вы можете улучшить производительность отправки с помощью as_completed правильно. Для данного итерируемого фьючерса он вернет итератор, который будет yield фьючерсы в порядке их завершения.

Вы также можете пропустить копирование данных в другой массив и использовать itertools.chain.from_iterable объединить результаты из фьючерсов в одну итерацию:

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))

    return chain.from_iterable(f.result() for f in cf.as_completed(futures))

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(sum(1 for x in a),end))
Другие вопросы по тегам