Почему многопроцессорная обработка python занимает больше времени, чем последовательный код? Как ускорить это?

Я опробовал многопроцессорный модуль Python. В приведенном ниже коде время последовательного выполнения составляет 0,09 секунды, а время параллельного выполнения составляет 0,2 секунды. Так как я не ускоряюсь, я думаю, что где-то не так

import multiprocessing as mp
from random import uniform, randrange
import time

# m = mp.Manager()
out_queue = mp.Queue()

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals


def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    # print cals
    out_queue.put(cals)
    # print "Exec over"


def concurrency():
    # out_queue1 = mp.Queue()
    # out_queue2 = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p2.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p3.start()
    out_queue.get()

    p4.start()
    out_queue.get()

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

Моя система имеет четыре ядра. Пожалуйста, дайте мне знать, как я могу ускорить этот код. Кроме того, каковы мои варианты для параллельного программирования с Python(кроме многопроцессорного модуля).

Спасибо и С уважением

2 ответа

Любовь это страсть . , , но может сильно повредить, если вера просто слепа или наивна к уликам

Я люблю python за его простоту использования, за его универсальность, но для повышения производительности HPC требуются дополнительные знания, связанные с аппаратным обеспечением, а также усилия по оптимизации.

@RupjitChakraborty, как вы могли бы насладиться в моем ответе ниже, тот же результат может быть получен в чистом виде [SERIAL] -кодировать примерно в 50 раз быстрее, чем в вашем лучшем случае, и примерно в 100 раз быстрее, чем заявленное Марком время. Не стесняйтесь повторно протестировать его на своем оборудовании, чтобы иметь ту же платформу для более точного сравнения показателей производительности. Тем не менее, наслаждайтесь охотой на производительность! - user3666197 1 декабря '17 в 13:39

Если я могу положить несколько центов в эту бесконечную охоту за производительностью:
- постарайтесь хорошо понять как исходный закон Амдала, так и его новую строгую переформулировку.
- попытайтесь точно определить количество накладных расходов, которые появляются при управлении процессами
- постарайтесь точно определить стоимость дополнительных накладных расходов, связанных с большими объемами передачи данных (стоимость одного окна)
- Старайтесь избегать любой потенциальной блокировки (б), некоторые могут быть скрыты "позади" используемых конструкторов.
- старайтесь избегать любых не связанных с обработкой накладных расходов на синхронизацию + связь
- попытаться предотвратить любые ошибки кэша CPU_core, а также лучше минимизировать потери когерентности (да, легко сказать, трудно кодировать - т. е. созданный вручную код часто становится лучше, чем простой однострочный, с использованием какого-то высокоинтегрированного синтаксического конструктора (но ценой, которой нельзя управлять), так как вы можете предпринять более эффективные шаги в принятии решений, связанных с кэшем, под вашим контролем, чем полагаться на то, что это делается с помощью некоторого неосознанного заранее подготовленного универсального (то есть не связанного с вашими конкретными приоритетами) преобразования кода)


Хотите ускорения?
Всегда систематически проверяйте отдельные факторы в отдельности:

В качестве краткого обзора фактических затрат ваш код будет платить[us] никогда не угадай, проверь это.

Тест-кейс A: измерение процесса управления [SERIAL] -процесс-планирование дополнительных расходов
Контрольный пример B: измеряет дополнительные затраты на выделение памяти для удаленного процесса
Контрольный пример C: измеряет удаленный процесс [CONCURRENT] -процесс-планирование вычислительных затрат Тест-кейс D: измеряет влияние удаленных рабочих процессов на [CONCURRENT] планирование расходов

Для деталей,
можно читать дальше и повторно использовать / улучшать шаблоны наивного кода
в главе [ Факты архитектуры, ресурсов и планирования процессов, которые имеют значение ].

Как уже предупреждал Марк, еще одна издержка для расчета ускорения по закону Амдала будет зависеть от передачи данных от основного процесса к каждому из порожденных подпроцессов, где [SERIAL] накладные расходы надстройки будут расти и увеличиваться больше, чем линейно масштабируемые до объема данных, из-за конфликтующих шаблонов доступа, конкуренции за физическую емкость ресурса, сигнализации сигнализации общих объектов- (b) накладных расходов и аналогичных аппаратных препятствий, которых нельзя избежать.

Прежде чем углубляться в варианты настройки производительности, можно предложить простой тестовый пример E: для измерения этого самого класса дополнительных затрат на передачу данных и памяти:

def a_FAT_DATA_XFER_COSTS_FUN( anIndeedFatPieceOfDATA ):
    """                                                 __doc__
    The intent of this FUN() is indeed to do nothing at all,
                             but to be able to benchmark
                             add-on overhead costs
                             raised by a need to transfer
                             some large amount of data
                             from a main()-process
                             to this FUN()-subprocess spawned.
    """
    return ( anIndeedFatPieceOfDATA[ 0]
           + anIndeedFatPieceOfDATA[-1]
             )

##############################################################
###  A NAIVE TEST BENCH
##############################################################
from zmq import Stopwatch; aClk = Stopwatch()
JOBS_TO_SPAWN =  4         # TUNE:  1,  2,  4,   5,  10, ..
RUNS_TO_RUN   = 10         # TUNE: 10, 20, 50, 100, 200, 500, 1000, ..
SIZE_TO_XFER  = 1E+6       # TUNE: +6, +7, +8,  +9, +10, ..

DATA_TO_XFER  = [ 1 for _ in range( int( SIZE_TO_XFER ) ) ]

try:
     aClk.start()
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
     joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                      )( joblib.delayed( a_FAT_DATA_XFER_COSTS_FUN )
                                       ( a_FAT_DATA )
                                   for ( a_FAT_DATA )
                                   in  [       DATA_TO_XFER
                                         for _ in range( RUNS_TO_RUN )
                                         ]
                         )
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
except:
     pass
finally:
     try:
         _ = aClk.stop()
     except:
         _ = -1
         pass

template = "CLK:: {0:_>24d} [us] @{1: >3d} run{2: >5d} RUNS ( {3: >12.3f}[MB]"

print( template.format( _,
                        JOBS_TO_SPAWN,
                        RUNS_TO_RUN,
                        SIZE_TO_SEND / 1024. /1024.
                        )
       )

Пожалуйста, дайте мне знать, как я могу ускорить этот код.

  • узнать о numba безусловно, стоит знать этот инструмент для повышения производительности
  • узнать о векторизации операций
  • после освоения этих двух, возможно, стоит пересмотреть уже совершенный код в Cython

rVEC = np.random.uniform( 1, 4, 1E+6 )

def flop_NaivePY( r, a, b ):
    return(       r+(a *b ) )

aClk.start(); _ = flop_NaivePY( rVEC, a, b ); aClk.stop()
4868L
4253L
4113L
4376L
4333L
4137L
4.~_____[ms] @ 1.000.000 FLOAT-OPS, COOL, RIGHT?

Тем не менее, этот код ужасно неправильн, если думать о производительности.

Давай включим numpy назначения на месте, избегая дублирования выделения памяти и подобных неэффективных обработок:

def flop_InplaceNUMPY( r, a, b ):
       r += a * b
       return r

aClk.start(); _ = flop_InplaceNUMPY( rVEC, a, b ); aClk.stop()
2459L
2426L
2658L
2444L
2421L
2430L
2429L
4.??         @ 1.000.000 FLOAT-OPS, COOL, RIGHT? NOT AS SEEN NOW
2.~!____[ms] @ 1.000.000 FLOAT-OPS, HALF, BETTER!
                                          BUT
                                          ALSO TEST THE SCALING
                                          ONCE GONE OFF CACHE,
                                          THAT TEST GET SMELL OF A NEED
                                                              TO OPTIMISE
                                                              CODE DESIGN

Осторожные экспериментаторы скоро покажут, что позже можно будет увидеть даже убитый процесс Python во время выполнения наивного кода, так как недостаточный запрос на выделение памяти будет задушен и запаникован для завершения при больших размерах выше ~1E+9)

это все принесет иначе [SERIAL] код на стероидах, но без каких-либо дополнительных затрат, кроме нуля, и дядя Джин Амдаль вознаградит вас за знания по планированию процессов и архитектуре аппаратного обеспечения, а также за усилия, потраченные при разработке кода на макс.

Нет лучшего совета не существует . , , за исключением того, чтобы заниматься чистым бизнесом ясновидения, где повторное тестирование никогда не доступно

out_queue.get() блокирует, пока результат не будет доступен по умолчанию. Таким образом, вы, по сути, запускаете процесс и ждете его завершения, прежде чем начинать следующий процесс. Вместо этого запустите все процессы, затем получите все результаты.

Пример:

    #!python2
    import multiprocessing as mp
    from random import uniform, randrange
    import time

    def flop_no(rand_nos, a, b):
        cals = []
        for r in rand_nos:
            cals.append(r + a * b)
        return cals

    def flop(val, a, b, out_queue):
        cals = []
        for v in val:
            cals.append(v + a * b)
        out_queue.put(cals)
        # time.sleep(3)

    def concurrency():
        out_queue = mp.Queue()
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        print len(rand_nos)
        # for i in range(5):
        start_time = time.time()
        p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
        p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
        p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
        p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))

        p1.start()
        p2.start()
        p3.start()
        p4.start()

        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())

        p1.join()
        p2.join()
        p3.join()
        p4.join()

        print "Running time parallel: ", time.time() - start_time, "secs"

    def no_concurrency():
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        start_time = time.time()
        cals = flop_no(rand_nos, a, b)
        print "Running time serial: ", time.time() - start_time, "secs"

    if __name__ == '__main__':
        concurrency()
        no_concurrency()
        # print "Program over" 

Output:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  3.54999995232  secs
    Running time serial:    0.203000068665 secs

Note that parallel time is still slower.  This is due to the overhead of starting 4 other Python processes.  Your processing time for the whole job is only .2 seconds.  The 3.5 seconds for parallel is mostly just starting up the processes.  Note the commented out `# time.sleep(3)` above in `flop()`.  Add that code in and the times are:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  6.50900006294  secs
    Running time serial:    0.203000068665 secs

The overall time only got 3 seconds faster (not 12) because they were running in parallel.  You need a lot more data to make parallel processing worthwhile.

Here's a version where you can visually see how long it takes to start the processes.  "here" is printed as each process begins to run `flop()`.  An event is used to start all threads at the same time, and only the processing time is counted:

#!python2

import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue, start):
    print 'here'
    start.wait()
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    start = mp.Event()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue, start))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue, start))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue, start))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue, start))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    time.sleep(5) # Wait for processes to start.  See Barrier in Python 3.2+ for a better solution.
    print "go"
    start.set()
    start_time = time.time()
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print "Running time parallel: ", time.time() - start_time, "secs"

    p1.join()
    p2.join()
    p3.join()
    p4.join()

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

Выход:

1000000
here           # note these print about a second apart.
here
here
here
go
250000
250000
250000
250000
Running time parallel:  0.171999931335 secs
Running time serial:    0.203000068665 secs

Теперь время обработки стало быстрее. Не очень... вероятно, из-за межпроцессного взаимодействия, чтобы получить результаты.

Другие вопросы по тегам