R: асинхронное параллельное перекрытие

Самый простой способ, который я нашел до сих пор, использовать параллельный lapply в R был через следующий пример кода:

library(parallel)
library(pbapply)

cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)

У этого есть очень полезная функция обеспечения индикатора выполнения для результатов, и очень легко повторно использовать тот же код, когда не требуются параллельные вычисления, установив cl = NULL.

Однако я заметил одну проблему: pblapplyвыполняет цикл по списку партиями. Например, если один работник надолго застрял в определенной задаче, оставшиеся будут ждать ее завершения, прежде чем запускать новый пакет заданий. Для некоторых задач это добавляет много ненужного времени в рабочий процесс.

Мой вопрос: существуют ли аналогичные параллельные фреймворки, которые позволили бы рабочим работать независимо? Индикатор выполнения и возможность повторного использования кода сcl=NULL будет большим плюсом.

Возможно, можно изменить существующий код pbapply добавить эту опцию / функцию?

2 ответа

Решение

(Отказ от ответственности: я являюсь автором будущего фреймворка и пакета progressr)

Близкое решение, напоминающее base::lapply(), и ваш pbapply::pblapply()Например, можно использовать future.apply как:

library(future.apply)

## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
})

Разделение на части: вы можете контролировать количество фрагментов с помощью аргумента.future.chunk.size или дополнительные future.schedule. Чтобы отключить разбиение на части, чтобы каждый элемент обрабатывался в уникальной параллельной задаче, используйтеfuture.chunk.size=1. Таким образом, если есть один элемент, который занимает намного больше времени, чем другие элементы, он не будет удерживать никакие другие элементы.

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.chunk.size=1)

Обновления хода выполнения параллельно: если вы хотите получать обновления хода выполнения при параллельной обработке, вы можете использовать пакет progressr и настроить его на использование пакета прогресса для сообщения об обновлениях в качестве индикатора выполнения (здесь также с ETA).

library(future.apply)
plan(multisession, workers=4)

library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))

with_progress({
  p <- progressor(along=xs)
  results <- future_lapply(xs, FUN=function(x) {
    p()  ## signal progress
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
})

Вы можете обернуть это в функцию, например

my_fcn <- function(xs) {
  p <- progressor(along=xs)
  future_lapply(xs, FUN=function(x) {
    p()
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
}

Таким образом, вы можете вызвать его как обычную функцию:

> result <- my_fcn(xs)

и использовать plan()чтобы точно контролировать, как вы хотите распараллеливать. Это не будет сообщать о прогрессе. Для этого вам необходимо:

> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------]   9%  1m

Запускать все в фоновом режиме: Если ваш вопрос заключался в том, как запустить весь shebang в фоновом режиме, см. Виньетку " Топологии будущего ". Это еще один уровень распараллеливания, но он возможен.

Вы можете использовать furrr пакет, который использует future бежать purrr в многопроцессорном режиме:

library(furrr)
plan(multisession, workers = nbrOfWorkers()-1)
nbrOfWorkers()
1:100 %>% future_map(~{Sys.sleep(1); rnorm(.x)},.progress = T)
Progress: ──────────────────────────────                                   100%

Вы можете отключить параллельные вычисления с помощью plan(sequential)

Другие вопросы по тегам