Заполнение матрицы с помощью параллельной обработки в Юлии

Я пытаюсь ускорить время решения проблемы динамического программирования в Julia (v. 0.5.0) с помощью параллельной обработки. Проблема заключается в выборе оптимальных значений для каждого элемента матрицы 1073 x 19 на каждой итерации, пока последующие различия матриц не попадут в допустимое отклонение. Я думал, что в каждой итерации заполнение значений для каждого элемента матрицы может быть распараллелено. Тем не менее, я вижу огромное снижение производительности при использовании SharedArrayи мне интересно, есть ли лучший способ приблизиться к параллельной обработке для этой проблемы.

Я строю аргументы для функции ниже:

    est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]

    r          = 0.015
    tau        = 0.35
    rho        =est_params[1]
    sigma      =est_params[2]
    delta      = 0.15
    gamma      =est_params[3]
    a_capital  =est_params[4]
    lambda1    =est_params[5]
    lambda2    =est_params[6]
    s          =est_params[7]
    theta      =est_params[8]
    mu         =est_params[9]
    p_bar_k_ss  =est_params[10]
    beta  = (1+r)^(-1)
    sigma_range = 4
    gz = 19 
    gp = 29 
    gk = 37 

    lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
    z=exp(lnz)

    gk_m = fld(gk,2)
    # Need to add mu somewhere to k_ss
    k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
    k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
    insert!(k,gk_m+1,k_ss)
    sort!(k)
    p_bar=p_bar_k_ss*k_ss
    p = collect(linspace(-p_bar/2,p_bar,gp))

    #Tauchen
    N     = length(z)
    Z     = zeros(N,1)
    Zprob = zeros(Float32,N,N)

    Z[N]  = lnz[length(z)]
    Z[1]  = lnz[1]

    zstep = (Z[N] - Z[1]) / (N - 1)

    for i=2:(N-1)
        Z[i] = Z[1] + zstep * (i - 1)
    end

    for a = 1 : N
        for b = 1 : N
          if b == 1
              Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
          elseif b == N
              Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
          else
              Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
                           0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
          end
        end
    end
    # Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
    # Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
    Zcumprob=zeros(Float32,gz,gz)
    # 2 in cumsum! denotes the 2nd dimension, i.e. columns
    cumsum!(Zcumprob, Zprob,2)


    gm = gk * gp

    control=zeros(gm,2)
    for i=1:gk
        control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
        control[(1+gp*(i-1)):(gp*i),2]=p
    end
    endog=copy(control)

    E=Array(Float32,gm,gm,gz)
    for h=1:gm
       for m=1:gm
           for j=1:gz
             # set the nonzero net debt indicator
              if endog[h,2]<0
                 p_ind=1

              else
                 p_ind=0
              end

               # set the investment indicator
                if (control[m,1]-(1-delta)*endog[h,1])!=0
                   i_ind=1
                else
                   i_ind=0
                end

                E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
                    delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
                     (i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
                    s*endog[h,2]*p_ind
                elem = E[m,h,j]
                if E[m,h,j]<0
                    E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
                else
                    E[m,h,j]=elem
                end
            end
        end
     end

Затем я построил функцию с последовательной обработкой. Два for циклы перебирают каждый элемент, чтобы найти наибольшее значение в 1072-размере (= gm скалярный аргумент в функции) массив:

function dynam_serial(E,gm,gz,beta,Zprob)
    v           = Array(Float32,gm,gz )
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

    Tv          = Array(Float32,gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'

      for h=1:gm
        for j=1:gz
          Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
        end
      end

      diff = maxabs(Tv - v)
      v=copy(Tv)
    end
end

Сроки этого я получаю:

@time dynam_serial(E,gm,gz,beta,Zprob)

> 106.880008 seconds (91.70 M allocations: 203.233 GB, 15.22% gc time)

Теперь я пытаюсь использовать Shared Arrays для параллельной обработки. Обратите внимание, что я перенастроил итерацию так, чтобы у меня был только один for петля, а не два. Я также использую v=deepcopy(Tv); иначе, v копируется как Array объект, а не SharedArray:

function dynam_parallel(E,gm,gz,beta,Zprob)
    v           = SharedArray(Float32,(gm,gz),init = S -> S[Base.localindexes(S)] = myid() )
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'
      Tv          = SharedArray(Float32,gm,gz,init = S -> S[Base.localindexes(S)] = myid() )

      @sync @parallel for hj=1:(gm*gz)
         j=cld(hj,gm)
         h=mod(hj,gm)
         if h==0;h=gm;end;

         @async Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
      end

      diff = maxabs(Tv - v)
      v=deepcopy(Tv)
    end
end

Сроки параллельной версии; и используя 4-ядерный процессор I7 с частотой 2,5 ГГц и 16 ГБ памяти, я получаю:

addprocs(3)
@time dynam_parallel(E,gm,gz,beta,Zprob)

> 164.237208 seconds (2.64 M allocations: 201.812 MB, 0.04% gc time)

Я делаю что-то неправильно здесь? Или есть лучший способ приблизиться к параллельной обработке в Юлии для этой конкретной проблемы? Я подумал об использовании распределенных массивов, но мне сложно понять, как применить их к данной проблеме.

ОБНОВЛЕНИЕ: За @DanGetz и его полезными комментариями я вместо этого попытался ускорить последовательную версию обработки. Я смог снизить производительность до 53.469780 seconds (67.36 M allocations: 103.419 GiB, 19.12% gc time) через:

1) Обновление до 0.6.0 (сэкономлено около 25 секунд), которое включает в себя полезные @views макро.

2) Предварительно выделив основной массив, который я пытаюсь заполнить (Tv), в разделе "Предварительное распределение результатов" в "Советы по производительности Julia": https://docs.julialang.org/en/latest/manual/performance-tips/. (сэкономил еще около 25 секунд)

Наибольшее замедление, по-видимому, происходит от add_vecs функция, которая суммирует подмассивы двух больших матриц. Я пробовал девекторизовать и использовать функции BLAS, но не смог добиться лучшей производительности.

В любом случае, улучшенный код для dynam_serial ниже:

function add_vecs(r::Array{Float32},h::Int,j::Int,E::Array{Float32},exp_v::Array{Float32},beta::Float32)

  @views r=E[:,h,j] + beta*exp_v[:,j]
  return r
end


function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
    v           = Array{Float32}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    Tv          = Array{Float32}(gm,gz)
    r           = Array{Float32}(gm)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'

      for h=1:gm
        for j=1:gz
          @views Tv[h,j]=findmax(add_vecs(r,h,j,E,exp_v,beta))[1]
        end
      end

      diff = maximum(abs,Tv - v)
      v=copy(Tv)
    end
    return Tv
end

2 ответа

Решение

Если add_vecs кажется критической функцией, написание явного for Цикл может предложить больше оптимизации. Как работает следующий бенчмарк:

function add_vecs!(r::Array{Float32},h::Int,j::Int,E::Array{Float32},
                  exp_v::Array{Float32},beta::Float32)
    @inbounds for i=1:size(E,1)
        r[i]=E[i,h,j] + beta*exp_v[i,j]
    end
    return r
end

ОБНОВИТЬ

Чтобы продолжить оптимизацию dynam_serial Я попытался удалить больше выделений. Результат:

function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
                           exp_v::Array{Float64},beta::Float64)
    @inbounds for i=1:gm            
        r[i] = E[i,h,j]+beta*exp_v[i,j]
    end
    return findmax(r)[1]
end

function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
                      beta::Float64,Zprob::Array{Float64})
    v           = Array{Float64}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    r           = Array{Float64}(gm)
    exp_v       = Array{Float64}(gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1.0 # arbitrary initial value greater than convcrit
    while diff>convcrit
        A_mul_Bt!(exp_v,v,Zprob)
        diff = -Inf
        for h=1:gm
            for j=1:gz
                oldv = v[h,j]
                newv = add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
                v[h,j]= newv
                diff = max(diff, oldv-newv, newv-oldv)
            end
        end
    end
    return v
end

Переключение функций на использование Float64 должно увеличить скорость (поскольку процессоры оптимизированы для работы с 64-битными длинами слов). Кроме того, используя мутацию A_mul_Bt! напрямую сохраняет другое распределение. Избегать copy(...) переключая массивы v а также Tv,

Как эти оптимизации улучшают ваше время выполнения?

2-е ОБНОВЛЕНИЕ

Обновлен код в разделе ОБНОВЛЕНИЕ для использования findmax, Также поменял dynam_serial использовать v без Tv, поскольку не было необходимости сохранять старую версию, кроме diff расчет, который теперь делается внутри цикла.

Вот код, который я скопировал и вставил, предоставленный Дэном Гетцем выше. Я включаю массивы и скалярные определения в точности так, как я их запустил. Производительность была: 39.507005 seconds (11 allocations: 486.891 KiB) при беге @time dynam_serial(E,gm,gz,beta,Zprob),

using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]

r          = 0.015
tau        = 0.35
rho        =est_params[1]
sigma      =est_params[2]
delta      = 0.15
gamma      =est_params[3]
a_capital  =est_params[4]
lambda1    =est_params[5]
lambda2    =est_params[6]
s          =est_params[7]
theta      =est_params[8]
mu         =est_params[9]
p_bar_k_ss  =est_params[10]
beta  = (1+r)^(-1)
sigma_range = 4
gz = 19 #15 #19
gp = 29 #19 #29
gk = 37 #25 #37

lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp.(lnz)

gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))

#Tauchen
N     = length(z)
Z     = zeros(N,1)
Zprob = zeros(Float64,N,N)

Z[N]  = lnz[length(z)]
Z[1]  = lnz[1]

zstep = (Z[N] - Z[1]) / (N - 1)

for i=2:(N-1)
    Z[i] = Z[1] + zstep * (i - 1)
end

for a = 1 : N
    for b = 1 : N
      if b == 1
          Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
      elseif b == N
          Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      else
          Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
                       0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      end
    end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float64,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)


gm = gk * gp

control=zeros(gm,2)
for i=1:gk
    control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
    control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)

E=Array(Float64,gm,gm,gz)
for h=1:gm
   for m=1:gm
       for j=1:gz
         # set the nonzero net debt indicator
          if endog[h,2]<0
             p_ind=1

          else
             p_ind=0
          end

           # set the investment indicator
            if (control[m,1]-(1-delta)*endog[h,1])!=0
               i_ind=1
            else
               i_ind=0
            end

            E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
                delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
                 (i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
                s*endog[h,2]*p_ind
            elem = E[m,h,j]
            if E[m,h,j]<0
                E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
            else
                E[m,h,j]=elem
            end
        end
    end
 end

function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
                           exp_v::Array{Float64},beta::Float64)
    maxr = -Inf
    @inbounds for i=1:gm            r[i] = E[i,h,j]+beta*exp_v[i,j]
        maxr = max(r[i],maxr)
    end
    return maxr
end

function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
                      beta::Float64,Zprob::Array{Float64})
    v           = Array{Float64}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    Tv          = Array{Float64}(gm,gz)
    r           = Array{Float64}(gm)
    exp_v       = Array{Float64}(gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1.0 # arbitrary initial value greater than convcrit
    while diff>convcrit
        A_mul_Bt!(exp_v,v,Zprob)
        diff = -Inf
        for h=1:gm
            for j=1:gz
                Tv[h,j]=add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
                diff = max(abs(Tv[h,j]-v[h,j]),diff)
            end
        end
        (v,Tv)=(Tv,v)
    end
    return v
end

Теперь, вот еще одна версия алгоритма и входных данных. Функции похожи на то, что предложил Дэн Гетц, за исключением того, что я использую findmax а не повторный max функция, чтобы найти максимум массива. В конструкции ввода я использую оба Float32 и смешивание разных типов битов вместе. Тем не менее, я последовательно добивался лучших результатов таким образом: 24.905569 seconds (1.81 k allocations: 46.829 MiB, 0.01% gc time), Но не совсем понятно почему.

using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]

r          = 0.015
tau        = 0.35
rho        =est_params[1]
sigma      =est_params[2]
delta      = 0.15
gamma      =est_params[3]
a_capital  =est_params[4]
lambda1    =est_params[5]
lambda2    =est_params[6]
s          =est_params[7]
theta      =est_params[8]
mu         =est_params[9]
p_bar_k_ss  =est_params[10]
beta  = Float32((1+r)^(-1))
sigma_range = 4
gz = 19
gp = 29
gk = 37

lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp(lnz)

gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))

#Tauchen
N     = length(z)
Z     = zeros(N,1)
Zprob = zeros(Float32,N,N)

Z[N]  = lnz[length(z)]
Z[1]  = lnz[1]

zstep = (Z[N] - Z[1]) / (N - 1)

for i=2:(N-1)
    Z[i] = Z[1] + zstep * (i - 1)
end

for a = 1 : N
    for b = 1 : N
      if b == 1
          Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
      elseif b == N
          Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      else
          Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
                       0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      end
    end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float32,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)


gm = gk * gp

control=zeros(gm,2)
for i=1:gk
    control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
    control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)

E=Array(Float32,gm,gm,gz)
for h=1:gm
   for m=1:gm
       for j=1:gz
         # set the nonzero net debt indicator
          if endog[h,2]<0
             p_ind=1

          else
             p_ind=0
          end

           # set the investment indicator
            if (control[m,1]-(1-delta)*endog[h,1])!=0
               i_ind=1
            else
               i_ind=0
            end

            E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
                delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
                 (i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
                s*endog[h,2]*p_ind
            elem = E[m,h,j]
            if E[m,h,j]<0
                E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
            else
                E[m,h,j]=elem
            end
        end
    end
 end

 function add_vecs!(gm::Int,r::Array{Float32},h::Int,j::Int,E::Array{Float32},
                   exp_v::Array{Float32},beta::Float32)

     @inbounds @views for i=1:gm
         r[i]=E[i,h,j] + beta*exp_v[i,j]
     end
     return r
 end

 function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
     v           = Array{Float32}(gm,gz)
     fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

     Tv          = Array{Float32}(gm,gz)

     # Set parameters for the loop
     convcrit = 0.0001   # chosen convergence criterion
     diff = 1.00000          # arbitrary initial value greater than convcrit
     iter=0
     exp_v=Array{Float32}(gm,gz)

     r=Array{Float32}(gm)

     while diff>convcrit


       A_mul_Bt!(exp_v,v,Zprob)

       for h=1:gm
         for j=1:gz
           Tv[h,j]=findmax(add_vecs!(gm,r,h,j,E,exp_v,beta))[1]
         end
       end
       diff = maximum(abs,Tv - v)
       (v,Tv)=(Tv,v)
     end
     return v
 end
Другие вопросы по тегам