Почему бы не балансировать нагрузку при параллельных вычислениях с использованием снегопада?
В течение долгого времени я использовал sfLapply для многих моих параллельных скриптов. Однако, недавно, когда я углубился в параллельные вычисления, я использовал sfClusterApplyLB, который может сэкономить много времени, если отдельным экземплярам не требуется одинаковое количество времени для запуска. Если sfLapply будет ожидать завершения каждого экземпляра пакета, прежде чем загружать новый пакет (что может привести к бездействующим экземплярам), то есть экземпляры sfClusterApplyLB, которые завершат свою задачу, будут немедленно назначены оставшимся элементам в списке, что потенциально может сэкономить немного времени, когда экземпляры не занимают точно такое же количество времени. Это привело меня к вопросу, почему мы не хотим балансировать нагрузку при использовании снегопада? Единственное, что я обнаружил до сих пор, это то, что при возникновении ошибки в параллельном скрипте sfClusterApplyLB будет по-прежнему циклически проходить по всему списку, прежде чем выдавать ошибку, в то время как sfLapply остановится после попытки выполнить первый пакет. Что еще мне не хватает? Есть ли другие затраты / недостатки балансировки нагрузки? Ниже приведен пример кода, который показывает разницу между двумя
rm(list = ls()) #remove all past worksheet variables
working_dir="D:/temp/"
setwd(working_dir)
n_spp=16
spp_nmS=paste0("sp_",c(1:n_spp))
spp_nm=spp_nmS[1]
sp_parallel_run=function(sp_nm){
sink(file(paste0(working_dir,sp_nm,"_log.txt"), open="wt"))#######NEW
cat('\n', 'Started on ', date(), '\n')
ptm0 <- proc.time()
jnk=round(runif(1)*8000000) #this is just a redundant script that takes an arbitrary amount of time to run
jnk1=runif(jnk)
for (i in 1:length(jnk1)){
jnk1[i]=jnk[i]*runif(1)
}
ptm1=proc.time() - ptm0
jnk=as.numeric(ptm1[3])
cat('\n','It took ', jnk, "seconds to model", sp_nm)
#stop sinks
sink.reset <- function(){
for(i in seq_len(sink.number())){
sink(NULL)
}
}
sink.reset()
}
require(snowfall)
cpucores=as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))
sfInit( parallel=T, cpus=cpucores) #
sfExportAll()
system.time((sfLapply(spp_nmS,fun=sp_parallel_run)))
sfRemoveAll()
sfStop()
sfInit( parallel=T, cpus=cpucores) #
sfExportAll()
system.time(sfClusterApplyLB(spp_nmS,fun=sp_parallel_run))
sfRemoveAll()
sfStop()
1 ответ
sfLapply
Функция полезна, потому что она разбивает входные значения на одну группу задач для каждого доступного работника, что является mclapply
функция вызывает предварительное планирование. Это может дать гораздо лучшую производительность, чем sfClusterApplyLB
когда задачи не занимают много времени.
Вот крайний пример, который демонстрирует преимущества предварительного планирования:
> system.time(sfLapply(1:100000, sqrt))
user system elapsed
0.148 0.004 0.170
> system.time(sfClusterApplyLB(1:100000, sqrt))
user system elapsed
19.317 1.852 21.222