Что могут делать мультипроцессор и укроп вместе?
Я хотел бы использовать multiprocessing
библиотека на питоне. грустно multiprocessing
использования pickle
который не поддерживает функции с замыканиями, лямбдами или функциями в __main__
, Все три из них важны для меня
In [1]: import pickle
In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
К счастью, есть dill
более крепкий рассол. По-видимому dill
выполняет магию при импорте, чтобы сделать засолку
In [3]: import dill
In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
Это очень обнадеживает, особенно потому, что у меня нет доступа к многопроцессорному исходному коду. К сожалению, я до сих пор не могу заставить этот самый простой пример работать
import multiprocessing as mp
import dill
p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))
Почему это? Что мне не хватает? Каковы ограничения на multiprocessing
+ dill
сочетание?
Временное редактирование для JF Sebastian
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Temporary Edit for J.F Sebastian
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
^C
...lots of junk...
[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
^C
...lots of junk...
[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()
4 ответа
multiprocessing
делает некоторые плохие решения о мариновании. Не поймите меня неправильно, он делает несколько хороших выборов, которые позволяют ему выбирать определенные типы, чтобы их можно было использовать в функции карты пула. Тем не менее, так как мы имеем dill
это может сделать травление, собственное травление мультипроцессинга становится немного ограничивающим. На самом деле, если multiprocessing
должны были использовать pickle
вместо cPickle
... а также отбросить некоторые собственные переопределения травления, затем dill
может взять на себя и дать гораздо более полную сериализацию для multiprocessing
,
Пока это не произойдет, есть вилка multiprocessing
называется pathos (версия выпуска, к сожалению, немного устарела), что снимает вышеуказанные ограничения. Pathos также добавляет некоторые приятные функции, которых нет в многопроцессорных системах, например, multi-args в функции map. После небольшого обновления ожидается выпуск Pathos - в основном, переход на Python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
и просто похвастаться pathos.multiprocessing
сможет сделать...
>>> def busy_add(x,y, delay=0.01):
... for n in range(x):
... x += n
... for n in range(y):
... y -= n
... import time
... time.sleep(delay)
... return x + y
...
>>> def busy_squared(x):
... import time, random
... time.sleep(2*random.random())
... return x*x
...
>>> def squared(x):
... return x*x
...
>>> def quad_factory(a=1, b=1, c=0):
... def quad(x):
... return a*x**2 + b*x + c
... return quad
...
>>> square_plus_one = quad_factory(2,0,1)
>>>
>>> def test1(pool):
... print pool
... print "x: %s\n" % str(x)
... print pool.map.__name__
... start = time.time()
... res = pool.map(squared, x)
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
... print pool.imap.__name__
... start = time.time()
... res = pool.imap(squared, x)
... print "time to queue:", time.time() - start
... start = time.time()
... res = list(res)
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
... print pool.amap.__name__
... start = time.time()
... res = pool.amap(squared, x)
... print "time to queue:", time.time() - start
... start = time.time()
... res = res.get()
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
...
>>> def test2(pool, items=4, delay=0):
... _x = range(-items/2,items/2,2)
... _y = range(len(_x))
... _d = [delay]*len(_x)
... print map
... res1 = map(busy_squared, _x)
... res2 = map(busy_add, _x, _y, _d)
... print pool.map
... _res1 = pool.map(busy_squared, _x)
... _res2 = pool.map(busy_add, _x, _y, _d)
... assert _res1 == res1
... assert _res2 == res2
... print pool.imap
... _res1 = pool.imap(busy_squared, _x)
... _res2 = pool.imap(busy_add, _x, _y, _d)
... assert list(_res1) == res1
... assert list(_res2) == res2
... print pool.amap
... _res1 = pool.amap(busy_squared, _x)
... _res2 = pool.amap(busy_add, _x, _y, _d)
... assert _res1.get() == res1
... assert _res2.get() == res2
... print ""
...
>>> def test3(pool): # test against a function that should fail in pickle
... print pool
... print "x: %s\n" % str(x)
... print pool.map.__name__
... start = time.time()
... res = pool.map(square_plus_one, x)
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
...
>>> def test4(pool, maxtries, delay):
... print pool
... m = pool.amap(busy_add, x, x)
... tries = 0
... while not m.ready():
... time.sleep(delay)
... tries += 1
... print "TRY: %s" % tries
... if tries >= maxtries:
... print "TIMEOUT"
... break
... print m.get()
...
>>> import time
>>> x = range(18)
>>> delay = 0.01
>>> items = 20
>>> maxtries = 20
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> pool = Pool(nodes=4)
>>> test1(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
map
time to results: 0.0553691387177
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
imap
time to queue: 7.91549682617e-05
time to results: 0.102381229401
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
amap
time to queue: 7.08103179932e-05
time to results: 0.0489699840546
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
>>> test2(pool, items, delay)
<built-in function map>
<bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>
>>> test3(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
map
time to results: 0.0523059368134
y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]
>>> test4(pool, maxtries, delay)
<pool ProcessingPool(ncpus=4)>
TRY: 1
TRY: 2
TRY: 3
TRY: 4
TRY: 5
TRY: 6
TRY: 7
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]
Перезаписать многопроцессорный модуль Pickle class
import dill, multiprocessing
dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
multiprocessing.reduction.ForkingPickler = dill.Pickler
multiprocessing.reduction.dump = dill.dump
multiprocessing.queues._ForkingPickler = dill.Pickler
Вы можете попробовать использовать библиотеку multiprocessing_on_dill , которая представляет собой ответвление многопроцессорной обработки, которое реализует укроп на бэкэнде.
Например, вы можете запустить:
>>> import multiprocessing_on_dill as multiprocessing
>>> with multiprocessing.Pool() as pool:
... pool.map(lambda x: x**2, range(10))
...
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Я знаю, что эта ветка устарела, однако вам не обязательно использовать
pathos
модуль, как указал Майк МакКернс. Меня также очень раздражает то, что
multiprocessing
использует
pickle
вместо, так что вы можете сделать что-то вроде этого:
import multiprocessing as mp
import dill
def helperFunction(f, inp, *args, **kwargs):
import dill # reimport, just in case this is not available on the new processes
f = dill.loads(f) # converts bytes to (potentially lambda) function
return f(inp, *args, **kwargs)
def mapStuff(f, inputs, *args, **kwargs):
pool = mp.Pool(6) # create a 6-worker pool
f = dill.dumps(f) # converts (potentially lambda) function to bytes
futures = [pool.apply_async(helperFunction, [f, inp, *args], kwargs) for inp in inputs]
return [f.get() for f in futures]
Затем вы можете использовать его так:
mapStuff(lambda x: x**2, [2, 3]) # returns [4, 9]
mapStuff(lambda x, b: x**2 + b, [2, 3], 1) # returns [5, 10]
mapStuff(lambda x, b: x**2 + b, [2, 3], b=1) # also returns [5, 10]
def f(x):
return x**2
mapStuff(f, [4, 5]) # returns [16, 25]
Как это работает в основном, вы конвертируете лямбда-функцию в
bytes
объект, передайте его дочернему процессу и попросите его восстановить лямбда-функцию. В коде я только что использовал
dill
для сериализации функции, но вы также можете сериализовать аргументы, если это необходимо.