Есть ли способ отслеживать прогресс на mclapply?
Я люблю обстановку .progress = 'text'
в plyr's
llply
, Тем не менее, это вызывает у меня большое беспокойство, чтобы не знать, как далеко mclapply
(из пакета multicore
), поскольку элементы списка отправляются различным ядрам, а затем в конце сопоставляются.
Я выводил сообщения как *currently in sim_id # ....*
но это не очень полезно, потому что это не дает мне индикатор того, какой процент элементов списка завершен (хотя полезно знать, что мой сценарий не застрял и не продвигается).
Может кто-то предложить другие идеи, которые позволят мне взглянуть на мой .Rout
файл и получить чувство прогресса? Я думал о добавлении ручного счетчика, но не вижу, как бы я это реализовал, так как mclapply
должен завершить обработку всех элементов списка, прежде чем он сможет дать какие-либо отзывы.
5 ответов
Благодаря тому факту, что mclapply
порождает несколько процессов, возможно, вы захотите использовать fifo, pipe или даже сокеты. Теперь рассмотрим следующий пример:
library(multicore)
finalResult <- local({
f <- fifo(tempfile(), open="w+b", blocking=T)
if (inherits(fork(), "masterProcess")) {
# Child
progress <- 0.0
while (progress < 1 && !isIncomplete(f)) {
msg <- readBin(f, "double")
progress <- progress + as.numeric(msg)
cat(sprintf("Progress: %.2f%%\n", progress * 100))
}
exit()
}
numJobs <- 100
result <- mclapply(1:numJobs, function(...) {
# Dome something fancy here
# ...
# Send some progress update
writeBin(1/numJobs, f)
# Some arbitrary result
sample(1000, 1)
})
close(f)
result
})
cat("Done\n")
Здесь временный файл используется как fifo, и основной процесс разветвляет ребенка, единственная обязанность которого - сообщать о текущем прогрессе. Основной процесс продолжается по телефону mclapply
где выражение (точнее, блок выражения), которое должно быть оценено, записывает частичную информацию о ходе выполнения в буфер fifo посредством writeBin
,
Поскольку это всего лишь простой пример, вам, вероятно, придется адаптировать весь материал вывода к вашим потребностям. НТН!
По сути, добавление еще одной версии решения @fotNelson, но с некоторыми изменениями:
- Отмена замены для mclapply (поддерживает все функции mclapply)
- Ловит Ctrl-C звонки и грациозно прерывает
- использует встроенный индикатор выполнения (txtProgressBar)
- возможность отслеживать прогресс или нет и использовать указанный стиль индикатора выполнения
- использования
parallel
скорее, чемmulticore
который теперь был удален из CRAN - Приводит X к списку согласно mclapply (поэтому length(X) дает ожидаемые результаты)
- Документация стиля roxygen2 наверху
Надеюсь, это поможет кому-то...
library(parallel)
#-------------------------------------------------------------------------------
#' Wrapper around mclapply to track progress
#'
#' Based on http://stackru.com/questions/10984556
#'
#' @param X a vector (atomic or list) or an expressions vector. Other
#' objects (including classed objects) will be coerced by
#' ‘as.list’
#' @param FUN the function to be applied to
#' @param ... optional arguments to ‘FUN’
#' @param mc.preschedule see mclapply
#' @param mc.set.seed see mclapply
#' @param mc.silent see mclapply
#' @param mc.cores see mclapply
#' @param mc.cleanup see mclapply
#' @param mc.allow.recursive see mclapply
#' @param mc.progress track progress?
#' @param mc.style style of progress bar (see txtProgressBar)
#'
#' @examples
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#'
#' dat <- lapply(1:10, function(x) rnorm(100))
#' func <- function(x, arg1) mean(x)/arg1
#' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ...,
mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE,
mc.progress=TRUE, mc.style=3)
{
if (!is.vector(X) || is.object(X)) X <- as.list(X)
if (mc.progress) {
f <- fifo(tempfile(), open="w+b", blocking=T)
p <- parallel:::mcfork()
pb <- txtProgressBar(0, length(X), style=mc.style)
setTxtProgressBar(pb, 0)
progress <- 0
if (inherits(p, "masterProcess")) {
while (progress < length(X)) {
readBin(f, "double")
progress <- progress + 1
setTxtProgressBar(pb, progress)
}
cat("\n")
parallel:::mcexit()
}
}
tryCatch({
result <- mclapply(X, ..., function(...) {
res <- FUN(...)
if (mc.progress) writeBin(1, f)
res
},
mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
mc.silent = mc.silent, mc.cores = mc.cores,
mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
)
}, finally = {
if (mc.progress) close(f)
})
result
}
pbapply
Пакет реализовал это для общего случая. И то и другое pblapply
а также pbsapply
иметь cl
аргумент. Из документации:
Параллельная обработка может быть включена через
cl
аргумент.parLapply
называется когдаcl
это 'cluster
'объект,mclapply
называется когдаcl
является целым числом Отображение индикатора выполнения увеличивает накладные расходы на связь между основным процессом и узлами / дочерними процессами по сравнению с параллельными эквивалентами функций без индикатора выполнения. Функции возвращаются к своим первоначальным эквивалентам, когда индикатор выполнения отключен (т.е.getOption("pboptions")$type == "none"
dopb()
являетсяFALSE
). Это по умолчанию, когдаinteractive()
еслиFALSE
(то есть вызывается из командной строки R скрипт).
Если один не поставляет cl
(или проходит NULL
) непараллельный по умолчанию lapply
используется, включая индикатор выполнения.
Вот функция, основанная на решении @fotNelton и применяемая везде, где вы обычно используете mcapply.
mcadply <- function(X, FUN, ...) {
# Runs multicore lapply with progress indicator and transformation to
# data.table output. Arguments mirror those passed to lapply.
#
# Args:
# X: Vector.
# FUN: Function to apply to each value of X. Note this is transformed to
# a data.frame return if necessary.
# ...: Other arguments passed to mclapply.
#
# Returns:
# data.table stack of each mclapply return value
#
# Progress bar code based on https://stackru.com/a/10993589
require(multicore)
require(plyr)
require(data.table)
local({
f <- fifo(tempfile(), open="w+b", blocking=T)
if (inherits(fork(), "masterProcess")) {
# Child
progress <- 0
print.progress <- 0
while (progress < 1 && !isIncomplete(f)) {
msg <- readBin(f, "double")
progress <- progress + as.numeric(msg)
# Print every 1%
if(progress >= print.progress + 0.01) {
cat(sprintf("Progress: %.0f%%\n", progress * 100))
print.progress <- floor(progress * 100) / 100
}
}
exit()
}
newFun <- function(...) {
writeBin(1 / length(X), f)
return(as.data.frame(FUN(...)))
}
result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
close(f)
cat("Done\n")
return(result)
})
}
Вы можете использовать эхо-функцию вашей системы для записи от ваших рабочих, поэтому просто добавьте следующую строку в вашу функцию:
myfun <- function(x){
if(x %% 5 == 0) system(paste("echo 'now processing:",x,"'"))
dosomething(mydata[x])
}
result <- mclapply(1:10,myfun,mc.cores=5)
> now processing: 5
> now processing: 10
Это будет работать, если вы передадите индекс, например, поэтому вместо того, чтобы передавать список данных, передайте индекс и извлеките данные в рабочей функции.
Основываясь на ответе @fotNelson, используйте индикатор выполнения вместо построчной печати и вызовите внешнюю функцию с помощью mclapply.
library('utils')
library('multicore')
prog.indic <- local({ #evaluates in local environment only
f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection
assign(x='f',value=f,envir=.GlobalEnv)
pb <- txtProgressBar(min=1, max=MC,style=3)
if (inherits(fork(), "masterProcess")) { #progress tracker
# Child
progress <- 0.0
while (progress < MC && !isIncomplete(f)){
msg <- readBin(f, "double")
progress <- progress + as.numeric(msg)
# Updating the progress bar.
setTxtProgressBar(pb,progress)
}
exit()
}
MC <- 100
result <- mclapply(1:MC, .mcfunc)
cat('\n')
assign(x='result',value=result,envir=.GlobalEnv)
close(f)
})
.mcfunc<-function(i,...){
writeBin(1, f)
return(i)
}
Назначение подключения fifo.GlobalEnv необходимо для использования его из функции вне вызова mclapply. Спасибо за вопросы и предыдущие ответы, я долго думал, как это сделать.