Вопрос concurrent.futures: почему только 1 работник?

Я экспериментирую с использованием concurrent.futures.ProcessPoolExecutor распараллелить последовательную задачу. Последовательная задача включает в себя поиск числа вхождений данного числа из диапазона номеров. Мой код показан ниже.
Во время его выполнения я заметил из диспетчера задач / системного монитора / top, что постоянно работает только один процессор / поток, несмотря на то, что max_workers processPoolExecutor значение больше 1. Почему это так? Как я могу распараллелить мой код, используя concurrent.futures? Мой код был выполнен с помощью Python 3.5.

import concurrent.futures as cf
from time import time

def _findmatch(nmax, number):    
    print('def _findmatch(nmax, number):')
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end = time() - start
    print("found {} in {}sec".format(len(match),end))
    return match

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        future = executor.submit(_findmatch, nmax, number)
        futures = future.result()
        found = len(futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(found, end))
    return futures

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(len(a),end))

2 ответа

Решение

Проблема с вашим кодом состоит в том, что он отправляет только одну задачу, которая затем будет выполнена одним из работников, в то время как остальные ничего не делают. Вам необходимо отправить несколько задач, которые могут выполняться работниками параллельно.

Приведенный ниже пример разбивает область поиска на три разные задачи, каждая из которых выполняется разными работниками. Фьючерсы возвращены submit добавляются в список и после того, как все они представлены wait используется, чтобы дождаться их завершения. Если вы позвоните result сразу после отправки задания оно будет заблокировано, пока будущее не будет завершено.

Обратите внимание, что вместо генерации списка чисел код ниже просто подсчитывает числа с цифрой 5, чтобы уменьшить использование памяти:

import concurrent.futures as cf
from time import time

def _findmatch(nmin, nmax, number):
    print('def _findmatch', nmin, nmax, number)
    start = time()
    count = 0
    for n in range(nmin, nmax):
        if number in str(n):
            count += 1
    end = time() - start
    print("found {} in {}sec".format(count,end))
    return count

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        chunk = nmax // workers
        futures = []

        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax

            futures.append(executor.submit(_findmatch, cstart, cstop, number))

        cf.wait(futures)
        res = sum(f.result() for f in futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(res, end))
    return res

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(a,end))

Выход:

def _findmatch 0 33333333 5
def _findmatch 33333333 66666666 5
def _findmatch 66666666 100000000 5
found 17190813 in 20.09431290626526sec
found 17190813 in 20.443560361862183sec
found 22571653 in 20.47660517692566sec
with statement of def _concurrent(nmax, number):
found 56953279 in 20.6196870803833sec
main
found 56953279 in 20.648695707321167sec

Запуск вашего кода показывает, что все трое рабочих там, но двое из них спят. Проблема в том, что executor.submit(_findmatch, nmax, number) только один работник говорит, чтобы выполнить функцию _findmatch,

Я не понимаю, что делает ваш код, но в основном вам нужно либо

  • разделите задачу на три четных части и отправьте каждую часть в процесс, используя executor.submit
  • разбить задачу на более мелкие части (скажем, часть, состоящую из 100 элементов) и использовать map так что каждый _findmatch получает только тот кусок, которому он назначен.
Другие вопросы по тегам