R: асинхронное параллельное перекрытие
Самый простой способ, который я нашел до сих пор, использовать параллельный lapply
в R был через следующий пример кода:
library(parallel)
library(pbapply)
cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})
results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)
У этого есть очень полезная функция обеспечения индикатора выполнения для результатов, и очень легко повторно использовать тот же код, когда не требуются параллельные вычисления, установив cl = NULL
.
Однако я заметил одну проблему: pblapply
выполняет цикл по списку партиями. Например, если один работник надолго застрял в определенной задаче, оставшиеся будут ждать ее завершения, прежде чем запускать новый пакет заданий. Для некоторых задач это добавляет много ненужного времени в рабочий процесс.
Мой вопрос: существуют ли аналогичные параллельные фреймворки, которые позволили бы рабочим работать независимо? Индикатор выполнения и возможность повторного использования кода сcl=NULL
будет большим плюсом.
Возможно, можно изменить существующий код pbapply
добавить эту опцию / функцию?
2 ответа
(Отказ от ответственности: я являюсь автором будущего фреймворка и пакета progressr)
Близкое решение, напоминающее base::lapply()
, и ваш pbapply::pblapply()
Например, можно использовать future.apply как:
library(future.apply)
## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
})
Разделение на части: вы можете контролировать количество фрагментов с помощью аргумента.future.chunk.size
или дополнительные future.schedule
. Чтобы отключить разбиение на части, чтобы каждый элемент обрабатывался в уникальной параллельной задаче, используйтеfuture.chunk.size=1
. Таким образом, если есть один элемент, который занимает намного больше времени, чем другие элементы, он не будет удерживать никакие другие элементы.
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
Обновления хода выполнения параллельно: если вы хотите получать обновления хода выполнения при параллельной обработке, вы можете использовать пакет progressr и настроить его на использование пакета прогресса для сообщения об обновлениях в качестве индикатора выполнения (здесь также с ETA).
library(future.apply)
plan(multisession, workers=4)
library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))
with_progress({
p <- progressor(along=xs)
results <- future_lapply(xs, FUN=function(x) {
p() ## signal progress
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
})
Вы можете обернуть это в функцию, например
my_fcn <- function(xs) {
p <- progressor(along=xs)
future_lapply(xs, FUN=function(x) {
p()
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
}
Таким образом, вы можете вызвать его как обычную функцию:
> result <- my_fcn(xs)
и использовать plan()
чтобы точно контролировать, как вы хотите распараллеливать. Это не будет сообщать о прогрессе. Для этого вам необходимо:
> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------] 9% 1m
Запускать все в фоновом режиме: Если ваш вопрос заключался в том, как запустить весь shebang в фоновом режиме, см. Виньетку " Топологии будущего ". Это еще один уровень распараллеливания, но он возможен.
Вы можете использовать furrr
пакет, который использует future
бежать purrr
в многопроцессорном режиме:
library(furrr)
plan(multisession, workers = nbrOfWorkers()-1)
nbrOfWorkers()
1:100 %>% future_map(~{Sys.sleep(1); rnorm(.x)},.progress = T)
Progress: ────────────────────────────── 100%
Вы можете отключить параллельные вычисления с помощью plan(sequential)