Копируются ли общие данные только для чтения в разные процессы для многопроцессорной обработки?

Кусок кода, который у меня есть, выглядит примерно так:

glbl_array = # a 3 Gb array

def my_func( args, def_param = glbl_array):
    #do stuff on args and def_param

if __name__ == '__main__':
  pool = Pool(processes=4)
  pool.map(my_func, range(1000))

Есть ли способ убедиться (или поощрить), что разные процессы не получают копию glbl_array, а делится ею. Если нет способа остановить копирование, я воспользуюсь массивом memmapped, но мои шаблоны доступа не очень регулярны, поэтому я ожидаю, что массивы memmapped будут работать медленнее. Выше было похоже на первое, что нужно попробовать. Это на Linux. Я просто хотел получить совет от Stackru и не хочу раздражать сисадмина. Как вы думаете, это поможет, если второй параметр является подлинным неизменным объектом, как glbl_array.tostring(),

5 ответов

Решение

Вы можете использовать общую память из multiprocessing вместе с Numpy довольно легко:

import multiprocessing
import ctypes
import numpy as np

shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

# Parallel processing
def my_func(i, def_param=shared_array):
    shared_array[i,:] = i

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(my_func, range(10))

    print shared_array

который печатает

[[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
 [ 2.  2.  2.  2.  2.  2.  2.  2.  2.  2.]
 [ 3.  3.  3.  3.  3.  3.  3.  3.  3.  3.]
 [ 4.  4.  4.  4.  4.  4.  4.  4.  4.  4.]
 [ 5.  5.  5.  5.  5.  5.  5.  5.  5.  5.]
 [ 6.  6.  6.  6.  6.  6.  6.  6.  6.  6.]
 [ 7.  7.  7.  7.  7.  7.  7.  7.  7.  7.]
 [ 8.  8.  8.  8.  8.  8.  8.  8.  8.  8.]
 [ 9.  9.  9.  9.  9.  9.  9.  9.  9.  9.]]

Однако в Linux есть семантика копирования при записи fork() так что даже без использования multiprocessing.Array, данные не будут скопированы, пока они не будут записаны.

Следующий код работает на Win7 и Mac (возможно на linux, но не тестировался).

import multiprocessing
import ctypes
import numpy as np

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

shared_array = None

def init(shared_array_base):
    global shared_array
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)

# Parallel processing
def my_func(i):
    shared_array[i, :] = i

if __name__ == '__main__':
    shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)

    pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,))
    pool.map(my_func, range(10))

    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)
    print shared_array

Для тех, кто застрял с помощью Windows, которая не поддерживает fork() (если не используется CygWin), ответ pv не работает. Глобальные переменные недоступны для дочерних процессов.

Вместо этого вы должны передать общую память во время инициализации Pool/Process в качестве таких:

#! /usr/bin/python

import time

from multiprocessing import Process, Queue, Array

def f(q,a):
    m = q.get()
    print m
    print a[0], a[1], a[2]
    m = q.get()
    print m
    print a[0], a[1], a[2]

if __name__ == '__main__':
    a = Array('B', (1, 2, 3), lock=False)
    q = Queue()
    p = Process(target=f, args=(q,a))
    p.start()
    q.put([1, 2, 3])
    time.sleep(1)
    a[0:3] = (4, 5, 6)
    q.put([4, 5, 6])
    p.join()

(это не просто код, и это не хороший код, но он иллюстрирует суть;-)

Я знаю, я отвечаю на очень старый вопрос. Но эта тема не работает в ОС Windows. Приведенные выше ответы вводили в заблуждение, не предоставляя существенных доказательств. Итак, я попробовал следующий код.

      # -*- coding: utf-8 -*-
from __future__ import annotations
import ctypes
import itertools
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import numpy.typing as npt


shared_np_array_for_subprocess: npt.NDArray[np.double]


def init_processing(shared_raw_array_obj: ctypes.Array[ctypes.c_double]):
    global shared_np_array_for_subprocess
    #shared_np_array_for_subprocess = np.frombuffer(shared_raw_array_obj, dtype=np.double)
    shared_np_array_for_subprocess = np.ctypeslib.as_array(shared_raw_array_obj)


def do_processing(i: int) -> int:
    print("\n--------------->>>>>>")
    print(f"[P{i}] input is {i} in process id {os.getpid()}")
    print(f"[P{i}] 0th element via np access: ", shared_np_array_for_subprocess[0])
    print(f"[P{i}] 1st element via np access: ", shared_np_array_for_subprocess[1])
    print(f"[P{i}] NP array's base memory is: ", shared_np_array_for_subprocess.base)
    np_array_addr, _ = shared_np_array_for_subprocess.__array_interface__["data"]
    print(f"[P{i}] NP array obj pointing memory address is: ", hex(np_array_addr))
    print("\n--------------->>>>>>")
    time.sleep(3.0)
    return i


if __name__ == "__main__":
    shared_raw_array_obj: ctypes.Array[ctypes.c_double] = multiprocessing.RawArray(ctypes.c_double, 128)  # 8B * 1MB = 8MB
    # This array is malloced, 0 filled.
    print("Shared Allocated Raw array: ", shared_raw_array_obj)
    shared_raw_array_ptr = ctypes.addressof(shared_raw_array_obj)
    print("Shared Raw Array memory address: ", hex(shared_raw_array_ptr))

    # Assign data
    print("Assign 0, 1 element data in Shared Raw array.")
    shared_raw_array_obj[0] = 10.2346
    shared_raw_array_obj[1] = 11.9876

    print("0th element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr).value)
    print("1st element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr + ctypes.sizeof(ctypes.c_double)).value)

    print("Create NP array from the Shared Raw array memory")
    shared_np_array: npt.NDArray[np.double] = np.frombuffer(shared_raw_array_obj, dtype=np.double)

    print("0th element via np access: ", shared_np_array[0])
    print("1st element via np access: ", shared_np_array[1])

    print("NP array's base memory is: ", shared_np_array.base)
    np_array_addr, _ = shared_np_array.__array_interface__["data"]
    print("NP array obj pointing memory address is: ", hex(np_array_addr))

    print("NP array , Raw array points to same memory , No copies? : ", np_array_addr == shared_raw_array_ptr)

    print("Now that we have native memory based NP array , Send for multi processing.")

    # results = []
    with ProcessPoolExecutor(max_workers=4, initializer=init_processing, initargs=(shared_raw_array_obj,)) as process_executor:
        results = process_executor.map(do_processing, range(0, 2))

    print("All jobs sumitted.")
    for result in results:
        print(result)

    print("Main process is going to shutdown.")
    exit(0)

вот пример вывода

      Shared Allocated Raw array:  <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
Shared Raw Array memory address:  0x1b804300000
Assign 0, 1 element data in Shared Raw array.
0th element via ptr access:  10.2346
1st element via ptr access:  11.9876
Create NP array from the Shared Raw array memory
0th element via np access:  10.2346
1st element via np access:  11.9876
NP array's base memory is:  <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
NP array obj pointing memory address is:  0x1b804300000
NP array , Raw array points to same memory , No copies? :  True
Now that we have native memory based NP array , Send for multi processing.

--------------->>>>>>
[P0] input is 0 in process id 21852
[P0] 0th element via np access:  10.2346
[P0] 1st element via np access:  11.9876
[P0] NP array's base memory is:  <memory at 0x0000021C7ACAFF40>
[P0] NP array obj pointing memory address is:  0x21c7ad60000

--------------->>>>>>

--------------->>>>>>
[P1] input is 1 in process id 11232
[P1] 0th element via np access:  10.2346
[P1] 1st element via np access:  11.9876
[P1] NP array's base memory is:  <memory at 0x0000022C7FF3FF40>
[P1] NP array obj pointing memory address is:  0x22c7fff0000

--------------->>>>>>
All jobs sumitted.
0
1
Main process is going to shutdown.

Вышеприведенный вывод из следующей среды:

      OS: Windows 10 20H2
Python: Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)]

Вы можете ясно видеть, что массив памяти, указывающий на numpy, отличается для каждого подпроцесса, что означает создание memcopies. Таким образом, в ОС Windows подпроцесс не использует общую память. Я думаю, это из-за защиты ОС, процессы не могут ссылаться на произвольный адрес указателя в памяти, это приведет к нарушениям доступа к памяти.

Если вы ищете вариант, который эффективно работает в Windows и хорошо подходит для нерегулярных шаблонов доступа, ветвления и других сценариев, где вам может потребоваться анализ различных матриц на основе комбинации матрицы совместно используемой памяти и локальных данных процесса, Инструментарий mathDict в пакете ParallelRegression был разработан для решения именно этой ситуации.

Другие вопросы по тегам