Будущее пакета R: инициализация плана () не удалась

Пробую этот код

library(future)
library(foreach)

ncores <- 3
cl <- parallel::makeCluster(3)
avail <- bigstatsr::FBM(ncores, 1, type = "integer", init = 1)
doFuture::registerDoFuture()

res <- vector("list", 5)
for (i in seq_along(res)) {

  while (sum(avail[]) == 0) {
    cat("Waiting..\n")
    Sys.sleep(0.5)
  }
  ind.avail <- which(avail[] == 1)
  cat("Available:", length(ind.avail), "\n")

  plan(cluster, workers = cl[ind.avail])
  foo <- foreach(i = 3:1) %dopar% {
    Sys.sleep(i)
  }

  print(one <- ind.avail[1])
  avail[one] <- 0; print(avail[])
  res[[i]] <- cluster(workers = cl[one], {
    Sys.sleep(5)
    avail[one] <- 1
    i
  })
}

sapply(res, resolved)
parallel::stopCluster(cl)

Ошибка, которую я получаю: Initialization of plan() failed, because the test future used for validation failed. The reason was: Unexpected result (of class ‘NULL’ != ‘FutureResult’) retrieved for ClusterFuture future (label = ‘<none>’, expression = ‘NA’),

Объяснение моего примера, пытающегося воспроизвести мою реальную проблему:

  • Я повторяю много раз (здесь 5) в течение двух шагов
  • первый шаг легко распараллелить с foreach
  • второй шаг нелегко распараллелить и зависит от первого шага

Поэтому моя идея заключалась в том, чтобы распараллелить первый шаг по всем доступным кластерам и выполнить второй шаг асинхронно, используя только один кластер. Этот кластер больше не будет доступен, пока эта асинхронная работа не будет завершена. Тогда на следующем первом шаге будет доступно меньше кластера и так далее. Если для первого шага больше нет доступного кластера, он будет ожидать завершения некоторой асинхронной работы и освобождения некоторого кластера.

0 ответов

Я могу воспроизвести это. Я полагаю, что вам удается испортить связь с основным процессом R и узлом кластера, вызвав plan() с узлом кластера, который содержит результаты будущего, которые еще не были возвращены в основной процесс R. (Я попытался привести более простой пример коррупции такого типа, но это не очевидно, не затрачивая гораздо больше времени.)

Будущая структура уже обнаруживает это (отсюда и ошибка). Я обновил версию для разработчиков в будущем, чтобы дать больше подсказок и свидетельств о том, что происходит:

Error: Initialization of plan() failed, because the test future used for
  validation failed. The reason was: Unexpected result (of class ‘character’
  != ‘FutureResult’) retrieved for ClusterFuture future (label = 
  ‘future-plan-test’, expression = ‘NA’): future-grmall. This suggests that
  the communication with ClusterFuture worker (‘SOCKnode’ #1) is out of sync.

Я думаю, что вы можете обойти это, убедившись, что вы собрали стоимость разрешенных фьючерсов, прежде чем снова использовать их работников. plan(cluster, ...) вызов подтверждает, что по крайней мере одно будущее может быть успешно решено.

Другие вопросы по тегам