doParallel (package) foreach не работает для больших итераций в R
Я запускаю следующий код (извлеченный из виньеток doParallel) на ПК (ОС Linux) с 4 и 8 физическими и логическими ядрами соответственно.
Выполнение кода с iter=1e+6
или меньше, все хорошо, и я могу видеть из загрузки процессора, что все ядра используются для этого вычисления. Однако с большим количеством итераций (например, iter=4e+6
), кажется, параллельные вычисления не работают в этом случае. Когда я также отслеживаю использование ЦП, в вычислениях участвует только одно ядро (100%).
Example1
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}
})[3]
У вас есть идеи, что может быть причиной? Может ли память быть причиной?
Я погуглил и обнаружил, что ЭТО относится к моему вопросу, но суть в том, что мне не дают никаких ошибок, и ОП, по-видимому, придумал решение, предоставив необходимые пакеты внутри foreach
петля. Но в моем цикле, как видно, пакет не используется.
Update1
Моя проблема до сих пор не решена. Что касается моих экспериментов, я не думаю, что память может быть причиной. В моей системе 8 ГБ памяти, на которой я запускаю следующую простую параллельную (по всем 8 логическим ядрам) итерацию:
Example2
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
i
}
})[3]
У меня нет проблем с запуском этого кода, но когда я наблюдаю за использованием процессора, только одно ядро (из 8) составляет 100%.
UPDATE2
Что касается Example2, @SteveWeston (спасибо за указание на это) заявил (в комментариях): "Пример в вашем обновлении страдает от крошечных задач. Только мастер может выполнять какую-то реальную работу, которая состоит из отправки задач и обработки результаты. Это принципиально отличается от проблемы с исходным примером, который использовал несколько ядер на меньшем количестве итераций ".
Однако Пример 1 все еще остается нерешенным. Когда я запускаю его и наблюдаю за процессами с htop
вот что происходит более подробно:
Назовем все 8 созданных процессов p1
через p8
, Статус (столбец S
в htop
) за p1
является R
Это означает, что он работает и остается неизменным. Однако для p2
вплоть до p8
через несколько минут статус изменится на D
(т.е. непрерывный сон) и через несколько минут снова меняется на Z
(т.е. прекращено, но не получено его родителем). У вас есть идеи, почему это происходит?
2 ответа
Я думаю, что у вас мало памяти. Вот модифицированная версия этого примера, которая должна работать лучше, когда у вас много задач. Он использует doSNOW, а не doParallel, потому что doSNOW позволяет обрабатывать результаты с помощью функции объединения, когда они возвращаются рабочими. Этот пример записывает эти результаты в файл, чтобы использовать меньше памяти, однако в конце он считывает результаты обратно в память, используя функцию ".final", но вы можете пропустить это, если у вас недостаточно памяти.
library(doSNOW)
library(tcltk)
nw <- 4 # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)
x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000 # may require tuning for your machine
maxcomb <- nw + 1 # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)
comb <- function(fobj, ...) {
for(r in list(...))
writeBin(r, fobj)
fobj
}
final <- function(fobj) {
close(fobj)
t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}
mkprogress <- function(total) {
pb <- tkProgressBar(max=total,
label=sprintf('total tasks: %d', total))
function(n, tag) {
setTkProgressBar(pb, n,
label=sprintf('last completed task: %d of %d', tag, total))
}
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')
r <-
foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
.maxcombine=maxcomb, .init=resultFile, .final=final,
.inorder=FALSE, .options.snow=opts) %dopar% {
do.call('c', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
Я включил индикатор выполнения, поскольку выполнение этого примера занимает несколько часов.
Обратите внимание, что этот пример также использует idiv
функция от iterators
Пакет для увеличения объема работы в каждой из задач. Эта техника называется чанкингом и часто улучшает параллельную работу. Однако, используя idiv
портит индексы задачи, так как переменная i
теперь это индекс для каждой задачи, а не глобальный индекс. Для глобального индекса вы можете написать собственный итератор idiv
:
idivix <- function(n, chunkSize) {
i <- 1
it <- idiv(n, chunkSize=chunkSize)
nextEl <- function() {
m <- nextElem(it) # may throw 'StopIterator'
value <- list(i=i, m=m)
i <<- i + m
value
}
obj <- list(nextElem=nextEl)
class(obj) <- c('abstractiter', 'iter')
obj
}
Значения, генерируемые этим итератором, являются списками, каждый из которых содержит начальный индекс и счетчик. Вот простой цикл foreach, который использует этот пользовательский итератор:
r <-
foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
i
}))
}
Конечно, если задачи достаточно интенсивны, вам может не потребоваться разбивка на фрагменты, и вы можете использовать простой цикл foreach, как в исходном примере.
Сначала я подумал, что у вас проблемы с памятью, потому что отправка многих задач требует больше памяти, и это может в конечном итоге привести к зависанию основного процесса, поэтому в моем первоначальном ответе показано несколько методов использования меньшего количества памяти. Однако теперь звучит так, как будто есть фаза запуска и завершения, когда занят только главный процесс, но рабочие заняты в течение некоторого промежутка времени в середине. Я думаю, проблема в том, что задачи в этом примере на самом деле не требуют значительных вычислительных ресурсов, и поэтому, когда у вас много задач, вы начинаете замечать время запуска и завершения работы. Я рассчитал фактические вычисления и обнаружил, что каждая задача занимает всего около 3 миллисекунд. Раньше вы не получали никакой выгоды от параллельных вычислений с небольшими задачами, но теперь, в зависимости от вашей машины, вы можете получить некоторую выгоду, но накладные расходы значительны, поэтому, когда у вас очень много задач, вы действительно замечаете, что накладные расходы.
Я все еще думаю, что мой другой ответ хорошо работает для этой проблемы, но так как у вас достаточно памяти, это излишне. Наиболее важная техника, чтобы использовать чанкинг. Вот пример, который использует разделение на части с минимальными изменениями в исходном примере:
require("doParallel")
nw <- 8
registerDoParallel(nw)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
niter <- 4e+6
r <- foreach(n=idiv(niter, chunks=nw), .combine='rbind') %dopar% {
do.call('rbind', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
Обратите внимание, что это делает чанкинг немного иначе, чем мой другой ответ. Он использует только одну задачу на одного работника, используя idiv chunks
вариант, а не chunkSize
вариант. Это уменьшает объем работы, выполняемой мастером, и является хорошей стратегией, если у вас достаточно памяти.