Julia pmap speed - параллельная обработка - динамическое программирование

Я пытаюсь ускорить заполнение матрицы для задачи динамического программирования в Julia (v0.6.0), и я не могу получить дополнительную скорость от использования pmap, Это связано с вопросом, который я написал почти год назад: заполнение матрицы с помощью параллельной обработки в Юлии. Тогда я смог ускорить последовательную обработку с некоторой большой помощью, и теперь я пытаюсь получить дополнительную скорость от инструментов параллельной обработки в Джулии.

Для случая последовательной обработки я использовал 3-мерную матрицу (по сути, набор матриц одинакового размера, проиндексированных по 1-му измерению) и выполнял итерации по 1-му измерению. Я хотел дать pmap попытка, однако, более эффективно перебирать множество матриц.

Вот настройка кода. Использовать pmap с v_iter Приведенная ниже функция, я преобразовал трехмерную матрицу в объект словаря с ключами словаря, равными значениям индекса в 1-м измерении (v_dict в коде ниже, с gcc равен размеру 1-го измерения). v_iter функция принимает другие объекты словаря (E_opt_dict а также gridpoint_m_dict ниже) в качестве дополнительных входов:

function v_iter(a,b,c)
   diff_v = 1
   while diff_v>convcrit
     diff_v = -Inf

     #These lines efficiently multiply the value function by the Markov transition matrix, using the A_mul_B function
     exp_v       = zeros(Float64,gkpc,1)
     A_mul_B!(exp_v,a[1:gkpc,:],Zprob[1,:])
     for j=2:gz
       temp=Array{Float64}(gkpc,1)
       A_mul_B!(temp,a[(j-1)*gkpc+1:(j-1)*gkpc+gkpc,:],Zprob[j,:])
       exp_v=hcat(exp_v,temp)
     end    

     #This tries to find the optimal value of v
     for h=1:gm
       for j=1:gz
         oldv = a[h,j]
         newv = (1-tau)*b[h,j]+beta*exp_v[c[h,j],j]
         a[h,j] = newv
         diff_v = max(diff_v, oldv-newv, newv-oldv)
       end
     end
   end
end

gz =  9  
gp =  13  
gk =  17  
gcc =  5  
gm    = gk * gp * gcc * gz
gkpc  = gk * gp * gcc
gkp = gk*gp
beta  = ((1+0.015)^(-1))
tau        = 0.35
Zprob = [0.43 0.38 0.15 0.03 0.00 0.00 0.00 0.00 0.00; 0.05 0.47 0.35 0.11 0.02 0.00 0.00 0.00 0.00; 0.01 0.10 0.50 0.30 0.08 0.01 0.00 0.00 0.00; 0.00 0.02 0.15 0.51 0.26 0.06 0.01  0.00 0.00; 0.00 0.00 0.03 0.21 0.52 0.21 0.03 0.00 0.00 ; 0.00 0.00  0.01  0.06 0.26 0.51 0.15 0.02 0.00 ; 0.00 0.00 0.00 0.01 0.08 0.30 0.50 0.10 0.01 ; 0.00 0.00 0.00 0.00 0.02 0.11 0.35 0.47 0.05; 0.00 0.00 0.00 0.00 0.00 0.03 0.15 0.38 0.43]
convcrit = 0.001   # chosen convergence criterion

E_opt                  = Array{Float64}(gcc,gm,gz)    
fill!(E_opt,10.0)

gridpoint_m   = Array{Int64}(gcc,gm,gz)
fill!(gridpoint_m,fld(gkp,2)) 

v_dict=Dict(i => zeros(Float64,gm,gz) for i=1:gcc)
E_opt_dict=Dict(i => E_opt[i,:,:] for i=1:gcc)
gridpoint_m_dict=Dict(i => gridpoint_m[i,:,:] for i=1:gcc) 

Для параллельной обработки я выполнил следующие две команды:

wp = CachingPool(workers())
addprocs(3)
pmap(wp,v_iter,values(v_dict),values(E_opt_dict),values(gridpoint_m_dict))

... который произвел это представление:

135.626417 seconds (3.29 G allocations: 57.152 GiB, 3.74% gc time)

Затем я попытался выполнить последовательный процесс:

for i=1:gcc
    v_iter(v_dict[i],E_opt_dict[i],gridpoint_m_dict[i])
end

... и получил лучшую производительность.

128.263852 seconds (3.29 G allocations: 57.101 GiB, 4.53% gc time)

Это также дает мне примерно ту же производительность, что и бег v_iter на оригинальных трехмерных объектах:

v=zeros(Float64,gcc,gm,gz)
for i=1:gcc
    v_iter(v[i,:,:],E_opt[i,:,:],gridpoint_m[i,:,:])
end

Я знаю, что параллельная обработка требует времени установки, но когда я увеличиваю значение gccЯ все еще получаю примерно равное время обработки для последовательных и параллельных. Это кажется хорошим кандидатом для параллельной обработки, поскольку нет необходимости в обмене сообщениями между рабочими! Но я не могу заставить его работать эффективно.

2 ответа

Вы создаете CachingPool перед добавлением рабочих процессов. Следовательно, ваш пул кеширования перешел на pmap говорит ему использовать только одного работника. Вы можете просто проверить это, запустив wp.workers вы увидите что-то вроде Set([1]), Следовательно, это должно быть: addprocs(3) wp = CachingPool(workers()) Вы также можете рассмотреть запуск Джулия -p параметр командной строки, например julia -p 3 и тогда вы можете пропустить addprocs(3) команда.

Кроме того, ваш for а также pmap петли не эквивалентны. Юлия Dict Объект является хеш-картой и, как и другие языки, не предлагает ничего похожего на порядок элементов. Следовательно, в вашем цикле for вы гарантированно получите такое же соответствие i-й элемент в то время как с values порядок значений не обязательно должен соответствовать исходному порядку (и вы можете иметь различный порядок для каждой из этих трех переменных в pmap петля). Так как ключи для вашего Dicts просто цифры от 1 вплоть до gcc Вы должны просто использовать массивы вместо этого. Вы можете использовать генераторы, очень похожие на Python. Для примера вместо v_dict=Dict(i => zeros(Float64,gm,gz) for i=1:gcc) использование v_dict_a = [zeros(Float64,gm,gz) for i=1:gcc]

Надеюсь, это поможет.

Основываясь на полезных советах @Przemyslaw Szufeul, я поместил ниже код, который правильно выполняет параллельную обработку. Запустив его один раз, я добился существенного улучшения во время выполнения: 77.728264 seconds (181.20 k allocations: 12.548 MiB)

В дополнение к переупорядочению wp Команду и с помощью генератора рекомендовал Пшемыслав, я тоже переделал v_iter в качестве анонимной функции, чтобы избежать необходимости посыпать @everywhere вокруг кода для передачи функций и данных для работников.

Я также добавил return a к v_iter функция и установить v_a ниже равно выходу pmap, так как вы не можете передать по ссылке на удаленный объект.

addprocs(3)
v_iter = function(a,b,c)
   diff_v = 1
   while diff_v>convcrit
     diff_v = -Inf

     #These lines efficiently multiply the value function by the Markov transition matrix, using the A_mul_B function
     exp_v       = zeros(Float64,gkpc,1)
     A_mul_B!(exp_v,a[1:gkpc,:],Zprob[1,:])
     for j=2:gz
       temp=Array{Float64}(gkpc,1)
       A_mul_B!(temp,a[(j-1)*gkpc+1:(j-1)*gkpc+gkpc,:],Zprob[j,:])
       exp_v=hcat(exp_v,temp)
     end    

     #This tries to find the optimal value of v
     for h=1:gm
       for j=1:gz
         oldv = a[h,j]
         newv = (1-tau)*b[h,j]+beta*exp_v[c[h,j],j]
         a[h,j] = newv
         diff_v = max(diff_v, oldv-newv, newv-oldv)
       end
     end
   end
  return a
end

gz =  9  
gp =  13  
gk =  17  
gcc =  5  
gm    = gk * gp * gcc * gz
gkpc  = gk * gp * gcc
gkp   =gk*gp
beta  = ((1+0.015)^(-1))
tau        = 0.35
Zprob = [0.43 0.38 0.15 0.03 0.00 0.00 0.00 0.00 0.00; 0.05 0.47 0.35 0.11 0.02 0.00 0.00 0.00 0.00; 0.01 0.10 0.50 0.30 0.08 0.01 0.00 0.00 0.00; 0.00 0.02 0.15 0.51 0.26 0.06 0.01  0.00 0.00; 0.00 0.00 0.03 0.21 0.52 0.21 0.03 0.00 0.00 ; 0.00 0.00  0.01  0.06 0.26 0.51 0.15 0.02 0.00 ; 0.00 0.00 0.00 0.01 0.08 0.30 0.50 0.10 0.01 ; 0.00 0.00 0.00 0.00 0.02 0.11 0.35 0.47 0.05; 0.00 0.00 0.00 0.00 0.00 0.03 0.15 0.38 0.43]
convcrit = 0.001   # chosen convergence criterion

E_opt                  = Array{Float64}(gcc,gm,gz)    
fill!(E_opt,10.0)

gridpoint_m   = Array{Int64}(gcc,gm,gz)
fill!(gridpoint_m,fld(gkp,2)) 

v_a=[zeros(Float64,gm,gz) for i=1:gcc]
E_opt_a=[E_opt[i,:,:] for i=1:gcc]
gridpoint_m_a=[gridpoint_m[i,:,:] for i=1:gcc]

wp = CachingPool(workers())
v_a = pmap(wp,v_iter,v_a,E_opt_a,gridpoint_m_a)
Другие вопросы по тегам