Показать ход вызова карты многопроцессорного пула Python?

У меня есть сценарий, который успешно выполняет многопроцессорный набор задач пула с imap_unordered() вызов:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Тем не менее, мой num_tasks около 250000, и поэтому join() блокирует основной поток на 10 секунд или около того, и я хотел бы иметь возможность постепенно отображать командную строку, чтобы показать, что основной процесс не заблокирован. Что-то вроде:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

Есть ли метод для объекта результата или самого пула, который указывает количество оставшихся задач? Я пытался с помощью multiprocessing.Value объект как счетчик (do_work вызывает counter.value += 1 действие после выполнения своей задачи), но счетчик получает только ~85% от общего значения перед прекращением увеличения.

12 ответов

Решение

Нет необходимости обращаться к закрытым атрибутам результирующего набора:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

Мой личный фаворит - дает вам небольшой индикатор прогресса и завершения ETA, пока все работает и фиксируется параллельно.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

Как предполагает Тим, вы можете использовать tqdm а также imap решить эту проблему. Я просто наткнулся на эту проблему и подправил imap_unordered решение, так что я могу получить доступ к результатам сопоставления. Вот как это работает:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

Если вам не нужны значения, возвращаемые вашими заданиями, вам не нужно присваивать список какой-либо переменной.

Я обнаружил, что работа уже была сделана к тому времени, когда я попытался проверить ее прогресс. Это то, что работает для меня, используя tqdm.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

Это должно работать со всеми разновидностями многопроцессорной обработки, независимо от того, блокируют они или нет.

Сам нашел ответ с еще одним копанием: Взглянув на __dict__ из imap_unordered объект результата, я обнаружил, что он имеет _index атрибут, который увеличивается с каждым завершением задачи. Так что это работает для регистрации, завернутый в while цикл:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

Тем не менее, я обнаружил, что обмен imap_unordered для map_async привело к гораздо более быстрому выполнению, хотя объект результата немного отличается. Вместо этого результат объекта из map_async имеет _number_left атрибут и ready() метод:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

Я знаю, что это довольно старый вопрос, но вот что я делаю, когда хочу отслеживать ход выполнения пула задач в python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

По сути, вы используете apply_async с callbak (в данном случае это добавление возвращаемого значения в список), поэтому вам не нужно ждать, чтобы сделать что-то еще. Затем, в течение цикла, вы проверяете ход выполнения работы. В этом случае я добавил виджет, чтобы он выглядел лучше.

Выход:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Надеюсь, поможет.

Для всех, кто ищет простое решение для работы с Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
    i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]

Быстрый старт

С использованиемtqdmа такжеmultiprocessing.Pool

Установить

      pip install tqdm

Пример

      import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')


tasks = range(10)
pbar = tqdm(total=len(tasks))

if __name__ == '__main__':
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    print(results)

Результат




Колба

Установить

      pip install flask

main.py

      import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

Выполнить (например, в Windows)

      set FLASK_APP=main
flask run

список API

test.html

      <!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) {
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) {
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        }
    }

    $("#run").click(function () {
        //Run the task
        $.ajax({
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) {
                set_progress_rate(100, 100);
                console.log('Results:' + response['results']);
            }
        });
    });
    setInterval(function () {
        //Show progress every 1 second
        $.ajax({
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) {
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            }
        });
    }, 1000);
</script>
</html>

Результат

Проведя небольшое исследование, я написал небольшой модуль под названием parallelbar. Он позволяет отображать как общий прогресс пула, так и по каждому ядру отдельно. Он прост в использовании и имеет хорошее описание.

Например:

      from parallelbar import progress_map
from parallelbar.tools import cpu_bench


if __name__=='__main__':
    # create list of task
    tasks = [1_000_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks)

Я создал собственный класс для создания распечатки прогресса. Мэйби, это помогает:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

Попробуйте этот простой подход на основе очереди, который также можно использовать с пулами. Помните, что распечатка чего-либо после инициализации индикатора выполнения приведет к его перемещению, по крайней мере, для данного индикатора выполнения. (Прогресс PyPI 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()

Некоторые ответы работают с индикатором выполнения, но мне не удалось получить результаты из пула

Я использовал tqdm для создания индикатора выполнения, и вы можете установить его с помощью pip install tqdm

Ниже приведен простой код, который отлично работает с индикатором выполнения, и вы также можете получить результат:

      from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

tasks = range(5)
result = []

def do_work(x):
    # do something with x and return the result
    sleep(2)
    return x + 2

if __name__ == '__main__':
    pbar = tqdm(total=len(tasks))

    with Pool(2) as p:
        for i in p.imap_unordered(do_work, tasks):

            result.append(i)
            pbar.update(i)
    
    pbar.close()
    print(result)
Другие вопросы по тегам