Как сделать параллельное программирование в Python

Для C++ мы можем использовать OpenMP для параллельного программирования; однако OpenMP не будет работать для Python. Что я должен делать, если я хочу распараллелить некоторые части моей программы на Python?

Структура кода может рассматриваться как:

 solve1(A)
 solve2(B)

куда solve1 а также solve2 две независимые функции. Как запустить такой код параллельно, а не последовательно, чтобы сократить время выполнения? Надеюсь, кто-нибудь может мне помочь. Большое спасибо заранее. Код является:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

Где setinner и setouter - две независимые функции. Вот где я хочу провести параллель...

11 ответов

Решение

Вы можете использовать многопроцессорный модуль. Для этого случая я мог бы использовать пул обработки:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

Это вызовет процессы, которые могут сделать для вас общую работу. Так как мы не прошли processes, он будет порождать один процесс для каждого ядра процессора на вашей машине. Каждое ядро ​​процессора может выполнять один процесс одновременно.

Если вы хотите отобразить список в одну функцию, вы должны сделать это:

args = [A, B]
results = pool.map(solve1, args)

Не используйте потоки, потому что GIL блокирует любые операции над объектами Python.

С Рэем это можно сделать очень элегантно.

Чтобы распараллелить ваш пример, вам нужно определить свои функции с помощью @ray.remote декоратор, а затем вызвать их с .remote,

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

Это имеет ряд преимуществ перед многопроцессорным модулем.

  1. Тот же код будет работать на многоядерном компьютере, а также на кластере компьютеров.
  2. Процессы эффективно обмениваются данными через разделяемую память и сериализацию без копирования.
  3. Сообщения об ошибках распространяются хорошо.
  4. Эти вызовы функций могут быть составлены вместе, например,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. В дополнение к удаленному вызову функций классы могут быть созданы удаленно как акторы.

Обратите внимание, что Ray - это фреймворк, который я помогал разрабатывать.

Решение, как уже говорили другие, заключается в использовании нескольких процессов. Однако то, какая структура является более подходящей, зависит от многих факторов. В дополнение к уже упомянутым, есть также charm4py и mpi4py (я разработчик charm4py).

Существует более эффективный способ реализации приведенного выше примера, чем использование абстракции рабочего пула. Основной цикл отправляет те же параметры (включая полный график G) снова и снова рабочим в каждой из 1000 итераций. Поскольку по крайней мере один работник будет находиться в другом процессе, это включает копирование и отправку аргументов в другой процесс (ы). Это может быть очень дорого в зависимости от размера объектов. Вместо этого имеет смысл, чтобы работники хранили состояние и просто отправляли обновленную информацию.

Например, в charm4py это можно сделать так:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Обратите внимание, что для этого примера нам действительно нужен только один работник. Основной цикл может выполнять одну из функций, а рабочий может выполнять другую. Но мой код помогает проиллюстрировать несколько вещей:

  1. Рабочий A работает в процессе 0 (так же, как основной цикл). В то время как result_a.get() заблокировано ожидание результата, работник А выполняет вычисления в том же процессе.
  2. Аргументы автоматически передаются по ссылке на работника А, поскольку он находится в том же процессе (копирование не выполняется).

CPython использует Global Interpreter Lock, которая делает параллельное программирование немного интереснее, чем C++

В этой теме есть несколько полезных примеров и описаний задачи:

Обходной путь Python Global Interpreter Lock (GIL) в многоядерных системах, использующих набор задач в Linux?

Вы можете использовать joblib библиотека для параллельных вычислений и многопроцессорной обработки.

from joblib import Parallel, delayed

Вы можете просто создать функцию foo который вы хотите запускать параллельно и на основе следующего фрагмента кода, реализующего параллельную обработку:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

где num_cores можно получить из multiprocessing библиотека, как указано ниже:

import multiprocessing

num_cores = multiprocessing.cpu_count()

Если у вас есть функция с более чем одним входным аргументом, и вы просто хотите перебрать один из аргументов по списку, вы можете использовать partial функция от functools библиотека, как показано ниже:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

Вы можете найти полное объяснение многопроцессорной обработки Python и R с парой примеров здесь.

Я всегда использую «многопроцессорную» родную библиотеку для обработки параллелизма в Python. Чтобы контролировать количество процессов в очереди, я использую общую переменную в качестве счетчика. В следующем примере вы можете увидеть, как работает параллельное выполнение простых процессов.

Я сделал обновление скрипта, чтобы упростить его использование. По сути, единственное, что вам нужно сделать, это переопределить processметод с функцией, которую вы хотите запустить параллельно. Смотрите пример, процедура очень проста. Кроме того, вы также можете удалить все вхождения журнала выполнения.

Когда у меня будет время, я обновлю код для работы с процессами, которые возвращают значения.

Требования

      user@host:~$ pip install coloredlogs==15.0.1

Код

Сценарий параллельной обработки (копирование и вставка) :

      #!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting-up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60*60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple]) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        lock = manager.Lock()
        pool = Pool(processes=self._n_jobs)

        start_time = datetime.now()

        with lock:  # Write-protecting the processes queue shared variable.
            for args in args_list:
                while True:
                    if self._queue.value < self._n_jobs:
                        self._queue.value += 1

                        # Running processes in parallel:
                        pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)

                        break
                    else:
                        self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                        time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")

Пример использования:

      class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]

    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()

Выполнение и вывод

      user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
      user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
      user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934

В некоторых случаях можно автоматически распараллеливать циклы с помощью Numba, хотя это работает только с небольшим подмножеством Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

К сожалению, кажется, что Numba работает только с массивами Numpy, но не с другими объектами Python. Теоретически возможно также скомпилировать Python в C++, а затем автоматически распараллелить его с помощью компилятора Intel C++, хотя я еще не пробовал этого.

Если вы не можете потратить время на изучение требований и предположений библиотек или модулей, рекомендованных в других ответах, вам может подойти следующее:

  1. Дайте вашему сценарию параметры для запуска отдельных частей задачи.
  2. Когда готов к запуску части параллельно, запускайте их с помощью , предоставляя номер детали и другие сведения в дополнительных файлах опций и/или параметров и вызывая для каждого дочернего элемента.

Если вы хотите следить за прогрессом, запускать больше воркеров, как только воркеры заканчивают работу, или делать что-то еще во время ожидания, используйте вместо и проверьте, все еще .

Для больших задач накладные расходы на запуск новых процессов и запись и чтение файлов минимальны. Для многих небольших задач хотелось бы запускать воркеры только один раз, а затем связываться с ними через пайпы или сокеты, но это намного больше работы, и ее нужно делать осторожно, чтобы избежать возможности взаимоблокировок. В этой ситуации, вероятно, лучше научиться использовать модули, рекомендованные в других ответах.

Вот полный пример, который работает в среде Windows; преимущество асинхронной обработки заключается в экономии времени:

      import multiprocessing
import time
from multiprocessing import Pool, freeze_support
from multiprocessing import Pool


def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1

if __name__ == '__main__':

    pool = Pool(multiprocessing.cpu_count())
    result1 = pool.apply_async(f1, [0])
    result2 = pool.apply_async(f2, [9])
    freeze_support()
    t0 = time.time()
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time()-t0)
    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time()-t0)

Вы не можете выполнять параллельное программирование на Python с использованием потоков. Вы должны использовать многопроцессорную обработку, или, если вы делаете такие вещи, как файлы или интернет-пакеты, вы можете использоватьasync,await, иasyncio.

Если вам нужны потоки, вы можете попробовать использоватьcythonно вам необходимо установить Visual Studio с Python, а также установить пакет разработчика.

Вы можете преобразовать свой Dataframe в Dask Dataframe, и он может обрабатывать параллельные вычисления для вас.

      import dask.dataframe as dd
pdf = pd.Pandas({"A" : A, "B" : B})
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)
Другие вопросы по тегам