doParallel (package) foreach не работает для больших итераций в R

Я запускаю следующий код (извлеченный из виньеток doParallel) на ПК (ОС Linux) с 4 и 8 физическими и логическими ядрами соответственно.

Выполнение кода с iter=1e+6 или меньше, все хорошо, и я могу видеть из загрузки процессора, что все ядра используются для этого вычисления. Однако с большим количеством итераций (например, iter=4e+6), кажется, параллельные вычисления не работают в этом случае. Когда я также отслеживаю использование ЦП, в вычислениях участвует только одно ядро ​​(100%).

Example1

require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]

У вас есть идеи, что может быть причиной? Может ли память быть причиной?

Я погуглил и обнаружил, что ЭТО относится к моему вопросу, но суть в том, что мне не дают никаких ошибок, и ОП, по-видимому, придумал решение, предоставив необходимые пакеты внутри foreach петля. Но в моем цикле, как видно, пакет не используется.

Update1

Моя проблема до сих пор не решена. Что касается моих экспериментов, я не думаю, что память может быть причиной. В моей системе 8 ГБ памяти, на которой я запускаю следующую простую параллельную (по всем 8 логическим ядрам) итерацию:

Example2

require("doParallel")
require("foreach")

registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        i
    }
})[3]

У меня нет проблем с запуском этого кода, но когда я наблюдаю за использованием процессора, только одно ядро ​​(из 8) составляет 100%.

UPDATE2

Что касается Example2, @SteveWeston (спасибо за указание на это) заявил (в комментариях): "Пример в вашем обновлении страдает от крошечных задач. Только мастер может выполнять какую-то реальную работу, которая состоит из отправки задач и обработки результаты. Это принципиально отличается от проблемы с исходным примером, который использовал несколько ядер на меньшем количестве итераций ".

Однако Пример 1 все еще остается нерешенным. Когда я запускаю его и наблюдаю за процессами с htopвот что происходит более подробно:

Назовем все 8 созданных процессов p1 через p8, Статус (столбец S в htop) за p1 является R Это означает, что он работает и остается неизменным. Однако для p2 вплоть до p8через несколько минут статус изменится на D (т.е. непрерывный сон) и через несколько минут снова меняется на Z (т.е. прекращено, но не получено его родителем). У вас есть идеи, почему это происходит?

2 ответа

Решение

Я думаю, что у вас мало памяти. Вот модифицированная версия этого примера, которая должна работать лучше, когда у вас много задач. Он использует doSNOW, а не doParallel, потому что doSNOW позволяет обрабатывать результаты с помощью функции объединения, когда они возвращаются рабочими. Этот пример записывает эти результаты в файл, чтобы использовать меньше памяти, однако в конце он считывает результаты обратно в память, используя функцию ".final", но вы можете пропустить это, если у вас недостаточно памяти.

library(doSNOW)
library(tcltk)
nw <- 4  # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)

x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000  # may require tuning for your machine
maxcomb <- nw + 1  # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)

comb <- function(fobj, ...) {
  for(r in list(...))
    writeBin(r, fobj)
  fobj
}

final <- function(fobj) {
  close(fobj)
  t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}

mkprogress <- function(total) {
  pb <- tkProgressBar(max=total,
                      label=sprintf('total tasks: %d', total))
  function(n, tag) {
    setTkProgressBar(pb, n,
      label=sprintf('last completed task: %d of %d', tag, total))
  }
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')

r <-
  foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
          .maxcombine=maxcomb, .init=resultFile, .final=final,
          .inorder=FALSE, .options.snow=opts) %dopar% {
    do.call('c', lapply(seq_len(n), function(i) {
      ind <- sample(100, 100, replace=TRUE)
      result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
      coefficients(result1)
    }))
  }

Я включил индикатор выполнения, поскольку выполнение этого примера занимает несколько часов.

Обратите внимание, что этот пример также использует idiv функция от iterators Пакет для увеличения объема работы в каждой из задач. Эта техника называется чанкингом и часто улучшает параллельную работу. Однако, используя idiv портит индексы задачи, так как переменная i теперь это индекс для каждой задачи, а не глобальный индекс. Для глобального индекса вы можете написать собственный итератор idiv:

idivix <- function(n, chunkSize) {
  i <- 1
  it <- idiv(n, chunkSize=chunkSize)
  nextEl <- function() {
    m <- nextElem(it)  # may throw 'StopIterator'
    value <- list(i=i, m=m)
    i <<- i + m
    value
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}

Значения, генерируемые этим итератором, являются списками, каждый из которых содержит начальный индекс и счетчик. Вот простой цикл foreach, который использует этот пользовательский итератор:

r <- 
  foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
    do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
      i
    }))
  }

Конечно, если задачи достаточно интенсивны, вам может не потребоваться разбивка на фрагменты, и вы можете использовать простой цикл foreach, как в исходном примере.

Сначала я подумал, что у вас проблемы с памятью, потому что отправка многих задач требует больше памяти, и это может в конечном итоге привести к зависанию основного процесса, поэтому в моем первоначальном ответе показано несколько методов использования меньшего количества памяти. Однако теперь звучит так, как будто есть фаза запуска и завершения, когда занят только главный процесс, но рабочие заняты в течение некоторого промежутка времени в середине. Я думаю, проблема в том, что задачи в этом примере на самом деле не требуют значительных вычислительных ресурсов, и поэтому, когда у вас много задач, вы начинаете замечать время запуска и завершения работы. Я рассчитал фактические вычисления и обнаружил, что каждая задача занимает всего около 3 миллисекунд. Раньше вы не получали никакой выгоды от параллельных вычислений с небольшими задачами, но теперь, в зависимости от вашей машины, вы можете получить некоторую выгоду, но накладные расходы значительны, поэтому, когда у вас очень много задач, вы действительно замечаете, что накладные расходы.

Я все еще думаю, что мой другой ответ хорошо работает для этой проблемы, но так как у вас достаточно памяти, это излишне. Наиболее важная техника, чтобы использовать чанкинг. Вот пример, который использует разделение на части с минимальными изменениями в исходном примере:

require("doParallel")
nw <- 8
registerDoParallel(nw)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
niter <- 4e+6
r <- foreach(n=idiv(niter, chunks=nw), .combine='rbind') %dopar% {
  do.call('rbind', lapply(seq_len(n), function(i) {
    ind <- sample(100, 100, replace=TRUE)
    result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
    coefficients(result1)
  }))
}

Обратите внимание, что это делает чанкинг немного иначе, чем мой другой ответ. Он использует только одну задачу на одного работника, используя idiv chunks вариант, а не chunkSize вариант. Это уменьшает объем работы, выполняемой мастером, и является хорошей стратегией, если у вас достаточно памяти.

Другие вопросы по тегам