python concurrent.futures.ProcessPoolExecutor: Производительность.submit() против.map()
Я использую concurrent.futures.ProcessPoolExecutor, чтобы найти вхождение числа из диапазона номеров. Цель состоит в том, чтобы исследовать количество ускоренной производительности, полученной от параллелизма. Для оценки производительности у меня есть элемент управления - последовательный код для выполнения указанной задачи (показано ниже). Я написал 2 одновременных кода, один из которых использует concurrent.futures.ProcessPoolExecutor.submit()
а другой использует concurrent.futures.ProcessPoolExecutor.map()
выполнить ту же задачу. Они показаны ниже. Советы по составлению первого и второго можно увидеть здесь и здесь, соответственно.
Задача, выданная всем трем кодам, заключалась в том, чтобы найти число вхождений числа 5 в диапазоне чисел от 0 до 1E8. И то и другое .submit()
а также .map()
были назначены 6 работников, и .map()
имел размер 10000. Способ одновременной дискретизации рабочей нагрузки был идентичен в параллельных кодах. Однако функция, используемая для поиска вхождений в обоих кодах, была разной. Это произошло потому, что способы передачи аргументов в функции, вызываемые.submit() и.map(), были разными.
Все 3 кода сообщили об одном и том же количестве случаев, т.е. 56 953 279 раз. Однако время, затраченное на выполнение задачи, было совсем другим. .submit()
выполняется в 2 раза быстрее, чем контроль .map()
потребовалось вдвое больше времени, чтобы контроль завершил свою задачу.
Вопросы:
- Я хотел бы знать, если медленная производительность
.map()
Является ли артефакт моего кодирования или он по сути медленный?"Если первый, как я могу улучшить его. Я просто удивлен, что он работал медленнее, чем контроль, так как не будет особого стимула для его использования. - Я хотел бы знать, если есть в любом случае, чтобы сделать
.submit()
код выполняется еще быстрее У меня есть условие, что функция_concurrent_submit()
должен вернуть итерируемое число / число, содержащее число 5.
concurrent.futures.ProcessPoolExecutor.submit()
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
from time import time
from traceback import print_exc
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('\n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
def _concurrent_submit(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
find the occurences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(workers):
cstart = chunk * i
cstop = chunk * (i + 1) if i != workers - 1 else nmax
futures.append(executor.submit(_findmatch, cstart, cstop, number))
# 2.2. Instruct workers to process results as they come, when all are
# completed or .....
cf.as_completed(futures) # faster than cf.wait()
# 2.3. Consolidate result as a list and return this list.
for future in futures:
for f in future.result():
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('within statement of def _concurrent_submit():')
print("found {0} in {1:.4f}sec".format(foundsize, end))
return found
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
start = time()
a = _concurrent_submit(nmax, number, workers)
end = time() - start
print('\n main')
print('workers = ', workers)
print("found {0} in {1:.4f}sec".format(len(a),end))
concurrent.futures.ProcessPoolExecutor.map()
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
def _findmatch(listnumber, number):
'''Function to find the occurrence of number in another number and return
a string value.'''
#print('def _findmatch(listnumber, number):')
#print('listnumber = {0} and ref = {1}'.format(listnumber, number))
if number in str(listnumber):
x = listnumber
#print('x = {0}'.format(x))
return x
def _concurrent_map(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(workers):
cstart = chunk * i
cstop = chunk * (i + 1) if i != workers - 1 else nmax
numberlist = range(cstart, cstop)
futures.append(executor.map(_findmatch, numberlist,
itertools.repeat(number),
chunksize=10000))
# 2.3. Consolidate result as a list and return this list.
for future in futures:
for f in future:
if f:
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('within statement of def _concurrent(nmax, number):')
print("found {0} in {1:.4f}sec".format(foundsize, end))
return found
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
start = time()
a = _concurrent_map(nmax, number, workers)
end = time() - start
print('\n main')
print('workers = ', workers)
print("found {0} in {1:.4f}sec".format(len(a),end))
Серийный номер:
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
from time import time
def _serial(nmax, number):
start = time()
match=[]
nlist = range(nmax)
for n in nlist:
if number in str(n):match.append(n)
end=time()-start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
start = time()
a = _serial(nmax, number)
end = time() - start
print('\n main')
print("found {0} in {1:.4f}sec".format(len(a),end))
Обновление 13 февраля 2017 г.:
В дополнение к ответу @niemmi, я предоставил ответ после некоторых личных исследований, чтобы показать:
- как ускорить работу решений @map () и.submit() @ niemmi, и
- когда ProcessPoolExecutor.map() может привести к большей скорости, чем ProcessPoolExecutor.submit().
2 ответа
Обзор:
На мой ответ есть 2 части:
- Часть 1 показывает, как добиться большего ускорения от @niemmi
ProcessPoolExecutor.map()
решение. - Часть 2 показывает, когда
ProcessPoolExecutor
подклассы.submit()
а также.map()
дать неэквивалентное время вычислений.
================================================== =====================
Часть 1: Больше ускорения для ProcessPoolExecutor.map()
Фон: этот раздел основан на @niemmi's .map()
решение, которое само по себе отлично. Проводя некоторые исследования его схемы дискретизации, чтобы лучше понять, как это взаимодействует с аргументом.map (), я нашел это интересное решение.
Я рассматриваю определение @niemmi chunk = nmax // workers
быть определением размера фрагмента, то есть меньшего размера фактического диапазона номеров (заданной задачи), который должен обрабатываться каждым рабочим в пуле рабочих. Теперь это определение основано на предположении, что если на компьютере имеется x количество работников, распределение задачи поровну между каждым работником приведет к оптимальному использованию каждого работника и, следовательно, общая задача будет выполнена быстрее всего. Следовательно, количество фрагментов, на которые нужно разбить задание, должно всегда равняться количеству работников пула. Однако верно ли это предположение?
Предложение: здесь я предлагаю, чтобы вышеупомянутое предположение не всегда приводило к быстрейшему времени вычисления при использовании с ProcessPoolExecutor.map()
, Скорее, дискретизация задачи до суммы, превышающей количество работников пула, может привести к ускорению, то есть более быстрому завершению данной задачи.
Эксперимент: я изменил код @niemmi, чтобы количество дискретизированных задач превышало количество рабочих в пуле. Этот код приведен ниже и используется для определения количества раз, когда число 5 появляется в диапазоне от 0 до 1E8. Я выполнил этот код, используя 1, 2, 4 и 6 работников пула и для различного соотношения количества дискретизированных задач к количеству работников пула. Для каждого сценария было выполнено 3 прогона, а время вычислений было сведено в таблицу. " Ускорение " определяется здесь как среднее время вычислений с использованием равного количества чанков и работников пула в течение среднего времени вычислений, когда число дискретизированных задач больше, чем число работников пула.
Выводы:
На рисунке слева показано время вычислений для всех сценариев, упомянутых в разделе эксперимента. Это показывает, что время вычислений, взятое числом кусков / числом работников = 1, всегда больше, чем время вычислений, взятое количеством кусков> количеством работников. То есть первый случай всегда менее эффективен, чем второй.
На рисунке справа показано, что ускорение в 1,2 раза или более было достигнуто, когда число кусков / число работников достигает порогового значения 14 или более. Интересно отметить, что тенденция ускорения также произошла, когда
ProcessPoolExecutor.map()
был казнен с 1 работником.
Вывод: при настройке количества дискретных задач, которые ProcessPoolExecutor.map()`должен использовать для решения данной задачи, целесообразно убедиться, что это число больше, чем у работников пула номеров, поскольку такая практика сокращает время вычислений.
Код concurrent.futures.ProcessPoolExecutor.map(). (только пересмотренные части)
def _concurrent_map(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunksize * i for i in range(num_of_chunks))
cstop = (chunksize * i if i != num_of_chunks else nmax
for i in range(1, num_of_chunks + 1))
futures = executor.map(_findmatch, cstart, cstop,
itertools.repeat(number))
# 2.2. Consolidate result as a list and return this list.
for future in futures:
#print('type(future)=',type(future))
for f in future:
if f:
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('\n within statement of def _concurrent(nmax, number):')
print("found {0} in {1:.4f}sec".format(foundsize, end))
return found
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 4 # Pool of workers
chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance
num_of_chunks = chunks_vs_workers * workers
start = time()
a = _concurrent_map(nmax, number, workers, num_of_chunks)
end = time() - start
print('\n main')
print('nmax={}, workers={}, num_of_chunks={}'.format(
nmax, workers, num_of_chunks))
print('workers = ', workers)
print("found {0} in {1:.4f}sec".format(len(a),end))
================================================== =====================
Часть 2. Общее время вычислений при использовании подклассов ProcessPoolExecutor.submit() и.map () может отличаться при возврате отсортированного / упорядоченного списка результатов.
Справочная информация: я изменил оба .submit()
а также .map()
коды, позволяющие сравнивать время вычислений между яблоками и яблоками и возможность визуализировать время вычислений основного кода, время вычислений метода _concurrent, вызываемого основным кодом для выполнения параллельных операций, и вычисления время для каждой дискретизированной задачи / работника, вызванного методом _concurrent. Кроме того, параллельный метод в этих кодах был структурирован так, чтобы возвращать неупорядоченный и упорядоченный список результатов непосредственно из будущего объекта .submit()
и итератор .map()
, Исходный код приведен ниже (надеюсь, он вам поможет).
Эксперименты Эти два недавно улучшенных кода использовались для выполнения того же эксперимента, описанного в части 1, за исключением того, что были рассмотрены только 6 рабочих пулов и встроен Python. list
а также sorted
методы использовались для возврата неупорядоченного и упорядоченного списка результатов в основной раздел кода, соответственно.
- Из результата метода _concurrent мы можем увидеть время вычислений метода _concurrent, использованного для создания всех объектов Future
ProcessPoolExecutor.submit()
и создать итераторProcessPoolExecutor.map()
, в зависимости от числа дискретизированных задач по сравнению с количеством работников пула, эквивалентны. Этот результат просто означает, чтоProcessPoolExecutor
подклассы.submit()
а также.map()
одинаково эффективны / быстры. - Сравнивая время вычислений из main и его метода _concurrent, мы видим, что main работал дольше, чем его метод _concurrent. Этого следует ожидать, так как разница во времени отражает количество вычислений времени
list
а такжеsorted
методы (и другие методы, включенные в эти методы). Ясно видно, чтоlist
метод вернул список результатов меньше, чемsorted
метод. Среднее время вычисленийlist
метод для обоих кодов.submit() и.map () был одинаковым, ~0,47 сек. Среднее время вычислений отсортированного метода для кодов.submit() и.map () составило 1,23 с и 1,01 с соответственно. Другими словами,list
метод выполнен в 2,62 раза и в 2,15 раза быстрее, чемsorted
метод для кодов.submit() и.map () соответственно. - Не понятно почему
sorted
Метод сгенерировал упорядоченный список из.map()
быстрее чем из.submit()
, так как число дискретизированных задач увеличилось больше, чем число работников пула, за исключением случаев, когда число дискретизированных задач равнялось количеству работников пула. Тем не менее, эти выводы показывают, что решение использовать одинаково быстро.submit()
или же.map()
подклассы могут быть обременены отсортированным методом. Например, если целью является создание упорядоченного списка в кратчайшие сроки, использование ProcessPoolExecutor.map() должно быть предпочтительнее, чемProcessPoolExecutor.submit()
как.map()
может позволить самое короткое общее время вычислений. - Схема дискретизации, упомянутая в части 1 моего ответа, показана здесь для ускорения работы обоих
.submit()
а также.map()
подклассы. Степень ускорения может достигать 20% по сравнению со случаем, когда количество дискретизированных задач равняется количеству работников пула.
Улучшен код.map ()
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
from time import time
from itertools import repeat, chain
def _findmatch(nmin, nmax, number):
'''Function to find the occurence of number in range nmin to nmax and return
the found occurences in a list.'''
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
#print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
# format(nmin, nmax, number, len(match),end))
return match
def _concurrent(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a concurrent
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunksize * i for i in range(num_of_chunks))
cstop = (chunksize * i if i != num_of_chunks else nmax
for i in range(1, num_of_chunks + 1))
futures = executor.map(_findmatch, cstart, cstop, repeat(number))
end = time() - start
print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
print("found in {0:.4f}sec".format(end))
return list(chain.from_iterable(futures)) #Return an unordered result list
#return sorted(chain.from_iterable(futures)) #Return an ordered result list
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance
num_of_chunks = chunks_vs_workers * workers
start = time()
found = _concurrent(nmax, number, workers, num_of_chunks)
end = time() - start
print('\n main')
print('nmax={}, workers={}, num_of_chunks={}'.format(
nmax, workers, num_of_chunks))
#print('found = ', found)
print("found {0} in {1:.4f}sec".format(len(found),end))
Улучшен код.submit().
Этот код аналогичен коду.map, за исключением того, что вы заменили метод _concurrent следующим:
def _concurrent(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
find the occurrences of a given number in a number range in a concurrent
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
futures = []
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(num_of_chunks):
cstart = chunksize * i
cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
futures.append(executor.submit(_findmatch, cstart, cstop, number))
end = time() - start
print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
print("found in {0:.4f}sec".format(end))
return list(chain.from_iterable(f.result() for f in cf.as_completed(
futures))) #Return an unordered list
#return list(chain.from_iterable(f.result() for f in cf.as_completed(
# futures))) #Return an ordered list
================================================== =====================
Вы сравниваете яблоки с апельсинами здесь. Когда используешь map
вы производите все 1E8
номера и передать их рабочим процессам. Это занимает много времени по сравнению с фактическим исполнением. Когда используешь submit
вы просто создаете 6 наборов параметров, которые будут переданы.
Если вы измените map
Для работы по тому же принципу вы получите числа, близкие друг к другу:
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('\n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
def _concurrent_map(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunk * i for i in range(workers))
cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))
# 2.3. Consolidate result as a list and return this list.
for future in futures:
for f in future:
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('within statement of def _concurrent(nmax, number):')
print("found {0} in {1:.4f}sec".format(foundsize, end))
return found
Вы можете улучшить производительность отправки с помощью as_completed
правильно. Для данного итерируемого фьючерса он вернет итератор, который будет yield
фьючерсы в порядке их завершения.
Вы также можете пропустить копирование данных в другой массив и использовать itertools.chain.from_iterable
объединить результаты из фьючерсов в одну итерацию:
import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('\n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
def _concurrent_map(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(workers):
cstart = chunk * i
cstop = chunk * (i + 1) if i != workers - 1 else nmax
futures.append(executor.submit(_findmatch, cstart, cstop, number))
return chain.from_iterable(f.result() for f in cf.as_completed(futures))
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
start = time()
a = _concurrent_map(nmax, number, workers)
end = time() - start
print('\n main')
print('workers = ', workers)
print("found {0} in {1:.4f}sec".format(sum(1 for x in a),end))