Многопроцессорность: используйте tqdm для отображения индикатора выполнения

Чтобы сделать мой код более "питоническим" и более быстрым, я использую "многопроцессорность" и функцию карты для отправки ему а) функции и б) диапазона итераций.

Имплантированное решение (т. Е. Вызов tqdm непосредственно в диапазоне tqdm.tqdm(диапазон (0, 30)) не работает с многопроцессорной обработкой (как сформулировано в приведенном ниже коде).

Индикатор выполнения отображается от 0 до 100% (когда python читает код?), Но он не указывает на фактический прогресс функции карты.

Как отобразить индикатор выполнения, который указывает, на каком этапе находится функция "карта"?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Любая помощь или предложения приветствуются...

11 ответов

Решение

Найденное решение: будьте осторожны! Из-за многопроцессорности время оценки (итерация за цикл, общее время и т. Д. Может быть нестабильным), но индикатор выполнения работает отлично.

Примечание. Диспетчер контекста для пула доступен только в Python версии 3.3.

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in tqdm(enumerate(p.imap_unordered(_foo, range(0, max_)))):
                pbar.update()

Используйте imap вместо map, который возвращает итератор обработанных значений.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

Извините за опоздание, но если все, что вам нужно, это параллельная карта, последняя версия (tqdm>=4.42.0) теперь имеет встроенный:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

Ссылки: https://tqdm.github.io/docs/contrib.concurrent/ и https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

Ты можешь использовать p_tqdm вместо.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

Основываясь на ответе Хави Мартинеса, я написал функцию imap_unordered_bar, Может использоваться так же, как imap_unordered с той лишь разницей, что отображается полоса обработки.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))

Для индикатора выполнения с apply_async мы можем использовать следующий код, как предложено в:

https://github.com/tqdm/tqdm/issues/484

      import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

pool = Pool(2)
pbar = tqdm(total=100)

def update(*a):
    pbar.update()

for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()

Вот мой вариант, когда вам нужно получить результаты от ваших функций параллельного выполнения. Эта функция делает несколько вещей (есть еще один мой пост, который объясняет это дополнительно), но ключевым моментом является то, что есть очередь ожидающих задач и очередь завершенных задач. По мере того, как рабочие завершают выполнение каждой задачи в очереди ожидания, они добавляют результаты в очередь завершенных задач. Вы можете перенести проверку в очередь выполненных задач с помощью индикатора выполнения tqdm. Я не помещаю здесь реализацию функции do_work(), это не актуально, так как сообщение здесь предназначено для отслеживания очереди выполненных задач и обновления индикатора выполнения каждый раз, когда появляется результат.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

На основе ответа «user17242583» я создал следующую функцию. Это должно быть так же быстро, как Pool.map, и результаты всегда упорядочены. Кроме того, вы можете передать в функцию столько параметров, сколько хотите, а не только один итерируемый объект.

      from multiprocessing import Pool
from functools import partial
from tqdm import tqdm


def imap_tqdm(function, iterable, processes, chunksize=1, desc=None, disable=False, **kwargs):
    """
    Run a function in parallel with a tqdm progress bar and an arbitrary number of arguments.
    Results are always ordered and the performance should be the same as of Pool.map.
    :param function: The function that should be parallelized.
    :param iterable: The iterable passed to the function.
    :param processes: The number of processes used for the parallelization.
    :param chunksize: The iterable is based on the chunk size chopped into chunks and submitted to the process pool as separate tasks.
    :param desc: The description displayed by tqdm in the progress bar.
    :param disable: Disables the tqdm progress bar.
    :param kwargs: Any additional arguments that should be passed to the function.
    """
    if kwargs:
        function_wrapper = partial(_wrapper, function=function, **kwargs)
    else:
        function_wrapper = partial(_wrapper, function=function)

    results = [None] * len(iterable)
    with Pool(processes=processes) as p:
        with tqdm(desc=desc, total=len(iterable), disable=disable) as pbar:
            for i, result in p.imap_unordered(function_wrapper, enumerate(iterable), chunksize=chunksize):
                results[i] = result
                pbar.update()
    return results


def _wrapper(enum_iterable, function, **kwargs):
    i = enum_iterable[0]
    result = function(enum_iterable[1], **kwargs)
    return i, result

tqdmвыпустила свои собственные простые и элегантные API для одновременной работы.

Я привожу следующий фрагмент в качестве простого примера, иллюстрирующего многопоточность.

      from tqdm.contrib.concurrent import thread_map

def f(row):
    x, y = row
    time.sleep(1)  # to visualize the progress

thread_map(f, [(x, y) for x, y in zip(range(1000), range(1000))])

Этот подход прост и работает.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Другие вопросы по тегам