Многоядерная обработка Master/ Slave в R

Я хотел бы распараллелить цикл while в следующем коде:

work <- function(n) {
  # Do some intensive work (e.g explore a graph starting at n).
  # After this, we don't need to execute work() on nodes in excluding.
  # (e.g exclude could be the nodes explored/reached from n)
  # n is just an example. exclude can be a potentially large set.
  Sys.sleep(2)
  exclude <- c(n, sample(nodes, rbinom(1, length(nodes), 0.5)))
  return(exclude)
}

nodes <- 1:1e3

#Order of execution doesn't matter
nodes <- sample(nodes)

#parallelize this loop
while(length(nodes) > 0) {
  n <- nodes[1]
  exclude <- work(n)
  nodes <- setdiff(nodes, exclude)
}

Не имеет значения, если work() выполняется на исключенном узле, но мы хотели бы свести к минимуму такие случаи. Целью приведенного выше цикла while является выполнение work() как можно меньше раз

Это не смущающее параллельное вычисление, поэтому я не знаю, как использовать parLapply непосредственно. Можно использовать среду master-slave, но я не знаю ни одного для многоядерного программирования (в Windows).

В качестве конкретного примера, вы можете думать о work(n) как graph_exploration(n) (функция находит все узлы, связанные с n) а также exclude в качестве узлов в связанном компоненте п. Конечная цель - найти один узел из каждого подключенного компонента. Ты хочешь бежать graph_exploration(n) столько раз, сколько необходимо, потому что это дорогая операция.

1 ответ

Miheer,

Вот предлагаемое решение.

Предварительно иноходь:

Основная проблема здесь (насколько я понимаю) состоит в том, чтобы разблокировать цикл while, пока work() хруст номера. По сути, вы хотите, чтобы цикл был неблокированным, пока остаются ресурсы, чтобы инициировать больше work() звонки и обработка. Хорошо, как? Ну, я рекомендую вам использовать будущий пакет.

Приведенный ниже пример по существу создает новый вызов процесса work() за каждый звонок. Однако этот вызов не заблокирует цикл while, если все назначенные рабочие процессы не заняты. Вы можете увидеть это как каждый work() вызов имеет другой идентификатор процесса, как показано в выходных данных времени выполнения.

Таким образом, каждый work() работает независимо, чтобы закончить, мы решаем все фьючерсы и возвращаем результаты финала.

Результаты:

  • Последовательное время выполнения: 20,61 сек.
  • Параллельное время выполнения: 8,22 с

Я надеюсь, что это указывает вам в правильном направлении.

Предостережение: вам нужно пройти через все узлы, но это улучшит время выполнения.

Настройка машины:

R version 3.4.1 (2017-06-30)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows >= 8 x64 (build 9200)
[Windows 10, 8 Core Xeon, 64Gb RAM]

Пример параллельного кода:

# Check for, and install and load required packages.
requiredPackages <-
  c("tictoc", "listenv", "future")


ipak <- function(pkg) {
  new.pkg <- pkg[!(pkg %in% installed.packages()[, "Package"])]
  if (length(new.pkg))
    install.packages(new.pkg, dependencies = TRUE)
  sapply(pkg, require, character.only = TRUE)
}

ipak(requiredPackages)

work <- function(n) {
  # Do some intensive work (e.g explore a graph starting at n).
  # After this, we don't need to execute work() on nodes in exclude.
  # (e.g exclude could be the nodes explored/reached from n)
  # n is just an example. exclude can be a potentially large set.
  Sys.sleep(2) # sample(.5:5))
  exclude <- n
  return(exclude)
}

plan(multiprocess, workers = 4L)
#plan(sequential)

nodesGraph  <- 1:10
nodesGraph  <- sample(nodesGraph)
nodesCount  <- length(nodesGraph)
resultsList <- listenv()

tic()
while ( nodesCount > 0 ) {
  n <- nodesGraph[[nodesCount]]
  ## This is evaluated in parallel and will only block
  ## if all workers are busy.
  resultsList[[nodesCount]] %<-% {
      list( exclude = work(n), 
            iteration = length(nodesGraph), 
            pid = Sys.getpid())
  }

  nodesGraph <- setdiff(nodesGraph, nodesGraph[[nodesCount]] )
  cat("nodesGraph",nodesGraph,"\n")
  cat("nodesCount",nodesCount,"\n")
  nodesCount = nodesCount - 1
}
toc()

## Resolve all futures (blocks if not already finished)
resultsList <- as.list(resultsList)
str(resultsList)

Параллельный вывод времени выполнения:

> source('<hidden>/dev/stackru/47230384/47230384v5.R')
nodesGraph 2 5 8 4 6 10 7 1 9 
nodesCount 10 
nodesGraph 2 5 8 4 6 10 7 1 
nodesCount 9 
nodesGraph 2 5 8 4 6 10 7 
nodesCount 8 
nodesGraph 2 5 8 4 6 10 
nodesCount 7 
nodesGraph 2 5 8 4 6 
nodesCount 6 
nodesGraph 2 5 8 4 
nodesCount 5 
nodesGraph 2 5 8 
nodesCount 4 
nodesGraph 2 5 
nodesCount 3 
nodesGraph 2 
nodesCount 2 
nodesGraph  
nodesCount 1 
8.22 sec elapsed
List of 10
 $ :List of 3
  ..$ exclude  : int 2
  ..$ iteration: int 1
  ..$ pid      : int 10692
 $ :List of 3
  ..$ exclude  : int 5
  ..$ iteration: int 2
  ..$ pid      : int 2032
 $ :List of 3
  ..$ exclude  : int 8
  ..$ iteration: int 3
  ..$ pid      : int 16356
 $ :List of 3
  ..$ exclude  : int 4
  ..$ iteration: int 4
  ..$ pid      : int 7756
 $ :List of 3
  ..$ exclude  : int 6
  ..$ iteration: int 5
  ..$ pid      : int 10692
 $ :List of 3
  ..$ exclude  : int 10
  ..$ iteration: int 6
  ..$ pid      : int 2032
 $ :List of 3
  ..$ exclude  : int 7
  ..$ iteration: int 7
  ..$ pid      : int 16356
 $ :List of 3
  ..$ exclude  : int 1
  ..$ iteration: int 8
  ..$ pid      : int 7756
 $ :List of 3
  ..$ exclude  : int 9
  ..$ iteration: int 9
  ..$ pid      : int 10692
 $ :List of 3
  ..$ exclude  : int 3
  ..$ iteration: int 10
  ..$ pid      : int 2032

Последовательный вывод времени выполнения

> source('<hidden>/dev/stackru/47230384/47230384v5.R')
nodesGraph 6 2 1 9 4 8 10 7 3 
nodesCount 10 
nodesGraph 6 2 1 9 4 8 10 7 
nodesCount 9 
nodesGraph 6 2 1 9 4 8 10 
nodesCount 8 
nodesGraph 6 2 1 9 4 8 
nodesCount 7 
nodesGraph 6 2 1 9 4 
nodesCount 6 
nodesGraph 6 2 1 9 
nodesCount 5 
nodesGraph 6 2 1 
nodesCount 4 
nodesGraph 6 2 
nodesCount 3 
nodesGraph 6 
nodesCount 2 
nodesGraph  
nodesCount 1 
20.61 sec elapsed
List of 10
 $ :List of 3
  ..$ exclude  : int 6
  ..$ iteration: int 1
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 2
  ..$ iteration: int 2
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 1
  ..$ iteration: int 3
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 9
  ..$ iteration: int 4
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 4
  ..$ iteration: int 5
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 8
  ..$ iteration: int 6
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 10
  ..$ iteration: int 7
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 7
  ..$ iteration: int 8
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 3
  ..$ iteration: int 9
  ..$ pid      : int 12484
 $ :List of 3
  ..$ exclude  : int 5
  ..$ iteration: int 10
  ..$ pid      : int 12484
Другие вопросы по тегам