Показать ход вызова карты многопроцессорного пула Python?
У меня есть сценарий, который успешно выполняет многопроцессорный набор задач пула с imap_unordered()
вызов:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Тем не менее, мой num_tasks
около 250000, и поэтому join()
блокирует основной поток на 10 секунд или около того, и я хотел бы иметь возможность постепенно отображать командную строку, чтобы показать, что основной процесс не заблокирован. Что-то вроде:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Есть ли метод для объекта результата или самого пула, который указывает количество оставшихся задач? Я пытался с помощью multiprocessing.Value
объект как счетчик (do_work
вызывает counter.value += 1
действие после выполнения своей задачи), но счетчик получает только ~85% от общего значения перед прекращением увеличения.
12 ответов
Нет необходимости обращаться к закрытым атрибутам результирующего набора:
from __future__ import division
import sys
for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
Мой личный фаворит - дает вам небольшой индикатор прогресса и завершения ETA, пока все работает и фиксируется параллельно.
from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
pass
Как предполагает Тим, вы можете использовать tqdm
а также imap
решить эту проблему. Я просто наткнулся на эту проблему и подправил imap_unordered
решение, так что я могу получить доступ к результатам сопоставления. Вот как это работает:
from multiprocessing import Pool
import tqdm
pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))
Если вам не нужны значения, возвращаемые вашими заданиями, вам не нужно присваивать список какой-либо переменной.
Я обнаружил, что работа уже была сделана к тому времени, когда я попытался проверить ее прогресс. Это то, что работает для меня, используя tqdm.
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm
tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))
def do_work(x):
# do something with x
pbar.update(1)
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
Это должно работать со всеми разновидностями многопроцессорной обработки, независимо от того, блокируют они или нет.
Сам нашел ответ с еще одним копанием: Взглянув на __dict__
из imap_unordered
объект результата, я обнаружил, что он имеет _index
атрибут, который увеличивается с каждым завершением задачи. Так что это работает для регистрации, завернутый в while
цикл:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
completed = rs._index
if (completed == num_tasks): break
print "Waiting for", num_tasks-completed, "tasks to complete..."
time.sleep(2)
Тем не менее, я обнаружил, что обмен imap_unordered
для map_async
привело к гораздо более быстрому выполнению, хотя объект результата немного отличается. Вместо этого результат объекта из map_async
имеет _number_left
атрибут и ready()
метод:
p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
if (rs.ready()): break
remaining = rs._number_left
print "Waiting for", remaining, "tasks to complete..."
time.sleep(0.5)
Я знаю, что это довольно старый вопрос, но вот что я делаю, когда хочу отслеживать ход выполнения пула задач в python.
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep
def my_function(letter):
sleep(2)
return letter+letter
dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []
pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
while len(results) != len(dummy_args):
pbar.update(len(results))
sleep(0.5)
pbar.finish()
print results
По сути, вы используете apply_async с callbak (в данном случае это добавление возвращаемого значения в список), поэтому вам не нужно ждать, чтобы сделать что-то еще. Затем, в течение цикла, вы проверяете ход выполнения работы. В этом случае я добавил виджет, чтобы он выглядел лучше.
Выход:
4 of 4
['AA', 'BB', 'CC', 'DD']
Надеюсь, поможет.
Для всех, кто ищет простое решение для работы с Pool.apply_async()
:
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep
def work(x):
sleep(0.5)
return x**2
n = 10
p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]
Быстрый старт
С использованиемtqdm
а такжеmultiprocessing.Pool
Установить
pip install tqdm
Пример
import time
import threading
from multiprocessing import Pool
from tqdm import tqdm
def do_work(x):
time.sleep(x)
return x
def progress():
time.sleep(3) # Check progress after 3 seconds
print(f'total: {pbar.total} finish:{pbar.n}')
tasks = range(10)
pbar = tqdm(total=len(tasks))
if __name__ == '__main__':
thread = threading.Thread(target=progress)
thread.start()
results = []
with Pool(processes=5) as pool:
for result in pool.imap_unordered(do_work, tasks):
results.append(result)
pbar.update(1)
print(results)
Результат
Колба
Установить
pip install flask
main.py
import time
from multiprocessing import Pool
from tqdm import tqdm
from flask import Flask, make_response, jsonify
app = Flask(__name__)
def do_work(x):
time.sleep(x)
return x
total = 5 # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))
@app.route('/run/')
def run():
results = []
with Pool(processes=2) as pool:
for _result in pool.imap_unordered(do_work, tasks):
results.append(_result)
if pbar.n >= total:
pbar.n = 0 # reset
pbar.update(1)
response = make_response(jsonify(dict(results=results)))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', '*')
response.headers.add('Access-Control-Allow-Methods', '*')
return response
@app.route('/progress/')
def progress():
response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', '*')
response.headers.add('Access-Control-Allow-Methods', '*')
return response
Выполнить (например, в Windows)
set FLASK_APP=main
flask run
список API
- Запустите задачу: http://127.0.0.1:5000/run/
- Показать прогресс: http://127.0.0.1:5000/progress/
test.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Progress Bar</title>
<script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
style="width: 10%">0.00%
</div>
</div>
</body>
<script>
function set_progress_rate(n, total) {
//Set the rate of progress bar
var rate = (n / total * 100).toFixed(2);
if (n > 0) {
$(".progress-bar").attr("aria-valuenow", n);
$(".progress-bar").attr("aria-valuemax", total);
$(".progress-bar").text(rate + "%");
$(".progress-bar").css("width", rate + "%");
}
}
$("#run").click(function () {
//Run the task
$.ajax({
url: "http://127.0.0.1:5000/run/",
type: "GET",
success: function (response) {
set_progress_rate(100, 100);
console.log('Results:' + response['results']);
}
});
});
setInterval(function () {
//Show progress every 1 second
$.ajax({
url: "http://127.0.0.1:5000/progress/",
type: "GET",
success: function (response) {
console.log(response);
var n = response["n"];
var total = response["total"];
set_progress_rate(n, total);
}
});
}, 1000);
</script>
</html>
Результат
Проведя небольшое исследование, я написал небольшой модуль под названием parallelbar. Он позволяет отображать как общий прогресс пула, так и по каждому ядру отдельно. Он прост в использовании и имеет хорошее описание.
Например:
from parallelbar import progress_map
from parallelbar.tools import cpu_bench
if __name__=='__main__':
# create list of task
tasks = [1_000_000 + i for i in range(100)]
progress_map(cpu_bench, tasks)
Я создал собственный класс для создания распечатки прогресса. Мэйби, это помогает:
from multiprocessing import Pool, cpu_count
class ParallelSim(object):
def __init__(self, processes=cpu_count()):
self.pool = Pool(processes=processes)
self.total_processes = 0
self.completed_processes = 0
self.results = []
def add(self, func, args):
self.pool.apply_async(func=func, args=args, callback=self.complete)
self.total_processes += 1
def complete(self, result):
self.results.extend(result)
self.completed_processes += 1
print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))
def run(self):
self.pool.close()
self.pool.join()
def get_results(self):
return self.results
Попробуйте этот простой подход на основе очереди, который также можно использовать с пулами. Помните, что распечатка чего-либо после инициализации индикатора выполнения приведет к его перемещению, по крайней мере, для данного индикатора выполнения. (Прогресс PyPI 1.5)
import time
from progress.bar import Bar
def status_bar( queue_stat, n_groups, n ):
bar = Bar('progress', max = n)
finished = 0
while finished < n_groups:
while queue_stat.empty():
time.sleep(0.01)
gotten = queue_stat.get()
if gotten == 'finished':
finished += 1
else:
bar.next()
bar.finish()
def process_data( queue_data, queue_stat, group):
for i in group:
... do stuff resulting in new_data
queue_stat.put(1)
queue_stat.put('finished')
queue_data.put(new_data)
def multiprocess():
new_data = []
groups = [[1,2,3],[4,5,6],[7,8,9]]
combined = sum(groups,[])
queue_data = multiprocessing.Queue()
queue_stat = multiprocessing.Queue()
for i, group in enumerate(groups):
if i == 0:
p = multiprocessing.Process(target = status_bar,
args=(queue_stat,len(groups),len(combined)))
processes.append(p)
p.start()
p = multiprocessing.Process(target = process_data,
args=(queue_data, queue_stat, group))
processes.append(p)
p.start()
for i in range(len(groups)):
data = queue_data.get()
new_data += data
for p in processes:
p.join()
Некоторые ответы работают с индикатором выполнения, но мне не удалось получить результаты из пула
Я использовал tqdm для создания индикатора выполнения, и вы можете установить его с помощью
pip install tqdm
Ниже приведен простой код, который отлично работает с индикатором выполнения, и вы также можете получить результат:
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep
tasks = range(5)
result = []
def do_work(x):
# do something with x and return the result
sleep(2)
return x + 2
if __name__ == '__main__':
pbar = tqdm(total=len(tasks))
with Pool(2) as p:
for i in p.imap_unordered(do_work, tasks):
result.append(i)
pbar.update(i)
pbar.close()
print(result)