Почему бы не балансировать нагрузку при параллельных вычислениях с использованием снегопада?

В течение долгого времени я использовал sfLapply для многих моих параллельных скриптов. Однако, недавно, когда я углубился в параллельные вычисления, я использовал sfClusterApplyLB, который может сэкономить много времени, если отдельным экземплярам не требуется одинаковое количество времени для запуска. Если sfLapply будет ожидать завершения каждого экземпляра пакета, прежде чем загружать новый пакет (что может привести к бездействующим экземплярам), то есть экземпляры sfClusterApplyLB, которые завершат свою задачу, будут немедленно назначены оставшимся элементам в списке, что потенциально может сэкономить немного времени, когда экземпляры не занимают точно такое же количество времени. Это привело меня к вопросу, почему мы не хотим балансировать нагрузку при использовании снегопада? Единственное, что я обнаружил до сих пор, это то, что при возникновении ошибки в параллельном скрипте sfClusterApplyLB будет по-прежнему циклически проходить по всему списку, прежде чем выдавать ошибку, в то время как sfLapply остановится после попытки выполнить первый пакет. Что еще мне не хватает? Есть ли другие затраты / недостатки балансировки нагрузки? Ниже приведен пример кода, который показывает разницу между двумя

rm(list = ls()) #remove all past worksheet variables
working_dir="D:/temp/"
setwd(working_dir)
n_spp=16
spp_nmS=paste0("sp_",c(1:n_spp))
spp_nm=spp_nmS[1]
sp_parallel_run=function(sp_nm){
  sink(file(paste0(working_dir,sp_nm,"_log.txt"), open="wt"))#######NEW
  cat('\n', 'Started on ', date(), '\n') 
  ptm0 <- proc.time()
  jnk=round(runif(1)*8000000) #this is just a redundant script that takes an arbitrary amount of time to run
  jnk1=runif(jnk)
  for (i in 1:length(jnk1)){
    jnk1[i]=jnk[i]*runif(1)
  }
  ptm1=proc.time() - ptm0
  jnk=as.numeric(ptm1[3])
  cat('\n','It took ', jnk, "seconds to model", sp_nm)

  #stop sinks
  sink.reset <- function(){
    for(i in seq_len(sink.number())){
      sink(NULL)
    }
  }
  sink.reset()
}
require(snowfall)
cpucores=as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))

sfInit( parallel=T, cpus=cpucores) # 
sfExportAll() 
system.time((sfLapply(spp_nmS,fun=sp_parallel_run)))
sfRemoveAll()
sfStop()

sfInit( parallel=T, cpus=cpucores) # 
sfExportAll() 
system.time(sfClusterApplyLB(spp_nmS,fun=sp_parallel_run)) 
sfRemoveAll()
sfStop()

1 ответ

Решение

sfLapply Функция полезна, потому что она разбивает входные значения на одну группу задач для каждого доступного работника, что является mclapply функция вызывает предварительное планирование. Это может дать гораздо лучшую производительность, чем sfClusterApplyLB когда задачи не занимают много времени.

Вот крайний пример, который демонстрирует преимущества предварительного планирования:

> system.time(sfLapply(1:100000, sqrt))
   user  system elapsed
  0.148   0.004   0.170
> system.time(sfClusterApplyLB(1:100000, sqrt))
   user  system elapsed
 19.317   1.852  21.222
Другие вопросы по тегам