Заполнение матрицы с помощью параллельной обработки в Юлии
Я пытаюсь ускорить время решения проблемы динамического программирования в Julia (v. 0.5.0) с помощью параллельной обработки. Проблема заключается в выборе оптимальных значений для каждого элемента матрицы 1073 x 19 на каждой итерации, пока последующие различия матриц не попадут в допустимое отклонение. Я думал, что в каждой итерации заполнение значений для каждого элемента матрицы может быть распараллелено. Тем не менее, я вижу огромное снижение производительности при использовании SharedArray
и мне интересно, есть ли лучший способ приблизиться к параллельной обработке для этой проблемы.
Я строю аргументы для функции ниже:
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]
r = 0.015
tau = 0.35
rho =est_params[1]
sigma =est_params[2]
delta = 0.15
gamma =est_params[3]
a_capital =est_params[4]
lambda1 =est_params[5]
lambda2 =est_params[6]
s =est_params[7]
theta =est_params[8]
mu =est_params[9]
p_bar_k_ss =est_params[10]
beta = (1+r)^(-1)
sigma_range = 4
gz = 19
gp = 29
gk = 37
lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp(lnz)
gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss = (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))
#Tauchen
N = length(z)
Z = zeros(N,1)
Zprob = zeros(Float32,N,N)
Z[N] = lnz[length(z)]
Z[1] = lnz[1]
zstep = (Z[N] - Z[1]) / (N - 1)
for i=2:(N-1)
Z[i] = Z[1] + zstep * (i - 1)
end
for a = 1 : N
for b = 1 : N
if b == 1
Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
elseif b == N
Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
else
Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
end
end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float32,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)
gm = gk * gp
control=zeros(gm,2)
for i=1:gk
control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)
E=Array(Float32,gm,gm,gz)
for h=1:gm
for m=1:gm
for j=1:gz
# set the nonzero net debt indicator
if endog[h,2]<0
p_ind=1
else
p_ind=0
end
# set the investment indicator
if (control[m,1]-(1-delta)*endog[h,1])!=0
i_ind=1
else
i_ind=0
end
E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau)) +
delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
(i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
s*endog[h,2]*p_ind
elem = E[m,h,j]
if E[m,h,j]<0
E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
else
E[m,h,j]=elem
end
end
end
end
Затем я построил функцию с последовательной обработкой. Два for
циклы перебирают каждый элемент, чтобы найти наибольшее значение в 1072-размере (= gm
скалярный аргумент в функции) массив:
function dynam_serial(E,gm,gz,beta,Zprob)
v = Array(Float32,gm,gz )
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv = Array(Float32,gm,gz)
# Set parameters for the loop
convcrit = 0.0001 # chosen convergence criterion
diff = 1 # arbitrary initial value greater than convcrit
while diff>convcrit
exp_v=v*Zprob'
for h=1:gm
for j=1:gz
Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
end
end
diff = maxabs(Tv - v)
v=copy(Tv)
end
end
Сроки этого я получаю:
@time dynam_serial(E,gm,gz,beta,Zprob)
> 106.880008 seconds (91.70 M allocations: 203.233 GB, 15.22% gc time)
Теперь я пытаюсь использовать Shared Arrays для параллельной обработки. Обратите внимание, что я перенастроил итерацию так, чтобы у меня был только один for
петля, а не два. Я также использую v=deepcopy(Tv)
; иначе, v
копируется как Array
объект, а не SharedArray
:
function dynam_parallel(E,gm,gz,beta,Zprob)
v = SharedArray(Float32,(gm,gz),init = S -> S[Base.localindexes(S)] = myid() )
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
# Set parameters for the loop
convcrit = 0.0001 # chosen convergence criterion
diff = 1 # arbitrary initial value greater than convcrit
while diff>convcrit
exp_v=v*Zprob'
Tv = SharedArray(Float32,gm,gz,init = S -> S[Base.localindexes(S)] = myid() )
@sync @parallel for hj=1:(gm*gz)
j=cld(hj,gm)
h=mod(hj,gm)
if h==0;h=gm;end;
@async Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
end
diff = maxabs(Tv - v)
v=deepcopy(Tv)
end
end
Сроки параллельной версии; и используя 4-ядерный процессор I7 с частотой 2,5 ГГц и 16 ГБ памяти, я получаю:
addprocs(3)
@time dynam_parallel(E,gm,gz,beta,Zprob)
> 164.237208 seconds (2.64 M allocations: 201.812 MB, 0.04% gc time)
Я делаю что-то неправильно здесь? Или есть лучший способ приблизиться к параллельной обработке в Юлии для этой конкретной проблемы? Я подумал об использовании распределенных массивов, но мне сложно понять, как применить их к данной проблеме.
ОБНОВЛЕНИЕ: За @DanGetz и его полезными комментариями я вместо этого попытался ускорить последовательную версию обработки. Я смог снизить производительность до 53.469780 seconds (67.36 M allocations: 103.419 GiB, 19.12% gc time)
через:
1) Обновление до 0.6.0 (сэкономлено около 25 секунд), которое включает в себя полезные @views
макро.
2) Предварительно выделив основной массив, который я пытаюсь заполнить (Tv
), в разделе "Предварительное распределение результатов" в "Советы по производительности Julia": https://docs.julialang.org/en/latest/manual/performance-tips/. (сэкономил еще около 25 секунд)
Наибольшее замедление, по-видимому, происходит от add_vecs
функция, которая суммирует подмассивы двух больших матриц. Я пробовал девекторизовать и использовать функции BLAS, но не смог добиться лучшей производительности.
В любом случае, улучшенный код для dynam_serial
ниже:
function add_vecs(r::Array{Float32},h::Int,j::Int,E::Array{Float32},exp_v::Array{Float32},beta::Float32)
@views r=E[:,h,j] + beta*exp_v[:,j]
return r
end
function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
v = Array{Float32}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv = Array{Float32}(gm,gz)
r = Array{Float32}(gm)
# Set parameters for the loop
convcrit = 0.0001 # chosen convergence criterion
diff = 1 # arbitrary initial value greater than convcrit
while diff>convcrit
exp_v=v*Zprob'
for h=1:gm
for j=1:gz
@views Tv[h,j]=findmax(add_vecs(r,h,j,E,exp_v,beta))[1]
end
end
diff = maximum(abs,Tv - v)
v=copy(Tv)
end
return Tv
end
2 ответа
Если add_vecs
кажется критической функцией, написание явного for
Цикл может предложить больше оптимизации. Как работает следующий бенчмарк:
function add_vecs!(r::Array{Float32},h::Int,j::Int,E::Array{Float32},
exp_v::Array{Float32},beta::Float32)
@inbounds for i=1:size(E,1)
r[i]=E[i,h,j] + beta*exp_v[i,j]
end
return r
end
ОБНОВИТЬ
Чтобы продолжить оптимизацию dynam_serial
Я попытался удалить больше выделений. Результат:
function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
exp_v::Array{Float64},beta::Float64)
@inbounds for i=1:gm
r[i] = E[i,h,j]+beta*exp_v[i,j]
end
return findmax(r)[1]
end
function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
beta::Float64,Zprob::Array{Float64})
v = Array{Float64}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
r = Array{Float64}(gm)
exp_v = Array{Float64}(gm,gz)
# Set parameters for the loop
convcrit = 0.0001 # chosen convergence criterion
diff = 1.0 # arbitrary initial value greater than convcrit
while diff>convcrit
A_mul_Bt!(exp_v,v,Zprob)
diff = -Inf
for h=1:gm
for j=1:gz
oldv = v[h,j]
newv = add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
v[h,j]= newv
diff = max(diff, oldv-newv, newv-oldv)
end
end
end
return v
end
Переключение функций на использование Float64 должно увеличить скорость (поскольку процессоры оптимизированы для работы с 64-битными длинами слов). Кроме того, используя мутацию A_mul_Bt!
напрямую сохраняет другое распределение. Избегать copy(...)
переключая массивы v
а также Tv
,
Как эти оптимизации улучшают ваше время выполнения?
2-е ОБНОВЛЕНИЕ
Обновлен код в разделе ОБНОВЛЕНИЕ для использования findmax
, Также поменял dynam_serial
использовать v
без Tv
, поскольку не было необходимости сохранять старую версию, кроме diff
расчет, который теперь делается внутри цикла.
Вот код, который я скопировал и вставил, предоставленный Дэном Гетцем выше. Я включаю массивы и скалярные определения в точности так, как я их запустил. Производительность была: 39.507005 seconds (11 allocations: 486.891 KiB)
при беге @time dynam_serial(E,gm,gz,beta,Zprob)
,
using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]
r = 0.015
tau = 0.35
rho =est_params[1]
sigma =est_params[2]
delta = 0.15
gamma =est_params[3]
a_capital =est_params[4]
lambda1 =est_params[5]
lambda2 =est_params[6]
s =est_params[7]
theta =est_params[8]
mu =est_params[9]
p_bar_k_ss =est_params[10]
beta = (1+r)^(-1)
sigma_range = 4
gz = 19 #15 #19
gp = 29 #19 #29
gk = 37 #25 #37
lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp.(lnz)
gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss = (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))
#Tauchen
N = length(z)
Z = zeros(N,1)
Zprob = zeros(Float64,N,N)
Z[N] = lnz[length(z)]
Z[1] = lnz[1]
zstep = (Z[N] - Z[1]) / (N - 1)
for i=2:(N-1)
Z[i] = Z[1] + zstep * (i - 1)
end
for a = 1 : N
for b = 1 : N
if b == 1
Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
elseif b == N
Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
else
Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
end
end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float64,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)
gm = gk * gp
control=zeros(gm,2)
for i=1:gk
control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)
E=Array(Float64,gm,gm,gz)
for h=1:gm
for m=1:gm
for j=1:gz
# set the nonzero net debt indicator
if endog[h,2]<0
p_ind=1
else
p_ind=0
end
# set the investment indicator
if (control[m,1]-(1-delta)*endog[h,1])!=0
i_ind=1
else
i_ind=0
end
E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau)) +
delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
(i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
s*endog[h,2]*p_ind
elem = E[m,h,j]
if E[m,h,j]<0
E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
else
E[m,h,j]=elem
end
end
end
end
function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
exp_v::Array{Float64},beta::Float64)
maxr = -Inf
@inbounds for i=1:gm r[i] = E[i,h,j]+beta*exp_v[i,j]
maxr = max(r[i],maxr)
end
return maxr
end
function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
beta::Float64,Zprob::Array{Float64})
v = Array{Float64}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv = Array{Float64}(gm,gz)
r = Array{Float64}(gm)
exp_v = Array{Float64}(gm,gz)
# Set parameters for the loop
convcrit = 0.0001 # chosen convergence criterion
diff = 1.0 # arbitrary initial value greater than convcrit
while diff>convcrit
A_mul_Bt!(exp_v,v,Zprob)
diff = -Inf
for h=1:gm
for j=1:gz
Tv[h,j]=add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
diff = max(abs(Tv[h,j]-v[h,j]),diff)
end
end
(v,Tv)=(Tv,v)
end
return v
end
Теперь, вот еще одна версия алгоритма и входных данных. Функции похожи на то, что предложил Дэн Гетц, за исключением того, что я использую findmax
а не повторный max
функция, чтобы найти максимум массива. В конструкции ввода я использую оба Float32
и смешивание разных типов битов вместе. Тем не менее, я последовательно добивался лучших результатов таким образом: 24.905569 seconds (1.81 k allocations: 46.829 MiB, 0.01% gc time)
, Но не совсем понятно почему.
using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]
r = 0.015
tau = 0.35
rho =est_params[1]
sigma =est_params[2]
delta = 0.15
gamma =est_params[3]
a_capital =est_params[4]
lambda1 =est_params[5]
lambda2 =est_params[6]
s =est_params[7]
theta =est_params[8]
mu =est_params[9]
p_bar_k_ss =est_params[10]
beta = Float32((1+r)^(-1))
sigma_range = 4
gz = 19
gp = 29
gk = 37
lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp(lnz)
gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss = (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))
#Tauchen
N = length(z)
Z = zeros(N,1)
Zprob = zeros(Float32,N,N)
Z[N] = lnz[length(z)]
Z[1] = lnz[1]
zstep = (Z[N] - Z[1]) / (N - 1)
for i=2:(N-1)
Z[i] = Z[1] + zstep * (i - 1)
end
for a = 1 : N
for b = 1 : N
if b == 1
Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
elseif b == N
Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
else
Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
end
end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float32,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)
gm = gk * gp
control=zeros(gm,2)
for i=1:gk
control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)
E=Array(Float32,gm,gm,gz)
for h=1:gm
for m=1:gm
for j=1:gz
# set the nonzero net debt indicator
if endog[h,2]<0
p_ind=1
else
p_ind=0
end
# set the investment indicator
if (control[m,1]-(1-delta)*endog[h,1])!=0
i_ind=1
else
i_ind=0
end
E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau)) +
delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
(i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
s*endog[h,2]*p_ind
elem = E[m,h,j]
if E[m,h,j]<0
E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
else
E[m,h,j]=elem
end
end
end
end
function add_vecs!(gm::Int,r::Array{Float32},h::Int,j::Int,E::Array{Float32},
exp_v::Array{Float32},beta::Float32)
@inbounds @views for i=1:gm
r[i]=E[i,h,j] + beta*exp_v[i,j]
end
return r
end
function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
v = Array{Float32}(gm,gz)
fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
Tv = Array{Float32}(gm,gz)
# Set parameters for the loop
convcrit = 0.0001 # chosen convergence criterion
diff = 1.00000 # arbitrary initial value greater than convcrit
iter=0
exp_v=Array{Float32}(gm,gz)
r=Array{Float32}(gm)
while diff>convcrit
A_mul_Bt!(exp_v,v,Zprob)
for h=1:gm
for j=1:gz
Tv[h,j]=findmax(add_vecs!(gm,r,h,j,E,exp_v,beta))[1]
end
end
diff = maximum(abs,Tv - v)
(v,Tv)=(Tv,v)
end
return v
end