Как сделать параллельное программирование в Python
Для C++ мы можем использовать OpenMP для параллельного программирования; однако OpenMP не будет работать для Python. Что я должен делать, если я хочу распараллелить некоторые части моей программы на Python?
Структура кода может рассматриваться как:
solve1(A)
solve2(B)
куда solve1
а также solve2
две независимые функции. Как запустить такой код параллельно, а не последовательно, чтобы сократить время выполнения? Надеюсь, кто-нибудь может мне помочь. Большое спасибо заранее. Код является:
def solve(Q, G, n):
i = 0
tol = 10 ** -4
while i < 1000:
inneropt, partition, x = setinner(Q, G, n)
outeropt = setouter(Q, G, n)
if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
break
node1 = partition[0]
node2 = partition[1]
G = updateGraph(G, node1, node2)
if i == 999:
print "Maximum iteration reaches"
print inneropt
Где setinner и setouter - две независимые функции. Вот где я хочу провести параллель...
11 ответов
Вы можете использовать многопроцессорный модуль. Для этого случая я мог бы использовать пул обработки:
from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
Это вызовет процессы, которые могут сделать для вас общую работу. Так как мы не прошли processes
, он будет порождать один процесс для каждого ядра процессора на вашей машине. Каждое ядро процессора может выполнять один процесс одновременно.
Если вы хотите отобразить список в одну функцию, вы должны сделать это:
args = [A, B]
results = pool.map(solve1, args)
Не используйте потоки, потому что GIL блокирует любые операции над объектами Python.
С Рэем это можно сделать очень элегантно.
Чтобы распараллелить ваш пример, вам нужно определить свои функции с помощью @ray.remote
декоратор, а затем вызвать их с .remote
,
import ray
ray.init()
# Define the functions.
@ray.remote
def solve1(a):
return 1
@ray.remote
def solve2(b):
return 2
# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)
# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])
Это имеет ряд преимуществ перед многопроцессорным модулем.
- Тот же код будет работать на многоядерном компьютере, а также на кластере компьютеров.
- Процессы эффективно обмениваются данными через разделяемую память и сериализацию без копирования.
- Сообщения об ошибках распространяются хорошо.
Эти вызовы функций могут быть составлены вместе, например,
@ray.remote def f(x): return x + 1 x_id = f.remote(1) y_id = f.remote(x_id) z_id = f.remote(y_id) ray.get(z_id) # returns 4
- В дополнение к удаленному вызову функций классы могут быть созданы удаленно как акторы.
Обратите внимание, что Ray - это фреймворк, который я помогал разрабатывать.
Решение, как уже говорили другие, заключается в использовании нескольких процессов. Однако то, какая структура является более подходящей, зависит от многих факторов. В дополнение к уже упомянутым, есть также charm4py и mpi4py (я разработчик charm4py).
Существует более эффективный способ реализации приведенного выше примера, чем использование абстракции рабочего пула. Основной цикл отправляет те же параметры (включая полный график G
) снова и снова рабочим в каждой из 1000 итераций. Поскольку по крайней мере один работник будет находиться в другом процессе, это включает копирование и отправку аргументов в другой процесс (ы). Это может быть очень дорого в зависимости от размера объектов. Вместо этого имеет смысл, чтобы работники хранили состояние и просто отправляли обновленную информацию.
Например, в charm4py это можно сделать так:
class Worker(Chare):
def __init__(self, Q, G, n):
self.G = G
...
def setinner(self, node1, node2):
self.updateGraph(node1, node2)
...
def solve(Q, G, n):
# create 2 workers, each on a different process, passing the initial state
worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
while i < 1000:
result_a = worker_a.setinner(node1, node2, ret=True) # execute setinner on worker A
result_b = worker_b.setouter(node1, node2, ret=True) # execute setouter on worker B
inneropt, partition, x = result_a.get() # wait for result from worker A
outeropt = result_b.get() # wait for result from worker B
...
Обратите внимание, что для этого примера нам действительно нужен только один работник. Основной цикл может выполнять одну из функций, а рабочий может выполнять другую. Но мой код помогает проиллюстрировать несколько вещей:
- Рабочий A работает в процессе 0 (так же, как основной цикл). В то время как
result_a.get()
заблокировано ожидание результата, работник А выполняет вычисления в том же процессе. - Аргументы автоматически передаются по ссылке на работника А, поскольку он находится в том же процессе (копирование не выполняется).
CPython использует Global Interpreter Lock, которая делает параллельное программирование немного интереснее, чем C++
В этой теме есть несколько полезных примеров и описаний задачи:
Вы можете использовать joblib
библиотека для параллельных вычислений и многопроцессорной обработки.
from joblib import Parallel, delayed
Вы можете просто создать функцию foo
который вы хотите запускать параллельно и на основе следующего фрагмента кода, реализующего параллельную обработку:
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
где num_cores
можно получить из multiprocessing
библиотека, как указано ниже:
import multiprocessing
num_cores = multiprocessing.cpu_count()
Если у вас есть функция с более чем одним входным аргументом, и вы просто хотите перебрать один из аргументов по списку, вы можете использовать partial
функция от functools
библиотека, как показано ниже:
from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
'''
body of the function
'''
return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
Вы можете найти полное объяснение многопроцессорной обработки Python и R с парой примеров здесь.
Я всегда использую «многопроцессорную» родную библиотеку для обработки параллелизма в Python. Чтобы контролировать количество процессов в очереди, я использую общую переменную в качестве счетчика. В следующем примере вы можете увидеть, как работает параллельное выполнение простых процессов.
Я сделал обновление скрипта, чтобы упростить его использование. По сути, единственное, что вам нужно сделать, это переопределить
process
метод с функцией, которую вы хотите запустить параллельно. Смотрите пример, процедура очень проста. Кроме того, вы также можете удалить все вхождения журнала выполнения.
Когда у меня будет время, я обновлю код для работы с процессами, которые возвращают значения.
Требования
user@host:~$ pip install coloredlogs==15.0.1
Код
Сценарий параллельной обработки (копирование и вставка) :
#!/usr/bin/env python
# encoding: utf-8
from multiprocessing import Manager, Pool, Value, cpu_count
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os
LOG_LEVEL = "DEBUG"
def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
# Setting-up the script logging:
logging.basicConfig(
stream=sys.stdout,
format="%(asctime)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=level
)
logger = logging.getLogger(name)
coloredlogs.install(level=level, logger=logger, isatty=True)
return logger
class ParallelProcessing:
"""
Parallel processing.
References
----------
[1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419
Examples
--------
>>> class MyParallelProcessing(ParallelProcessing):
>>> def process(self, name: str) -> None:
>>> logger = get_logger()
>>> logger.info(f"Executing process: {name}...")
>>> time.sleep(5)
>>>
>>>
>>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
>>> mpp = MyParallelProcessing()
>>> mpp.run(args_list=params_list)
"""
_n_jobs: int
_waiting_time: int
_queue: Value
_logger: Logger
def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
"""
Instantiates a parallel processing object to execute processes in parallel.
Parameters
----------
n_jobs: int
Number of jobs.
waiting_time: int
Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
"""
self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
self._waiting_time = waiting_time if waiting_time >= 0 else 60*60
self._logger = get_logger()
def process(self, *args) -> None:
"""
Abstract process that must be overridden.
Parameters
----------
*args
Parameters of the process to be executed.
"""
raise NotImplementedError("Process not defined ('NotImplementedError' exception).")
def _execute(self, *args) -> None:
"""
Run the process and remove it from the process queue by decreasing the queue process counter.
Parameters
----------
*args
Parameters of the process to be executed.
"""
self.process(*args)
self._queue.value -= 1
def _error_callback(self, result: Any) -> None:
"""
Error callback.
Parameters
----------
result: Any
Result from exceptions.
"""
self._logger.error(result)
os._exit(1)
def run(self, args_list: Iterator[tuple]) -> None:
"""
Run processes in parallel.
Parameters
----------
args_list: Iterator[tuple]
List of process parameters (`*args`).
"""
manager = Manager()
self._queue = manager.Value('i', 0)
lock = manager.Lock()
pool = Pool(processes=self._n_jobs)
start_time = datetime.now()
with lock: # Write-protecting the processes queue shared variable.
for args in args_list:
while True:
if self._queue.value < self._n_jobs:
self._queue.value += 1
# Running processes in parallel:
pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)
break
else:
self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
time.sleep(self._waiting_time)
pool.close()
pool.join()
exec_time = datetime.now() - start_time
self._logger.info(f"Execution time: {exec_time}")
Пример использования:
class MyParallelProcessing(ParallelProcessing):
def process(self, name: str) -> None:
"""
Process to run in parallel (overrides abstract method).
"""
logger = get_logger()
logger.info(f"Executing process: {name}...")
time.sleep(5)
def main() -> None:
n_jobs = int(sys.argv[1]) # Number of jobs to run in parallel.
params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
mpp = MyParallelProcessing(n_jobs=n_jobs)
# Executing processes in parallel:
mpp.run(args_list=params_list)
if __name__ == '__main__':
main()
Выполнение и вывод
user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934
В некоторых случаях можно автоматически распараллеливать циклы с помощью Numba, хотя это работает только с небольшим подмножеством Python:
from numba import njit, prange
@njit(parallel=True)
def prange_test(A):
s = 0
# Without "parallel=True" in the jit-decorator
# the prange statement is equivalent to range
for i in prange(A.shape[0]):
s += A[i]
return s
К сожалению, кажется, что Numba работает только с массивами Numpy, но не с другими объектами Python. Теоретически возможно также скомпилировать Python в C++, а затем автоматически распараллелить его с помощью компилятора Intel C++, хотя я еще не пробовал этого.
Если вы не можете потратить время на изучение требований и предположений библиотек или модулей, рекомендованных в других ответах, вам может подойти следующее:
- Дайте вашему сценарию параметры для запуска отдельных частей задачи.
- Когда готов к запуску
части параллельно, запускайте их с помощью , предоставляя номер детали и другие сведения в дополнительных файлах опций и/или параметров и вызывая для каждого дочернего элемента.
Если вы хотите следить за прогрессом, запускать больше воркеров, как только воркеры заканчивают работу, или делать что-то еще во время ожидания, используйте
Для больших задач накладные расходы на запуск новых процессов и запись и чтение файлов минимальны. Для многих небольших задач хотелось бы запускать воркеры только один раз, а затем связываться с ними через пайпы или сокеты, но это намного больше работы, и ее нужно делать осторожно, чтобы избежать возможности взаимоблокировок. В этой ситуации, вероятно, лучше научиться использовать модули, рекомендованные в других ответах.
Вот полный пример, который работает в среде Windows; преимущество асинхронной обработки заключается в экономии времени:
import multiprocessing
import time
from multiprocessing import Pool, freeze_support
from multiprocessing import Pool
def f1(a):
c = 0
for i in range(0, 99999999):
c = c + 1
return 1
def f2(b):
c = 0
for i in range(0, 99999999):
c = c + 1
return 1
if __name__ == '__main__':
pool = Pool(multiprocessing.cpu_count())
result1 = pool.apply_async(f1, [0])
result2 = pool.apply_async(f2, [9])
freeze_support()
t0 = time.time()
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
print(time.time()-t0)
t0 = time.time()
aa = f1(1)
bb = f2(2)
print(time.time()-t0)
Вы не можете выполнять параллельное программирование на Python с использованием потоков. Вы должны использовать многопроцессорную обработку, или, если вы делаете такие вещи, как файлы или интернет-пакеты, вы можете использоватьasync
,await
, иasyncio
.
Если вам нужны потоки, вы можете попробовать использоватьcython
но вам необходимо установить Visual Studio с Python, а также установить пакет разработчика.
Вы можете преобразовать свой Dataframe в Dask Dataframe, и он может обрабатывать параллельные вычисления для вас.
import dask.dataframe as dd
pdf = pd.Pandas({"A" : A, "B" : B})
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)