Использование многопроцессорной обработки в классе
Я использовал multiprocessing
идеально в грязной конфигурации моего кода. Я решил дать некоторый порядок своему коду и переписать его как класс, тогда я могу легко изменить входные данные, мой новый код выглядит следующим образом:
class LikelihoodTest:
def __init__(self,Xgal,Ygal):
self.x=Xgal
self.y=Ygal
self.objPosition=gal_pos
self.beta_s=beta
self.RhoCrit_SigmaC=rho_c_over_sigma_c
self.AngularDiameter=DA
self.RhoCrit=rho_crit
self.Reducedshear=observed_g
self.ShearError=g_err
#The 2D function
def like2d(self,posx, posy):
stuff=[self.objPosition, self.beta_s, self.RhoCrit_SigmaC , self.AngularDiameter, self.RhoCrit]
m=4.447e14
c=7.16
param=[posx, posy, m, c]
return reduced_shear( param, stuff, self.Reducedshear, self.ShearError)
def ShearLikelihood(self):
n=len(self.x)
m=len(self.y)
shared_array_base = multiprocessing.Array(ctypes.c_double, n*m)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape( n,m)
#Restructure the function before you create instance of Pool.
# Parallel processing
def my_func(self,i, def_param=shared_array):
shared_array[i,:] = np.array([float(self.like2d(self.x[j],self.y[i])) for j in range(len(self.x))])
while True:
try:
print "processing to estimate likelihood in 2D grids......!!!"
start = time.time()
pool = multiprocessing.Pool(processes=10)
pool.map(my_func, range(len(self.y)))
print shared_array
end = time.time()
print "process time:\n",end - start
pool.close()
except ValueError:
print "Oops! value error!"
return shared_array
def plotLikelihood(self,shared_array):
#plotting on a mesh the likelihood function in order to see whether you have defined the inputs correctly and you can observe the maximum likelihood in 2D
# Set up a regular grid of interpolation points
xi, yi = np.linspace(self.x.min(), self.x.max(), 100), np.linspace(self.y.min(), self.y.max(), 100)
# Interpolate
rbf = scipy.interpolate.interp2d(self.x, self.y,shared_array , kind='linear')
zi = rbf(xi, yi)
fig, ax = plt.subplots()
divider = make_axes_locatable(ax)
im = ax.imshow(zi, vmin=shared_array.min(), vmax=shared_array.max(), origin='lower',
extent=[self.x.min(), self.x.max(), self.y.min(),self.y.max()])
ax.set_xlabel(r"$Xpos$")
ax.set_ylabel(r"$Ypos$")
ax.xaxis.set_label_position('top')
ax.xaxis.set_tick_params(labeltop='on')
cax = divider.append_axes("right", size="5%", pad=0.05)
cbar = fig.colorbar(im,cax=cax, ticks=list(np.linspace(shared_array.max(), shared_array.min(),20)),format='$%.2f$')
cbar.ax.tick_params(labelsize=8)
plt.savefig('/users/Desktop/MassRecons/Likelihood2d_XY_Without_Shear_Uncertainty.pdf', transparent=True, bbox_inches='tight', pad_inches=0)
plt.close()
Я получил следующую ошибку при попытке запустить его с конфигурацией класса:
if __name__ == '__main__':
Xgal = np.linspace(Xgalaxy.min(), Xgalaxy.max(), 1000)
Ygal = np.linspace(Ygalaxy.min(), Ygalaxy.max(), 1000)
Test=LikelihoodTest(Xgal,Ygal)
Test.ShearLikelihood()
processing to estimate likelihood in 2D grids......!!!
ERROR: PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed [multiprocessing.pool]
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 34, in ShearLikelihood
File "/vol/1/anaconda/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/vol/1/anaconda/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Есть ли способ это исправить?
2 ответа
Я мог бы наконец понять, как я могу использовать multiprocessing
работая в моем классе. я использовал pathos.multiprocessing
и изменил код следующим образом:
import numpy as np
import pathos.multiprocessing as multiprocessing
class LikelihoodTest:
def __init__(self,Xgal,Ygal):
self.x=Xgal
self.y=Ygal
self.objPosition=gal_pos
self.beta_s=beta
self.RhoCrit_SigmaC=rho_c_over_sigma_c
self.AngularDiameter=DA
self.RhoCrit=rho_crit
self.Reducedshear=observed_g
self.ShearError=g_err
#The 2D function
def like2d(self,posx, posy):
stuff=[self.objPosition, self.beta_s, self.RhoCrit_SigmaC , self.AngularDiameter, self.RhoCrit]
m=4.447e14
c=7.16
param=[posx, posy, m, c]
return reduced_shear( param, stuff, self.Reducedshear, self.ShearError)
def ShearLikelihood(self,r):
return [float(self.like2d(self.x[j],r)) for j in range(len(self.x))]
def run(self):
try:
print "processing to estimate likelihood in 2D grids......!!!"
start = time.time()
pool = multiprocessing.Pool(processes=10)
seq=[ self.y[i] for i in range( self.y.shape[0])]
results=np.array( pool.map(self.ShearLikelihood, seq ))
end = time.time()
print "process time:\n",end - start
pool.close()
except ValueError:
print "Oops! value error ....!"
return results
def plotLikelihood(self,shared_array):
#plotting on a mesh the likelihood function in order to see whether you have defined the inputs correctly and you can observe the maximum likelihood in 2D
# Set up a regular grid of interpolation points
xi, yi = np.linspace(self.x.min(), self.x.max(), 100), np.linspace(self.y.min(), self.y.max(), 100)
# Interpolate
rbf = scipy.interpolate.interp2d(self.x, self.y,shared_array , kind='linear')
zi = rbf(xi, yi)
fig, ax = plt.subplots()
divider = make_axes_locatable(ax)
im = ax.imshow(zi, vmin=shared_array.min(), vmax=shared_array.max(), origin='lower',
extent=[self.x.min(), self.x.max(), self.y.min(),self.y.max()])
ax.set_xlabel(r"$Xpos$")
ax.set_ylabel(r"$Ypos$")
ax.xaxis.set_label_position('top')
ax.xaxis.set_tick_params(labeltop='on')
cax = divider.append_axes("right", size="5%", pad=0.05)
cbar = fig.colorbar(im,cax=cax, ticks=list(np.linspace(shared_array.max(), shared_array.min(),20)),format='$%.2f$')
cbar.ax.tick_params(labelsize=8)
plt.savefig('/users/Desktop/MassRecons/Likelihood2d_XY_coordinate.pdf', transparent=True, bbox_inches='tight', pad_inches=0)
plt.close()
if __name__ == '__main__':
Xgal = np.linspace(Xgalaxy.min(), Xgalaxy.max(), 1000)
Ygal = np.linspace(Ygalaxy.min(), Ygalaxy.max(), 1000)
Test=LikelihoodTest(Xgal,Ygal)
x=Test.run()
Test.plotLikelihood(x)
Теперь это работает как шарм!:)
Вы не можете передавать функции или методы в разные процессы, используя Pickle, но вы можете передавать строки.
Вы можете вести словарь методов и обращаться к методам через их строковые ключи. Это не очень элегантно, но решает проблему.
РЕДАКТИРОВАТЬ: Когда вы используете многопроцессорность, есть неявная "вилка". Это создает несколько независимых процессов без общих ресурсов, потому что каждая вещь, которую вы передаете другому процессу, должна быть сериализована с помощью Pickle. Проблема в том, что pickle не позволяет сериализовать исполняемый код для отправки другому процессу.