Есть ли способ отслеживать прогресс на mclapply?

Я люблю обстановку .progress = 'text' в plyr'sllply, Тем не менее, это вызывает у меня большое беспокойство, чтобы не знать, как далеко mclapply (из пакета multicore), поскольку элементы списка отправляются различным ядрам, а затем в конце сопоставляются.

Я выводил сообщения как *currently in sim_id # ....* но это не очень полезно, потому что это не дает мне индикатор того, какой процент элементов списка завершен (хотя полезно знать, что мой сценарий не застрял и не продвигается).

Может кто-то предложить другие идеи, которые позволят мне взглянуть на мой .Rout файл и получить чувство прогресса? Я думал о добавлении ручного счетчика, но не вижу, как бы я это реализовал, так как mclapply должен завершить обработку всех элементов списка, прежде чем он сможет дать какие-либо отзывы.

5 ответов

Решение

Благодаря тому факту, что mclapply порождает несколько процессов, возможно, вы захотите использовать fifo, pipe или даже сокеты. Теперь рассмотрим следующий пример:

library(multicore)

finalResult <- local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
        # Child
        progress <- 0.0
        while (progress < 1 && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)
            cat(sprintf("Progress: %.2f%%\n", progress * 100))
        } 
        exit()
    }
    numJobs <- 100
    result <- mclapply(1:numJobs, function(...) {
        # Dome something fancy here
        # ...
        # Send some progress update
        writeBin(1/numJobs, f)
        # Some arbitrary result
        sample(1000, 1)
    })
    close(f)
    result
})

cat("Done\n")

Здесь временный файл используется как fifo, и основной процесс разветвляет ребенка, единственная обязанность которого - сообщать о текущем прогрессе. Основной процесс продолжается по телефону mclapply где выражение (точнее, блок выражения), которое должно быть оценено, записывает частичную информацию о ходе выполнения в буфер fifo посредством writeBin,

Поскольку это всего лишь простой пример, вам, вероятно, придется адаптировать весь материал вывода к вашим потребностям. НТН!

По сути, добавление еще одной версии решения @fotNelson, но с некоторыми изменениями:

  • Отмена замены для mclapply (поддерживает все функции mclapply)
  • Ловит Ctrl-C звонки и грациозно прерывает
  • использует встроенный индикатор выполнения (txtProgressBar)
  • возможность отслеживать прогресс или нет и использовать указанный стиль индикатора выполнения
  • использования parallel скорее, чем multicore который теперь был удален из CRAN
  • Приводит X к списку согласно mclapply (поэтому length(X) дает ожидаемые результаты)
  • Документация стиля roxygen2 наверху

Надеюсь, это поможет кому-то...

library(parallel)

#-------------------------------------------------------------------------------
#' Wrapper around mclapply to track progress
#' 
#' Based on http://stackru.com/questions/10984556
#' 
#' @param X         a vector (atomic or list) or an expressions vector. Other
#'                  objects (including classed objects) will be coerced by
#'                  ‘as.list’
#' @param FUN       the function to be applied to
#' @param ...       optional arguments to ‘FUN’
#' @param mc.preschedule see mclapply
#' @param mc.set.seed see mclapply
#' @param mc.silent see mclapply
#' @param mc.cores see mclapply
#' @param mc.cleanup see mclapply
#' @param mc.allow.recursive see mclapply
#' @param mc.progress track progress?
#' @param mc.style    style of progress bar (see txtProgressBar)
#'
#' @examples
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#' 
#' dat <- lapply(1:10, function(x) rnorm(100)) 
#' func <- function(x, arg1) mean(x)/arg1 
#' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ..., 
    mc.preschedule = TRUE, mc.set.seed = TRUE,
    mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
    mc.cleanup = TRUE, mc.allow.recursive = TRUE,
    mc.progress=TRUE, mc.style=3) 
{
    if (!is.vector(X) || is.object(X)) X <- as.list(X)

    if (mc.progress) {
        f <- fifo(tempfile(), open="w+b", blocking=T)
        p <- parallel:::mcfork()
        pb <- txtProgressBar(0, length(X), style=mc.style)
        setTxtProgressBar(pb, 0) 
        progress <- 0
        if (inherits(p, "masterProcess")) {
            while (progress < length(X)) {
                readBin(f, "double")
                progress <- progress + 1
                setTxtProgressBar(pb, progress) 
            }
            cat("\n")
            parallel:::mcexit()
        }
    }
    tryCatch({
        result <- mclapply(X, ..., function(...) {
                res <- FUN(...)
                if (mc.progress) writeBin(1, f)
                res
            }, 
            mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
            mc.silent = mc.silent, mc.cores = mc.cores,
            mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
        )

    }, finally = {
        if (mc.progress) close(f)
    })
    result
}

pbapply Пакет реализовал это для общего случая. И то и другое pblapply а также pbsapply иметь cl аргумент. Из документации:

Параллельная обработка может быть включена через cl аргумент. parLapply называется когда cl это 'cluster'объект, mclapply называется когда cl является целым числом Отображение индикатора выполнения увеличивает накладные расходы на связь между основным процессом и узлами / дочерними процессами по сравнению с параллельными эквивалентами функций без индикатора выполнения. Функции возвращаются к своим первоначальным эквивалентам, когда индикатор выполнения отключен (т.е. getOption("pboptions")$type == "none"dopb() является FALSE). Это по умолчанию, когда interactive() если FALSE (то есть вызывается из командной строки R скрипт).

Если один не поставляет cl (или проходит NULL) непараллельный по умолчанию lapply используется, включая индикатор выполнения.

Вот функция, основанная на решении @fotNelton и применяемая везде, где вы обычно используете mcapply.

mcadply <- function(X, FUN, ...) {
  # Runs multicore lapply with progress indicator and transformation to
  # data.table output. Arguments mirror those passed to lapply.
  #
  # Args:
  # X:   Vector.
  # FUN: Function to apply to each value of X. Note this is transformed to 
  #      a data.frame return if necessary.
  # ...: Other arguments passed to mclapply.
  #
  # Returns:
  #   data.table stack of each mclapply return value
  #
  # Progress bar code based on https://stackru.com/a/10993589
  require(multicore)
  require(plyr)
  require(data.table)

  local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
      # Child
      progress <- 0
      print.progress <- 0
      while (progress < 1 && !isIncomplete(f)) {
        msg <- readBin(f, "double")
        progress <- progress + as.numeric(msg)
        # Print every 1%
        if(progress >= print.progress + 0.01) {
          cat(sprintf("Progress: %.0f%%\n", progress * 100))
          print.progress <- floor(progress * 100) / 100
        }
      }
      exit()
    }

    newFun <- function(...) {
      writeBin(1 / length(X), f)
      return(as.data.frame(FUN(...)))
    }

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
    close(f)
    cat("Done\n")
    return(result)
  })
}

Вы можете использовать эхо-функцию вашей системы для записи от ваших рабочих, поэтому просто добавьте следующую строку в вашу функцию:

myfun <- function(x){
if(x %% 5 == 0) system(paste("echo 'now processing:",x,"'"))
dosomething(mydata[x])
}

result <- mclapply(1:10,myfun,mc.cores=5)
> now processing: 5 
> now processing: 10 

Это будет работать, если вы передадите индекс, например, поэтому вместо того, чтобы передавать список данных, передайте индекс и извлеките данные в рабочей функции.

Основываясь на ответе @fotNelson, используйте индикатор выполнения вместо построчной печати и вызовите внешнюю функцию с помощью mclapply.

library('utils')
library('multicore')

prog.indic <- local({ #evaluates in local environment only
    f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection
    assign(x='f',value=f,envir=.GlobalEnv)
    pb <- txtProgressBar(min=1, max=MC,style=3)

    if (inherits(fork(), "masterProcess")) { #progress tracker
        # Child
        progress <- 0.0
        while (progress < MC && !isIncomplete(f)){ 
            msg <- readBin(f, "double")
                progress <- progress + as.numeric(msg)

            # Updating the progress bar.
            setTxtProgressBar(pb,progress)
            } 


        exit()
        }
   MC <- 100
   result <- mclapply(1:MC, .mcfunc)

    cat('\n')
    assign(x='result',value=result,envir=.GlobalEnv)
    close(f)
    })

.mcfunc<-function(i,...){
        writeBin(1, f)
        return(i)
    }

Назначение подключения fifo.GlobalEnv необходимо для использования его из функции вне вызова mclapply. Спасибо за вопросы и предыдущие ответы, я долго думал, как это сделать.

Другие вопросы по тегам